0%

EnterpriseArchitecture-Distributed-Lock

分布式锁是控制分布式系统之间同步访问共享资源的一种方式。是为了解决分布式系统中,不同的系统或是同一个系统的不同主机共享同一个资源的问题,它通常会采用互斥来保证程序的一致性。

锁是一种常用的并发控制机制,用于保证一项资源在任何时候只能被一个线程使用,如果其他线程也要使用同样的资源,必须排队等待上一个线程使用完。

分布式锁

上面说的锁指的是程序级别的锁,例如 Java 语言中的 synchronized 和 ReentrantLock 在单应用中使用不会有任何问题,但如果放到分布式环境下就要使用分布式锁。

实现分布式锁方案

  1. 基于 MySQL 的悲观锁来实现分布式锁,性能不太好。容易写bug造成Mysql死锁问题。
    • 基于数据库实现分布式锁比较简单,绝招在于创建一张锁表,为申请者在锁表里建立一条记录,记录建立成功则获得锁,消除记录则释放锁。
    • 单点故障问题。一旦数据库不可用,会导致整个系统崩溃
    • 死锁问题。数据库锁没有失效时间,未获得锁的进程只能一直等待已获得锁的进程主动释放锁。倘若已获得共享资源访问权限的进程突然挂掉、或者解锁操作失败,使得锁记录一直存在数据库中,无法被删除,而其他进程也无法获得锁,从而产生死锁现象。
  2. 基于 Redis 实现分布式锁,目前广泛使用的方案。
    • 当多个进程频繁去访问 Redis 时,Redis 可能成为瓶颈。关键Redis并不是和做分布式锁(比较极端的场景下)
    • 反复尝试会增加通信成本和性能开销,需要指定重试的次数。如果每次都是众多进程进行竞争的话,有可能会导致有些进程永远获取不到锁。
    • 可以集群部署,可以避免单点故障。
  3. 基于 ZooKeeper 实现分布式锁,利用 ZooKeeper 顺序临时节点来实现。
    • ZooKeeper 基于树形数据存储结构实现分布式锁,来解决多个进程同时访问同一临界资源时,数据的一致性问题。
    • 持久节点(PERSISTENT)。这是默认的节点类型,一直存在于 ZooKeeper 中。
    • 持久顺序节点(PERSISTENT_SEQUENTIAL)。在创建节点时,ZooKeeper 根据节点创建的时间顺序对节点进行编号命名。
    • 临时节点(EPHEMERAL)。当客户端与 Zookeeper 连接时临时创建的节点。与持久节点不同,当客户端与 ZooKeeper 断开连接后,该进程创建的临时节点就会被删除。
    • 临时顺序节点(EPHEMERAL_SEQUENTIAL)。就是按时间顺序编号的临时节点。
    • zookeeper在分布式环境下能保证互斥,具备锁失效机制。防止死锁即便出现持有锁崩溃或者锁失败的情况也能被动解锁。保证后续的线程可以获得锁。并且可以多次访问临界资源。有高可用获得锁和释放锁的功能,性能并不是很差。
    • 羊群效应,就是在整个 ZooKeeper 分布式锁的竞争过程中,大量的进程都想要获得锁去使用共享资源。每个进程都有自己的“Watcher”来通知节点消息,都会获取整个子节点列表,使得信息冗余,资源浪费。当共享资源被解锁后,Zookeeper 会通知所有监听的进程,这些进程都会尝试争取锁,但最终只有一个进程获得锁,使得其他进程产生了大量的不必要的请求,造成了巨大的通信开销,很有可能导致网络阻塞、系统性能下降。
      • 在与该方法对应的持久节点的目录下,为每个进程创建一个临时顺序节点。
      • 每个进程获取所有临时节点列表,对比自己的编号是否最小,若最小,则获得锁。
      • 若本进程对应的临时节点编号不是最小的,则注册 Watcher,监听自己的上一个临时顺序节点,当监听到该节点释放锁后,获取锁。
  4. 基于etcd分布式锁实现
  • 分别对这三种实现方式进行性能压测,可以发现在同样的服务器配置下,Redis 的性能是最好的,Zookeeper 次之,数据库最差。从实现方式和可靠性来说,Zookeeper 的实现方式简单,且基于分布式集群,可以避免单点问题,具有比较高的可靠性。因此,在对业务性能要求不是特别高的场景中,建议使用 Zookeeper 实现的分布式锁。

部署方案

  • 单机
    • 简单,但是不能保证高可用,一旦出现单点故障就gg了。
  • 集群
    • 在Redis场景下有大名鼎鼎的红锁(RedLock)
      • 红锁的核心逻辑是,部署集群的情况下,比如5个master。加锁的时候挨个加锁。当满足(5/2+1)=3 的时候就表示加锁成功,也就是半数成功则算成功。释放锁也是类似的操作。但是也带来了通信成本。仍需要二次检查锁的完整性。
      • 单点故障时,我们第一时间想到的就是搞几个 Slave 从节点做备份,Redis 里很好地支持了哨兵(Sentinel)模式,自动主从切换。锁写到Master后,还没同步到Slave呢,Master挂了Slave选举成了Master,但是Slave里没有锁,其他线程再次能上锁了。不安全。
      • 如果非要用Redis方案来做锁,在保证高可用的情况下可以通过二次检查的逻辑防止锁故障。
    • Redis集群只是做了 slot 分片,锁还是只写到一个 Master 上,所以它和哨兵(Sentinel)模式会面临同样的问题。
    • Zookeeper提供了协调分布式应用的基本服务,它向外部应用暴露一组通用服务——分布式同步(Distributed Synchronization)
      • Zookeeper本身就是一个分布式程序(只要有半数以上节点存活,zk就能正常服务)
      • 如果是钱的业务,建议使用zookeeper。
      • ZooKeeper实现分布式锁的核心原理是临时节点,更确切的说法是临时顺序节点。
      • ZooKeeper的节点是通过session心跳来续期的,比如客户端1创建了一个节点, 那么客户端1会和ZooKeeper服务器创建一个Session,通过这个Session的心跳来维持连接。如果ZooKeeper服务器长时间没收到这个Session的心跳,就认为这个Session过期了,也会把对应的节点删除。临时节点类型的最大特性是:当客户端宕机后,临时节点会随之消亡。

分布式锁的条件

  • 可以提供分布式部署应用集群中,同一个方法只能在某一个应用的线程执行。

  • 该锁需要包含一下特性:分布式互斥、重入锁、锁续期、阻塞锁、公平锁、良好的加锁释放锁性能。

分布式使用场景

  • 保证接口的幂等性,防止冲突提交数据。
  • 防止重复消费,比如推送消息或者发送邮件。保证合理的执行次数。
  • 防止分布式场景缓存击穿,比如秒杀活动的超卖情况。

分布式锁

Mysql实现分布式锁

单实例

演示代码

  • 表结构,简单演示效果,表字段可以根据实际情况设计。这里需要注意如果要使用可重入,需要增加版本字段。
1
2
3
4
5
6
7
8
9
10
-- 创建数据库
CREATE
DATABASE IF NOT EXISTS `distributed_lock_mysql` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
DROP TABLE IF EXISTS distributed_lock_mysql.`distributed_lock`;
CREATE TABLE distributed_lock_mysql.`distributed_lock`
(
`lock_id` varchar(50) NOT NULL COMMENT '锁唯标识',
`lock_value` varchar(50) NOT NULL COMMENT '锁内容,区分是谁上的锁',
PRIMARY KEY (`lock_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  • 通过唯一约束判断锁是否生成。
1
SELECT * FROM distributed_lock WHERE lock_id = ? LOCK IN SHARE MODE;
  • 加锁方式
1
INSERT INTO `distributed_lock`(`lock_id`, `lock_value`) VALUES (?, ?)
  • 加锁代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Autowired
private JdbcTemplate jdbcTemplate;

public Boolean lock(String key, String value) {
try {
// 简单的sql执行
String insertSql = "INSERT INTO `distributed_lock`(`lock_id`, `lock_value`)" +
" VALUES (?, ?)";
return jdbcTemplate.update(insertSql, key, value) > 0;
}catch (RuntimeException e){

}
return false;
}

public Boolean unLock(String key,String value) {
String sql = "SELECT * FROM distributed_lock WHERE lock_id = ? LOCK IN SHARE MODE";
Map<String, Object> lock = new HashMap<>();
try {
lock = jdbcTemplate.queryForMap(sql, key);
} catch (EmptyResultDataAccessException e) {
// 防止查询不到数据报错
return false;
}
if (CollectionUtils.isEmpty(lock)) {
return false;
}else{
if (Objects.equals(lock.get("lock_value"),value)) {
sql = "DELETE FROM `distributed_lock` WHERE lock_id = ?";
return jdbcTemplate.update(sql, key) > 0;
}
}
return false;
}

Redis实现分布式锁

单节点

在只存在master节点的情况下

  • 通过SET key value [EX seconds | PX milliseconds] [NX]来创建锁
1
2
127.0.0.1:6379> set lock true ex 30 nx
OK #创建锁成功
  • ex 是用来设置超时时间的,而 nx 是 not exists 的意思,用来判断键是否存在。如果返回的结果为“OK”则表示创建锁成功,否则表示此锁有人在使用。
  • 防止锁被删除
  • 防止锁过期,业务执行时间太长了。如果锁的时间很短。可能出现锁失效,建议将锁的过期时间设置长点,防止业务没执行完成锁消失后,被其它线程获取该锁。

通过给vaule设置一个唯一的值,在删除锁的时候判断是否当前业务程序操作。

  • KEYS[1] 加锁单key
  • ARGV[1] unique_value 防止被其它客户端非法删除
1
2
3
4
5
if redis.call('get', KEYS[1]) == ARGV[1] 
then return redis.call('del', KEYS[1])
else
return 0
end
  • 执行语法
    • 这种方式需要每次都传入 Lua 脚本字符串,不仅浪费网络开销,同时 Redis 需要每次重新编译 Lua 脚本,对于我们追求性能极限的系统来说,不是很完美。
1
EVAL script numkeys key [key ...] arg [arg ...]
  • 另一种方式
    • 其语法与 EVAL 类似,不同的是这里传入的不是脚本字符串,而是一个加密串 sha1。
1
EVALSHA sha1 numkeys key [key ...] arg [arg ...]
  • SCRIPT LOAD
    • 通过预加载命令,将 Lua 脚本先存储在 Redis 中,并返回一个 sha1,下次要执行对应脚本时,只需要传入 sha1 即可执行对应的脚本。这完美地解决了 EVAL 命令存在的弊端,所以我们这里也是基于 EVALSHA 方式来实现的。
1
SCRIPT LOAD script

演示代码

  • 工具代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109


/**
* @author z201.coding@gmail.com
**/
@Service
@Slf4j
public class DistributedLockRedisTool {

private static String LOCK_PREFIX = "lock:";

@Autowired
private RedisTemplate redisTemplate;

/**
* 尝试获取分布式锁,设置重试次数
*
* @param key
* @param value
* @param retries
* @param timeout
* @return
*/
public boolean lock(String key, String value, int retries, long timeout) {
Boolean result = Boolean.FALSE;
// 最多重试三次
if (retries > 3) {
retries = 3;
}
if (retries > 0) {
for (int i = 0; i < retries; i++) {
result = lock(key, value, timeout);
if (!result) {
// 可以考虑线程睡眠
i++;
}
}
}
return result;
}

/**
* 设置锁,设置过期时间
*
* @param key
* @param value
* @param timeout
* @return
*/
public boolean lock(String key, String value, long timeout) {
return tryLock(key, value, timeout, TimeUnit.SECONDS, 1, TimeUnit.SECONDS);
}

/**
* 尝试获取分布式锁,并设置获取锁的超时时间
*
* @param key 分布式锁 key
* @param value 分布式锁 value
* @param expireTime 锁的超时时间,防止死锁
* @param expireTimeUnit 锁的超时时间单位
* @param acquireTimeout 尝试获取锁的等待时间,如果在时间范围内获取锁失败,就结束获取锁
* @param acquireTimeoutUnit 尝试获取锁的等待时间单位
* @return 是否成功获取分布式锁
*/
public boolean tryLock(String key, String value, long expireTime, TimeUnit expireTimeUnit, int acquireTimeout, TimeUnit acquireTimeoutUnit) {
try {
// 尝试自旋获取锁,等待配置的一段时间,如果在时间范围内获取锁失败,就结束获取锁
long end = System.currentTimeMillis() + acquireTimeoutUnit.toMillis(acquireTimeout);
while (System.currentTimeMillis() < end) {
// 尝试获取锁
Boolean result = redisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX + key, value, expireTime, expireTimeUnit);
// 验证是否成功获取锁
if (Objects.equals(Boolean.TRUE, result)) {
log.info("tryLock success {} {} ", key, value);
return true;
}
// 睡眠 50 毫秒
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("tryLock {} {} error {}", key, value, e.getMessage());
}
log.info("tryLock fail {} {} ", key, value);
return false;
}


public boolean unlock(String key, String value) {
String script = "if redis.call('get',KEYS[1]) == ARGV[1]"
+ "then"
+ " return redis.call('del',KEYS[1])"
+ "else "
+ " return 0 "
+ "end";
String[] args = new String[]{value};
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);
Object result = redisTemplate.execute(redisScript, Collections.singletonList(LOCK_PREFIX + key), args);
if (Objects.equals(result, 1L)) {
log.info("unlock ok");
return true;
}
return false;
}

}


  • 测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Autowired
DistributedLockRedisTool distributedLockRedisTool;

@Autowired
RedisTemplate redisTemplate;

@Test
@Disabled
public void testLock() {
int count = 10;
CountDownLatch countDownLatch = new CountDownLatch(count);
ExecutorService executorService = Executors.newFixedThreadPool(count);
try {
for (int i = 0; i < count; i++) {
executorService.execute(() -> {
String key = UUID.randomUUID().toString();
log.info(" lock {} key {} ", distributedLockRedisTool.lock(key, key, 10L), key);
countDownLatch.countDown();
});
}
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
Set<String> keys = redisTemplate.keys("*");
log.info("keys {}", keys.toString());
for (String key : keys) {
key = key.replace("lock:","");
log.info("unlock {} {}", distributedLockRedisTool.unlock(key, key),key);
}
executorService.shutdown();
}
  • Console
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
[pool-2-thread-7]  tryLock success   8e19a035-b610-4e68-a385-c133a1975fa8  8e19a035-b610-4e68-a385-c133a1975fa8  
[pool-2-thread-7] lock true key 8e19a035-b610-4e68-a385-c133a1975fa8
[pool-2-thread-1] tryLock fail 8e19a035-b610-4e68-a385-c133a1975fa8 8e19a035-b610-4e68-a385-c133a1975fa8
[pool-2-thread-1] lock false key 8e19a035-b610-4e68-a385-c133a1975fa8
[pool-2-thread-8] tryLock fail 8e19a035-b610-4e68-a385-c133a1975fa8 8e19a035-b610-4e68-a385-c133a1975fa8
[pool-2-thread-3] tryLock fail 8e19a035-b610-4e68-a385-c133a1975fa8 8e19a035-b610-4e68-a385-c133a1975fa8
[pool-2-thread-8] lock false key 8e19a035-b610-4e68-a385-c133a1975fa8
[pool-2-thread-3] lock false key 8e19a035-b610-4e68-a385-c133a1975fa8
[pool-2-thread-9] tryLock fail 8e19a035-b610-4e68-a385-c133a1975fa8 8e19a035-b610-4e68-a385-c133a1975fa8
[pool-2-thread-9] lock false key 8e19a035-b610-4e68-a385-c133a1975fa8
[pool-2-thread-6] tryLock fail 8e19a035-b610-4e68-a385-c133a1975fa8 8e19a035-b610-4e68-a385-c133a1975fa8
[pool-2-thread-4] tryLock fail 8e19a035-b610-4e68-a385-c133a1975fa8 8e19a035-b610-4e68-a385-c133a1975fa8
[pool-2-thread-2] tryLock fail 8e19a035-b610-4e68-a385-c133a1975fa8 8e19a035-b610-4e68-a385-c133a1975fa8
[pool-2-thread-5] tryLock fail 8e19a035-b610-4e68-a385-c133a1975fa8 8e19a035-b610-4e68-a385-c133a1975fa8
[pool-2-thread-10] tryLock fail 8e19a035-b610-4e68-a385-c133a1975fa8 8e19a035-b610-4e68-a385-c133a1975fa8
[pool-2-thread-6] lock false key 8e19a035-b610-4e68-a385-c133a1975fa8
[pool-2-thread-4] lock false key 8e19a035-b610-4e68-a385-c133a1975fa8
[pool-2-thread-2] lock false key 8e19a035-b610-4e68-a385-c133a1975fa8
[pool-2-thread-5] lock false key 8e19a035-b610-4e68-a385-c133a1975fa8
[pool-2-thread-10] lock false key 8e19a035-b610-4e68-a385-c133a1975fa8
[main] keys [8e19a035-b610-4e68-a385-c133a1975fa8]
[main] unlock true 8e19a035-b610-4e68-a385-c133a1975fa8


Zookeeper实现分布式锁

注意zookeeper版本,版本不通命令可能出现差异。

  • 数据模型
    • zk 的数据模型和我们常见的目录树很像,从/开始,每一个层级就是一个节点每个节点,包含数据 + 子节点。
    • EPHEMERAL 节点,不能有子节点(可以理解为这个目录下不能再挂目录))
    • zk 中常说的监听器,就是基于节点的,一般来讲监听节点的创建、删除、数据变更
    • 节点,节点路径被指定就不能修改了,临时节点客户端会话结束会自动删除。
      • 持久节点 persistent node
      • 持久顺序节点 persistent sequental
      • 临时节点 ephemeral node
      • 临时顺序节点 ephemeral sequental
    • Watch 机制,Watcher(事件监听器)。ZooKeeper 允许用户在指定节点上注册一些 Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知给用户。
  • zookeeper在实现分布锁的实现上采用删除节点或者临时节点的方式,但是需要频繁添加删除节点,所以性能不入缓存实现的分布式锁。
  • 演示使用docker环境快速安装
1
docker run -p 2181:2181 --name latest-zookeeper  -d zookeeper
  • docker ps 检查

  • 进入容器
1
docker exec -it 3f6a8c213504 /bin/bash # 3f6a8c213504 是容器id
  • 进入zookeeer根目录bin文件夹中执行zkCli.sh

  • 如果看到上图信息,则进入成功。
  • 使用zookeeper cli测试下基础命令
    • 创建znodes
    • 获取数据
    • 监视 znode 变化
    • 设置数据
    • 创建 znode 的子 znode
    • 列出一个 znode 的子 znode
    • 检查状态
    • 删除一个 znode
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# 创建
[zk: localhost:2181(CONNECTED) 0] create /path /00000001
Created /path
# 读取
[zk: localhost:2181(CONNECTED) 1] get /path
/00000001
# -s -w
[zk: localhost:2181(CONNECTED) 13] get /path -s -w
/00000001
cZxid = 0x2
ctime = Tue Jan 11 18:05:54 UTC 2022
mZxid = 0x2
mtime = Tue Jan 11 18:05:54 UTC 2022
pZxid = 0x2
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 9
numChildren = 0
# 写入数据
[zk: localhost:2181(CONNECTED) 15] set /path 00000002
[zk: localhost:2181(CONNECTED) 16] get /path -s -w
00000002
cZxid = 0x2
ctime = Tue Jan 11 18:05:54 UTC 2022
mZxid = 0x4
mtime = Tue Jan 11 18:08:23 UTC 2022
pZxid = 0x2
cversion = 0
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 8
numChildren = 0
[zk: localhost:2181(CONNECTED) 17] ls /path
[]
# 删除节点
[zk: localhost:2181(CONNECTED) 25] delete /path
# 删除后获取节点报错
[zk: localhost:2181(CONNECTED) 26] get /path
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /path
#创建子节点
[zk: localhost:2181(CONNECTED) 42] create /king 1
Created /king
# 增加子节点sub
[zk: localhost:2181(CONNECTED) 43] create /king/sub 2
Created /king/sub
# 增加子节点sub2
[zk: localhost:2181(CONNECTED) 46] create /king/sub1 3
Created /king/sub1
# 查看子节点
[zk: localhost:2181(CONNECTED) 47] ls /king
[sub, sub1]
[zk: localhost:2181(CONNECTED) 48]
#创建一个临时节点
[zk: localhost:2181(CONNECTED) 50] create -e /temp 3
Created /temp
# 当断开客户端连接的时候临时节点将被删除,可以通过退出 ZooKeeper CLI 尝试,然后重新打开命令行。
[zk: localhost:2181(CLOSED) 54] quit
# 从新进入
./zkCl
#此时发现节点已经被删除
[zk: localhost:2181(CONNECTED) 0] get /temp
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /temp

演示代码

  • 可以配置zookeeper集群,这里使用单节点测试。

  • 配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60

package cn.z201.zookeeper;

/**
* @author z201.coding@gmail.com
**/
@Configuration
@Slf4j
public class ZookeeperConfig {

private static final String LOCK_ROOT = "/LOCKS"; //锁的根路径
private static final String LOCK_NODE_NAME = "/L_"; //锁的名称,使用临时顺序节点

@Value("${zookeeper.address}")
private String connectString;

@Value("${zookeeper.timeout}")
private int timeout;

@Bean(name = "zkClient")
public ZooKeeper zkClient() {
ZooKeeper zooKeeper = null;
try {
final CountDownLatch countDownLatch = new CountDownLatch(1);
//连接成功后,会回调watcher监听,此连接操作是异步的,执行完new语句后,直接调用后续代码
// 可指定多台服务地址 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
zooKeeper = new ZooKeeper(connectString, timeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
//如果收到了服务端的响应事件,连接成功
countDownLatch.countDown();
}
}
});
countDownLatch.await();
init(zooKeeper);
log.info("init zookeeper success {}", zooKeeper.getState());
} catch (Exception e) {
throw new IllegalArgumentException("init zookeeper error " + e.getMessage());
}
return zooKeeper;
}

public void init(ZooKeeper zooKeeper) throws InterruptedException, KeeperException {
Stat existsNode = zooKeeper.exists(LOCK_ROOT, false);
if (existsNode == null) {
zooKeeper.create(LOCK_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
String key = LOCK_ROOT.concat(LOCK_NODE_NAME);
existsNode = zooKeeper.exists(key, false);
if (existsNode == null) {
zooKeeper.create(key, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}


}


  • 工具类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package cn.z201.zookeeper;

/**
* @author z201.coding@gmail.com
**/
@Slf4j
public class DistributedLockZookeeperTool {

private final Thread currentThread = Thread.currentThread();
private static final String LOCK_ROOT = "/LOCKS"; //锁的根路径
private static final String LOCK_NODE_NAME = "/_"; //使用临时顺序节点
private ZooKeeper zkClient;
private String zNode;
private String watcherKey;

public DistributedLockZookeeperTool(ZooKeeper zooKeeper) {
this.zkClient = zooKeeper;
}

//判断某个元素是否被删除,如果删除了就将当前线程唤醒
Watcher watcher = event -> {
//如果获取到删除节点的事件,那么就唤醒当前线程
if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
if (Objects.equals(watcherKey, event.getPath())) {
LockSupport.unpark(currentThread); //将当前线程唤醒
}
}
};

/**
* 创建持久化顺序节点
*/
public boolean tryLock() throws InterruptedException, KeeperException {
String key = LOCK_ROOT.concat(LOCK_NODE_NAME);
//创建临时有序节点,节点为/LOCKS/_xxxxxxxx
this.zNode = zkClient.create(key, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
lock();
log.info("lock {} ", zNode);
return true;
}

private void lock() throws InterruptedException, KeeperException {
//获取当前锁节点排序之后的下标,截取掉/LOCKS/之后的内容
List<String> childrenNodes = zkClient.getChildren(LOCK_ROOT, false);
Collections.sort(childrenNodes);
//获取当前锁节点排序之后的下标,截取掉/LOCKS/之后的内容
final int index = childrenNodes.indexOf(zNode.substring(LOCK_ROOT.length() + 1));
//如果获取到的index为0,也就是第一个元素
if (index == 0) {
return;
}
//如果不是第一个元素,获取上一个节点的路径
final String firstNode = childrenNodes.get(index - 1);
if (Objects.equals(LOCK_NODE_NAME, "/" + firstNode)) {
return;
}

Stat stat = zkClient.exists(LOCK_ROOT + "/" + firstNode, watcher);
if (null != stat) {
watcherKey = LOCK_ROOT + "/" + firstNode;
LockSupport.park();
}
Thread.sleep(100);
lock();
}

public List<String> list() throws InterruptedException, KeeperException {
List<String> childrenNodes = zkClient.getChildren(LOCK_ROOT, false);
return childrenNodes;
}

public void close() throws Exception {
log.info("unlock {}", zNode);
zkClient.delete(zNode, -1);
}

}



  • 测试类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

@Slf4j
@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = AppApplication.class, webEnvironment = SpringBootTest.WebEnvironment.NONE)
@AutoConfigureMockMvc
// 指定单元测试方法顺序
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class AppApplicationTest {

@Autowired
DistributedLockZookeeperTool distributedLockZookeeperTool;


@Test
@Disabled
public void testLock() {
int count = 10;
String key = UUID.randomUUID().toString();
CountDownLatch countDownLatch = new CountDownLatch(count);
ExecutorService executorService = Executors.newFixedThreadPool(count);
try {
for (int i = 0; i < count; i++) {
executorService.execute(() -> {
log.info(" lock {} key {} ", distributedLockZookeeperTool.createNode("/"+key, key),key);
countDownLatch.countDown();
});
}
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.shutdown();
}

}
  • Console
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
[pool-2-thread-6]   lock true key  de181c1c-0963-4eec-a4eb-3ad19a7c1540 
[pool-2-thread-3] 创建持久化节点 异常 /de181c1c-0963-4eec-a4eb-3ad19a7c1540 de181c1c-0963-4eec-a4eb-3ad19a7c1540 KeeperErrorCode = NodeExists for /de181c1c-0963-4eec-a4eb-3ad19a7c1540
[pool-2-thread-1] 创建持久化节点 异常 /de181c1c-0963-4eec-a4eb-3ad19a7c1540 de181c1c-0963-4eec-a4eb-3ad19a7c1540 KeeperErrorCode = NodeExists for /de181c1c-0963-4eec-a4eb-3ad19a7c1540
[pool-2-thread-5] 创建持久化节点 异常 /de181c1c-0963-4eec-a4eb-3ad19a7c1540 de181c1c-0963-4eec-a4eb-3ad19a7c1540 KeeperErrorCode = NodeExists for /de181c1c-0963-4eec-a4eb-3ad19a7c1540
[pool-2-thread-8] 创建持久化节点 异常 /de181c1c-0963-4eec-a4eb-3ad19a7c1540 de181c1c-0963-4eec-a4eb-3ad19a7c1540 KeeperErrorCode = NodeExists for /de181c1c-0963-4eec-a4eb-3ad19a7c1540
[pool-2-thread-7] 创建持久化节点 异常 /de181c1c-0963-4eec-a4eb-3ad19a7c1540 de181c1c-0963-4eec-a4eb-3ad19a7c1540 KeeperErrorCode = NodeExists for /de181c1c-0963-4eec-a4eb-3ad19a7c1540
[pool-2-thread-9] 创建持久化节点 异常 /de181c1c-0963-4eec-a4eb-3ad19a7c1540 de181c1c-0963-4eec-a4eb-3ad19a7c1540 KeeperErrorCode = NodeExists for /de181c1c-0963-4eec-a4eb-3ad19a7c1540
[pool-2-thread-3] lock false key de181c1c-0963-4eec-a4eb-3ad19a7c1540
[pool-2-thread-4] 创建持久化节点 异常 /de181c1c-0963-4eec-a4eb-3ad19a7c1540 de181c1c-0963-4eec-a4eb-3ad19a7c1540 KeeperErrorCode = NodeExists for /de181c1c-0963-4eec-a4eb-3ad19a7c1540
[pool-2-thread-7] lock false key de181c1c-0963-4eec-a4eb-3ad19a7c1540
[pool-2-thread-2] 创建持久化节点 异常 /de181c1c-0963-4eec-a4eb-3ad19a7c1540 de181c1c-0963-4eec-a4eb-3ad19a7c1540 KeeperErrorCode = NodeExists for /de181c1c-0963-4eec-a4eb-3ad19a7c1540
[pool-2-thread-10] 创建持久化节点 异常 /de181c1c-0963-4eec-a4eb-3ad19a7c1540 de181c1c-0963-4eec-a4eb-3ad19a7c1540 KeeperErrorCode = NodeExists for /de181c1c-0963-4eec-a4eb-3ad19a7c1540
[pool-2-thread-2] lock false key de181c1c-0963-4eec-a4eb-3ad19a7c1540
[pool-2-thread-1] lock false key de181c1c-0963-4eec-a4eb-3ad19a7c1540
[pool-2-thread-5] lock false key de181c1c-0963-4eec-a4eb-3ad19a7c1540
[pool-2-thread-8] lock false key de181c1c-0963-4eec-a4eb-3ad19a7c1540
[pool-2-thread-9] lock false key de181c1c-0963-4eec-a4eb-3ad19a7c1540
[pool-2-thread-4] lock false key de181c1c-0963-4eec-a4eb-3ad19a7c1540
[pool-2-thread-10] lock false key de181c1c-0963-4eec-a4eb-3ad19a7c1540

Curator实现

在实际开发过程中建议使用Curator库。Apache Curator是Netflix公司开源的一个Zookeeper客户端,目前已经是Apache的顶级项目,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量,通过封装的一套高级API,里面提供了更多丰富的操作,例如session超时重连、主从选举、分布式计数器、分布式锁等等适用于各种复杂场景的zookeeper操作。

Curator提供了四种锁
  • 可重入互斥锁 InterProcessMutex
  • 不可重入互斥锁 InterProcessSemaphoreMutex
  • 读写锁 InterProcessReadWriteLock
  • 集合锁 InterProcessMultiLock

演示代码

  • 配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* @author z201.coding@gmail.com
**/
@Configuration
public class CuratorConfiguration {

@Value("${zookeeper.curator.retryCount}")
private int retryCount;

@Value("${zookeeper.curator.elapsedTimeMs}")
private int elapsedTimeMs;

@Value("${zookeeper.curator.connectString}")
private String connectString;

@Value("${zookeeper.curator.sessionTimeoutMs}")
private int sessionTimeoutMs;

@Value("${zookeeper.curator.connectionTimeoutMs}")
private int connectionTimeoutMs;

@Bean(name = "curatorFramework", initMethod = "start")
public CuratorFramework curatorFramework() {
return CuratorFrameworkFactory.newClient(
connectString,
sessionTimeoutMs,
connectionTimeoutMs,
new RetryNTimes(retryCount, elapsedTimeMs)
);
}
}

  • 单元测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
@Slf4j
@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = AppApplication.class, webEnvironment = SpringBootTest.WebEnvironment.NONE)
@AutoConfigureMockMvc
// 指定单元测试方法顺序
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class AppApplicationTest {

@Autowired
DistributedLockZookeeperCuratorTool distributedLockZookeeperCuratorTool;

@Resource(name = "curatorFramework")
private CuratorFramework curatorFramework;

@Test
@Disabled
public void testInterProcessMutexLock() {
int count = 10;
String key = UUID.randomUUID().toString();
CountDownLatch countDownLatch = new CountDownLatch(count);
ExecutorService executorService = Executors.newFixedThreadPool(count);
try {
for (int i = 0; i < count; i++) {
executorService.execute(() -> {
String path = distributedLockZookeeperCuratorTool.createPathKey(key);
InterProcessMutex lock = new InterProcessMutex(curatorFramework, path);
try {
lock.acquire();
log.info("{} 获取的到锁", Thread.currentThread().getName());
Thread.sleep(2000);
lock.release();
log.info("{} 释放锁", Thread.currentThread().getName());
countDownLatch.countDown();
} catch (Exception e) {

} finally {
try {
lock.release();
} catch (Exception e) {
// log.error("release error {}",e.getMessage());
}
}
});
}
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
executorService.shutdown();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@Test
@Disabled
public void testInterProcessSemaphoreMutexLock() {
int count = 10;
String key = UUID.randomUUID().toString();
CountDownLatch countDownLatch = new CountDownLatch(count);
ExecutorService executorService = Executors.newFixedThreadPool(count);
try {
for (int i = 0; i < count; i++) {
executorService.execute(() -> {
String path = distributedLockZookeeperCuratorTool.createPathKey(key);
InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(curatorFramework, path);
try {
lock.acquire();
log.info("{} 获取的到锁", Thread.currentThread().getName());
Thread.sleep(2000);
lock.release();
log.info("{} 释放锁", Thread.currentThread().getName());
countDownLatch.countDown();
} catch (Exception e) {

} finally {
try {
lock.release();
} catch (Exception e) {
// log.error("release error {}",e.getMessage());
}
}
});
}
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
executorService.shutdown();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

InterProcessMutex

  • 可重入互斥锁,基础实现思路如下
    • 使用zk的临时节点和有序节点,每个线程获取锁就是在zk创建一个临时有序的节点,比如在/lock/目录下
    • 创建节点成功后,获取/lock目录下的所有临时节点,再判断当前线程创建的节点是否是所有的节点的序号最小的节点
    • 如果当前线程创建的节点是所有节点序号最小的节点,则认为获取锁成功
    • 如果当前线程创建的节点不是所有节点序号最小的节点,则对节点序号的前一个节点添加一个事件监听
      • 比如当前线程获取到的节点序号为/lock/001,然后所有的节点列表为[/lock/001,/lock/002,/lock/003],则对/lock/002这个节点添加一个事件监听器。
    • 如果锁释放了,会唤醒下一个序号的节点,然后重新执行第3步,判断是否自己的节点序号是最小
      • 比如/lock/001释放了,/lock/002监听到事件,此时节点集合为[/lock/002,/lock/003],则/lock/002为最小序号节点,获取到锁。

  • Console
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[pool-6-thread-6]  pool-6-thread-6 获取的到锁
[pool-6-thread-6] pool-6-thread-6 释放锁
[pool-6-thread-9] pool-6-thread-9 获取的到锁
[pool-6-thread-9] pool-6-thread-9 释放锁
[pool-6-thread-10] pool-6-thread-10 获取的到锁
[pool-6-thread-10] pool-6-thread-10 释放锁
[pool-6-thread-4] pool-6-thread-4 获取的到锁
[pool-6-thread-4] pool-6-thread-4 释放锁
[pool-6-thread-3] pool-6-thread-3 获取的到锁
[pool-6-thread-3] pool-6-thread-3 释放锁
[pool-6-thread-1] pool-6-thread-1 获取的到锁
[pool-6-thread-1] pool-6-thread-1 释放锁
[pool-6-thread-8] pool-6-thread-8 获取的到锁
[pool-6-thread-8] pool-6-thread-8 释放锁
[pool-6-thread-5] pool-6-thread-5 获取的到锁
[pool-6-thread-5] pool-6-thread-5 释放锁
[pool-6-thread-2] pool-6-thread-2 获取的到锁
[pool-6-thread-2] pool-6-thread-2 释放锁
[pool-6-thread-7] pool-6-thread-7 获取的到锁
[pool-6-thread-7] pool-6-thread-7 释放锁

InterProcessSemaphoreMutex

  • Console
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[pool-6-thread-6]  pool-6-thread-6 获取的到锁
[pool-6-thread-6] pool-6-thread-6 释放锁
[pool-6-thread-10] pool-6-thread-10 获取的到锁
[pool-6-thread-10] pool-6-thread-10 释放锁
[pool-6-thread-5] pool-6-thread-5 获取的到锁
[pool-6-thread-5] pool-6-thread-5 释放锁
[pool-6-thread-7] pool-6-thread-7 获取的到锁
[pool-6-thread-7] pool-6-thread-7 释放锁
[pool-6-thread-3] pool-6-thread-3 获取的到锁
[pool-6-thread-3] pool-6-thread-3 释放锁
[pool-6-thread-8] pool-6-thread-8 获取的到锁
[pool-6-thread-8] pool-6-thread-8 释放锁
[pool-6-thread-9] pool-6-thread-9 获取的到锁
[pool-6-thread-9] pool-6-thread-9 释放锁
[pool-6-thread-2] pool-6-thread-2 获取的到锁
[pool-6-thread-2] pool-6-thread-2 释放锁
[pool-6-thread-4] pool-6-thread-4 获取的到锁
[pool-6-thread-4] pool-6-thread-4 释放锁
[pool-6-thread-1] pool-6-thread-1 获取的到锁
[pool-6-thread-1] pool-6-thread-1 释放锁

internalLockLoop方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
private Boolean internalLockLoop(long startMillis, long millisToWait, String ourPath) throws Exception
{
Boolean haveTheLock = false;
Boolean doDelete = false;
try{
if ( revocable.get() != null ) {
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ){
// 获取当前所有节点排序后的集合
List<String> children = getSortedChildren();
// 获取当前节点的名称
String sequenceNodeName = ourPath.substring(basePath.length() + 1);
// +1 to include the slash
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() ) {
// 获取到锁
haveTheLock = true;
} else{
// 没获取到锁,对当前节点的上一个节点注册一个监听器
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this) {
try
{
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
doDelete = true;
// timed out - delete our node
break;
}
wait(millisToWait);
} else {
wait();
}
}
catch ( KeeperException.NoNodeException e ){
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
deleteOurPath(ourPath);
}
}
return haveTheLock;
}