在一台主机的多线程场景里,为了保护某个对象在同一时刻只能被一个线程访问,可以用锁机制.即线程只有在获取该对象锁资源的前提下才能访问,在访问完以后需要立即释放锁,以便其他线程继续使用该对象.
再把问题扩展一下,如果访问同一对象的线程来自分布式系统里的多台主机,那么用来确保访问唯一性的锁就叫分布式锁.也就是说,如果多个线程要竞争同一个资源,就需要用到分布式锁,本节将讲述基于Redis分布式锁的相关实战技巧.
9.2.1 观察分布式锁的特性
在某支付系统里有这样一个需求:一个操作需要先读取存放在其他主机上某账户的余额,读到后在本机内存里进行加值操作,再把更新后的余额写回对方主机上.但是,在读数据到回写数据的这个时间段里,分布式系统里的其他主机也有可能会读写该余额数据,具体的效果如下图示:
我们假定这个分布式锁工作在一个分布式系统重的高并发场景下,因此除了应当具备"加锁"和"解锁"这2个功能外,还应具备如下两大特性:
需要有"限时等待"的特性.即使加锁的主机系统崩溃导致无法再发出"解锁"指令,加在这个余额上的分布式锁也应该在一定时间后自动解锁
需要确保解锁和加锁的主机必须唯一.例如:主机A发出"锁余额"的指令,同时发出"10s后解锁"指令,但是10s后主机A没有执行完操作余额的指令,此时锁应当自动释放,并且主机B获得锁
如上图示,这种"解开其他主机加的锁"的情况在分布式场景中需要避免,也就是说,需要确保解锁和加锁的主机是一致的,否则不予解锁
9.2.2 加锁与解锁的Redis命令分析
使用SET
和DEL
命令来加锁和解锁.
SET key value [EX seconds|PX millseconds] [NX|XX] [KEEPTTL]
其中NX
参数表示当key不存在时才进行设置值操作.若多个线程要用分布式锁竞争同一个资源,那么这些线程可以先通过SET flag 1 EX 60 NX
命令向名为flag的key中设置值,由于加入了NX
参数,因此只能有1个线程设置成功,相当于这个线程抢占到了分布式锁
此外,在该命令中,使用EX
参数指定了flag键的生存时间,所以即使抢占到分布式锁的机器因为故障而无法发起DEL
命令实现解锁时,该flag键能够在到达生存时间后自动被删除,这样该线程对资源的占有就会被自动释放,以供其他线程继续抢占
占有资源的线程在使用完毕后通过DEL flag
命令来删除键,从而实现解锁的动作,但是在通过DEL
命令解锁时需要确认加锁和解锁的是同一台机器或同一个线程,避免误解锁操作
9.2.3 基于Go的Redis分布式锁
注:此处实现的是一个最简版的分布式锁.以多个goroutine来模拟上文中的多个线程,打印了获取锁的情况.
工程结构如下:
(base) yanglei@yuanhong distributeLock % tree ./
./
├── go.mod
├── go.sum
├── lock
│ └── lock.go
└── main.go
1 directory, 4 files
lock/lock.go
:
package lock
import (
"github.com/gomodule/redigo/redis"
"math/rand"
)
type DistributeLock struct {
Key string
value int
TTL int
Conn redis.Conn
}
// Acquire 获取锁
func (l *DistributeLock) Acquire() bool {
result, err := redis.String(l.Conn.Do("SET", l.Key, l.value, "NX", "EX", l.TTL))
if err != nil {
return false
}
return result == "OK"
}
// Release 释放锁
func (l *DistributeLock) Release() bool {
// KEY存在 且 VALUE和设置时的值相同,才能删除
luaScript := `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`
script := redis.NewScript(1, luaScript)
result, err := redis.Int(script.Do(l.Conn, l.Key, l.value))
if err != nil {
return false
}
return result == 1
}
// GenValue 生成随机值作为锁的value
func (l *DistributeLock) GenValue() {
l.value = rand.Int()
}
main.go
:
package main
import (
"distributeLock/lock"
"fmt"
"github.com/gomodule/redigo/redis"
"log"
"sync"
"time"
)
func main() {
// 创建锁1
conn1, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil {
log.Fatalf("conn error: %v\n", err)
}
distributeLock1 := lock.DistributeLock{
Key: "flag",
TTL: 60,
Conn: conn1,
}
distributeLock1.GenValue()
defer conn1.Close()
// 创建锁2
conn2, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil {
log.Fatalf("conn error: %v\n", err)
}
distributeLock2 := lock.DistributeLock{
Key: "flag",
TTL: 60,
Conn: conn2,
}
distributeLock2.GenValue()
defer conn2.Close()
// 创建锁3
conn3, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil {
log.Fatalf("conn error: %v\n", err)
}
distributeLock3 := lock.DistributeLock{
Key: "flag",
TTL: 60,
Conn: conn3,
}
distributeLock3.GenValue()
defer conn3.Close()
// 启动3个goroutine
wg := &sync.WaitGroup{}
wg.Add(3)
go retrieveLock(1, wg, distributeLock1)
go retrieveLock(2, wg, distributeLock2)
go retrieveLock(3, wg, distributeLock3)
wg.Wait()
}
func retrieveLock(i int, wg *sync.WaitGroup, distributeLock lock.DistributeLock) {
haveRetrieve := false
for !haveRetrieve {
// 获取锁
if distributeLock.Acquire() {
fmt.Printf("goroutine %d acquire lock success\n", i)
haveRetrieve = true
} else {
fmt.Printf("goroutine %d acquire lock failed, retry soon\n", i)
time.Sleep(2 * time.Second)
continue
}
// 以等待5s作为假定的业务处理时间
fmt.Printf("goroutine %d is processing\n", i)
time.Sleep(5 * time.Second)
// 释放锁
if distributeLock.Release() {
fmt.Printf("goroutine %d release lock success\n", i)
} else {
fmt.Printf("goroutine %d release lock failed\n", i)
}
}
wg.Done()
}
运行结果:
(base) yanglei@yuanhong distributeLock % go run main.go
goroutine 1 acquire lock success
goroutine 1 is processing
goroutine 3 acquire lock failed, retry soon
goroutine 2 acquire lock failed, retry soon
goroutine 2 acquire lock failed, retry soon
goroutine 3 acquire lock failed, retry soon
goroutine 2 acquire lock failed, retry soon
goroutine 3 acquire lock failed, retry soon
goroutine 1 release lock success
goroutine 3 acquire lock success
goroutine 3 is processing
goroutine 2 acquire lock failed, retry soon
goroutine 2 acquire lock failed, retry soon
goroutine 2 acquire lock failed, retry soon
goroutine 3 release lock success
goroutine 2 acquire lock success
goroutine 2 is processing
goroutine 2 release lock success
注:
此处我尝试过用连接池管理distributeLock.Conn
,但是在执行SET
命令时报错,故使用给每个goroutine手动创建连接的方式执行
使用distributeLock.GenValue()
方法设置value是为了避免3个goroutine手动设置相同的value