4Zookeeper方式 · SpringCloud微服务实战 · 看云
导航
本节代码地址
前面章节我们UI经讲过Zookeeper的安装方式,本节我们将直接介绍基于Zookeeper实现分布式锁,我们使用Curator框架来举例,该框架已经封装好了一个分布式互斥锁。Curator是Netflix公司开源的一个ZooKeeper客户端封装
一个分布式锁对应ZooKeeper的一个文件夹,每个需要获取这个分布式锁的客户端线程在这个文件夹下创建一个临时顺序节点,此时有两种情况:
- 创建的临时顺序节点是文件夹下的第一个节点,则认为是获取分布式锁成功。
- 创建的临时顺序节点不是文件夹下的第一个节点,则认为当前锁已经被另一个客户端线程获取,此时需要进入阻塞状态,等待节点顺序中的前一个节点释放锁的时候唤醒当前线程。
阻塞-唤醒逻辑:把文件夹下的节点顺序排列,找到当前节点的前一个节点,在前一个节点添加Watch,当前一个节点被删除时会触发Watch事件,进而唤醒当前阻塞线程。
如果前一个节点对应的客户端崩溃了,则节点对应的Watch事件也会触发,也会唤醒后一个节点对应的客户端线程,此时仍需要判断当前节点是第一个节点之后才能获取锁,否则继续进入阻塞并Watch前一个节点。
1. 新增依赖
我们将Zookeeper 分布式锁的代码也写在fw-springboot-lock中
在maven 中填加curator的客户端依赖包
<!-- zookeeper 分布式锁、注意zookeeper版本 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
</dependency>
2. 核心代码
这里面我们设置了Zookeeper锁的加锁、解锁以及Zookeeper 的连接配置,其中Curator提供的InterProcessMutex是分布式锁的实现。acquire方法用户获取锁,release方法用于释放锁。
@NoArgsConstructor
public class ZkLockHelper {
private static String address = "localhost:2181";
public static CuratorFramework client;
static {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
client = CuratorFrameworkFactory.newClient(address, retryPolicy);
client.start();
}
private static class SingletonHolder {
private static InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");
}
public static InterProcessMutex getMutex() {
return SingletonHolder.mutex;
}
public static boolean lock(long time, TimeUnit unit) {
try {
return getMutex().acquire(time, unit);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public static void unlock() {
try {
getMutex().release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
3. 编写单元测试
这里我们开启了5个线程,每个线程获取锁的最大等待时间1秒,不管有没有获取线程都会等待4秒
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Slf4j
public class ZkLockHelperTest {
@Test
public void testDistributedLock() {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
fixedThreadPool.submit(new Runnable() {
@Override
public void run() {
boolean flag = false;
try {
flag = ZkLockHelper.lock(1, TimeUnit.SECONDS);
if (flag) {
log.info("获取锁成功,{}" , Thread.currentThread().getName());
} else {
log.info("获取锁失败,{}" , Thread.currentThread().getName());
}
Thread.sleep(4000);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (flag) {
try {
ZkLockHelper.unlock();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
});
}
try {
fixedThreadPool.awaitTermination(10,TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行单元测试可以看到控制台的日志如下
2020-03-21 12:58:32.487 INFO 17792 --- [pool-2-thread-5] c.yisu.lock.zookeeper.ZkLockHelperTest : 获取锁成功,pool-2-thread-5
2020-03-21 12:58:33.426 INFO 17792 --- [pool-2-thread-3] c.yisu.lock.zookeeper.ZkLockHelperTest : 获取锁失败,pool-2-thread-3
2020-03-21 12:58:33.430 INFO 17792 --- [pool-2-thread-1] c.yisu.lock.zookeeper.ZkLockHelperTest : 获取锁失败,pool-2-thread-1
2020-03-21 12:58:33.430 INFO 17792 --- [pool-2-thread-2] c.yisu.lock.zookeeper.ZkLockHelperTest : 获取锁失败,pool-2-thread-2
2020-03-21 12:58:33.430 INFO 17792 --- [pool-2-thread-4] c.yisu.lock.zookeeper.ZkLockHelperTest : 获取锁失败,pool-2-thread-4
