9.2 Go实战Redis分布式锁

在一台主机的多线程场景里,为了保护某个对象在同一时刻只能被一个线程访问,可以用锁机制.即线程只有在获取该对象锁资源的前提下才能访问,在访问完以后需要立即释放锁,以便其他线程继续使用该对象.

再把问题扩展一下,如果访问同一对象的线程来自分布式系统里的多台主机,那么用来确保访问唯一性的锁就叫分布式锁.也就是说,如果多个线程要竞争同一个资源,就需要用到分布式锁,本节将讲述基于Redis分布式锁的相关实战技巧.

9.2.1 观察分布式锁的特性

在某支付系统里有这样一个需求:一个操作需要先读取存放在其他主机上某账户的余额,读到后在本机内存里进行加值操作,再把更新后的余额写回对方主机上.但是,在读数据到回写数据的这个时间段里,分布式系统里的其他主机也有可能会读写该余额数据,具体的效果如下图示:

我们假定这个分布式锁工作在一个分布式系统重的高并发场景下,因此除了应当具备"加锁"和"解锁"这2个功能外,还应具备如下两大特性:

  1. 需要有"限时等待"的特性.即使加锁的主机系统崩溃导致无法再发出"解锁"指令,加在这个余额上的分布式锁也应该在一定时间后自动解锁

  2. 需要确保解锁和加锁的主机必须唯一.例如:主机A发出"锁余额"的指令,同时发出"10s后解锁"指令,但是10s后主机A没有执行完操作余额的指令,此时锁应当自动释放,并且主机B获得锁

如上图示,这种"解开其他主机加的锁"的情况在分布式场景中需要避免,也就是说,需要确保解锁和加锁的主机是一致的,否则不予解锁

9.2.2 加锁与解锁的Redis命令分析

使用SETDEL命令来加锁和解锁.

  • SET key value [EX seconds|PX millseconds] [NX|XX] [KEEPTTL]

其中NX参数表示当key不存在时才进行设置值操作.若多个线程要用分布式锁竞争同一个资源,那么这些线程可以先通过SET flag 1 EX 60 NX命令向名为flag的key中设置值,由于加入了NX参数,因此只能有1个线程设置成功,相当于这个线程抢占到了分布式锁

此外,在该命令中,使用EX参数指定了flag键的生存时间,所以即使抢占到分布式锁的机器因为故障而无法发起DEL命令实现解锁时,该flag键能够在到达生存时间后自动被删除,这样该线程对资源的占有就会被自动释放,以供其他线程继续抢占

占有资源的线程在使用完毕后通过DEL flag命令来删除键,从而实现解锁的动作,但是在通过DEL命令解锁时需要确认加锁和解锁的是同一台机器或同一个线程,避免误解锁操作

9.2.3 基于Go的Redis分布式锁

注:此处实现的是一个最简版的分布式锁.以多个goroutine来模拟上文中的多个线程,打印了获取锁的情况.

工程结构如下:

(base) yanglei@yuanhong distributeLock % tree ./
./
├── go.mod
├── go.sum
├── lock
│   └── lock.go
└── main.go

1 directory, 4 files

lock/lock.go:

package lock

import (
	"github.com/gomodule/redigo/redis"
	"math/rand"
)

type DistributeLock struct {
	Key   string
	value int
	TTL   int
	Conn  redis.Conn
}

// Acquire 获取锁
func (l *DistributeLock) Acquire() bool {
	result, err := redis.String(l.Conn.Do("SET", l.Key, l.value, "NX", "EX", l.TTL))
	if err != nil {
		return false
	}

	return result == "OK"
}

// Release 释放锁
func (l *DistributeLock) Release() bool {
	// KEY存在 且 VALUE和设置时的值相同,才能删除
	luaScript := `
if redis.call("GET", KEYS[1]) == ARGV[1] then
	return redis.call("DEL", KEYS[1])
else
	return 0
end
`
	script := redis.NewScript(1, luaScript)
	result, err := redis.Int(script.Do(l.Conn, l.Key, l.value))
	if err != nil {
		return false
	}

	return result == 1
}

// GenValue 生成随机值作为锁的value
func (l *DistributeLock) GenValue() {
	l.value = rand.Int()
}

main.go:

package main

import (
	"distributeLock/lock"
	"fmt"
	"github.com/gomodule/redigo/redis"
	"log"
	"sync"
	"time"
)

func main() {
	// 创建锁1
	conn1, err := redis.Dial("tcp", "127.0.0.1:6379")
	if err != nil {
		log.Fatalf("conn error: %v\n", err)
	}
	distributeLock1 := lock.DistributeLock{
		Key:  "flag",
		TTL:  60,
		Conn: conn1,
	}
	distributeLock1.GenValue()
	defer conn1.Close()

	// 创建锁2
	conn2, err := redis.Dial("tcp", "127.0.0.1:6379")
	if err != nil {
		log.Fatalf("conn error: %v\n", err)
	}
	distributeLock2 := lock.DistributeLock{
		Key:  "flag",
		TTL:  60,
		Conn: conn2,
	}
	distributeLock2.GenValue()
	defer conn2.Close()

	// 创建锁3
	conn3, err := redis.Dial("tcp", "127.0.0.1:6379")
	if err != nil {
		log.Fatalf("conn error: %v\n", err)
	}
	distributeLock3 := lock.DistributeLock{
		Key:  "flag",
		TTL:  60,
		Conn: conn3,
	}
	distributeLock3.GenValue()
	defer conn3.Close()

	// 启动3个goroutine
	wg := &sync.WaitGroup{}
	wg.Add(3)
	go retrieveLock(1, wg, distributeLock1)
	go retrieveLock(2, wg, distributeLock2)
	go retrieveLock(3, wg, distributeLock3)
	wg.Wait()
}

func retrieveLock(i int, wg *sync.WaitGroup, distributeLock lock.DistributeLock) {
	haveRetrieve := false

	for !haveRetrieve {
		// 获取锁
		if distributeLock.Acquire() {
			fmt.Printf("goroutine %d acquire lock success\n", i)
			haveRetrieve = true
		} else {
			fmt.Printf("goroutine %d acquire lock failed, retry soon\n", i)
			time.Sleep(2 * time.Second)
			continue
		}

		// 以等待5s作为假定的业务处理时间
		fmt.Printf("goroutine %d is processing\n", i)
		time.Sleep(5 * time.Second)

		// 释放锁
		if distributeLock.Release() {
			fmt.Printf("goroutine %d release lock success\n", i)
		} else {
			fmt.Printf("goroutine %d release lock failed\n", i)
		}
	}

	wg.Done()
}

运行结果:

(base) yanglei@yuanhong distributeLock % go run main.go
goroutine 1 acquire lock success
goroutine 1 is processing
goroutine 3 acquire lock failed, retry soon
goroutine 2 acquire lock failed, retry soon
goroutine 2 acquire lock failed, retry soon
goroutine 3 acquire lock failed, retry soon
goroutine 2 acquire lock failed, retry soon
goroutine 3 acquire lock failed, retry soon
goroutine 1 release lock success
goroutine 3 acquire lock success
goroutine 3 is processing
goroutine 2 acquire lock failed, retry soon
goroutine 2 acquire lock failed, retry soon
goroutine 2 acquire lock failed, retry soon
goroutine 3 release lock success
goroutine 2 acquire lock success
goroutine 2 is processing
goroutine 2 release lock success

注:

  1. 此处我尝试过用连接池管理distributeLock.Conn,但是在执行SET命令时报错,故使用给每个goroutine手动创建连接的方式执行

  2. 使用distributeLock.GenValue()方法设置value是为了避免3个goroutine手动设置相同的value

  3. 这个最简版的分布式锁,没有实现可重入的功能

Last updated