# 分布式锁之Zookeeper ## 参考文档 https://mp.weixin.qq.com/s?__biz=MzAwNDA2OTM1Ng==&mid=2453141835&idx=1&sn=ff0867c9f5ecec9ea8187a21ef7edb2c&chksm=8cf2dbc8bb8552def9bb27fc6302e735eccdd68be5344d6e1d51d244b8e0753d56317cfe8bf2&token=1478279203&lang=zh_CN#rd ## 源代码 https://github1s.com/AobingJava/Thanos/blob/master/laogong/src/main/java/zookeeper/Zk.java ```java package zookeeper; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.apache.commons.lang3.StringUtils; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * @Description: zkTest * @Author: 敖丙 * @date: 2020-04-06 **/ public class Zk implements Lock { private static CountDownLatch cdl = new CountDownLatch(1); private static final String IP_PORT = "127.0.0.1:2181"; private static final String Z_NODE = "/LOCK"; private volatile String beforePath; private volatile String path; private ZkClient zkClient = new ZkClient(IP_PORT); public Zk() { if (!zkClient.exists(Z_NODE)) { zkClient.createPersistent(Z_NODE); } } public void lock() { if (tryLock()) { System.out.println("获得锁"); } else { // 尝试加锁 // 进入等待 监听 waitForLock(); // 再次尝试 lock(); } } public synchronized boolean tryLock() { // 第一次就进来创建自己的临时节点 if (StringUtils.isBlank(path)) { path = zkClient.createEphemeralSequential(Z_NODE + "/", "lock"); } // 对节点排序 List children = zkClient.getChildren(Z_NODE); Collections.sort(children); // 当前的是最小节点就返回加锁成功 if (path.equals(Z_NODE + "/" + children.get(0))) { System.out.println(" i am true"); return true; } else { // 不是最小节点 就找到自己的前一个 依次类推 释放也是一样 int i = Collections.binarySearch(children, path.substring(Z_NODE.length() + 1)); beforePath = Z_NODE + "/" + children.get(i - 1); } return false; } public void unlock() { zkClient.delete(path); } public void waitForLock() { IZkDataListener listener = new IZkDataListener() { public void handleDataChange(String s, Object o) throws Exception { } public void handleDataDeleted(String s) throws Exception { System.out.println(Thread.currentThread().getName() + ":监听到节点删除事件!---------------------------"); cdl.countDown(); } }; // 监听 this.zkClient.subscribeDataChanges(beforePath, listener); if (zkClient.exists(beforePath)) { try { System.out.println("加锁失败 等待"); cdl.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // 释放监听 zkClient.unsubscribeDataChanges(beforePath, listener); } public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } public void lockInterruptibly() throws InterruptedException { } public Condition newCondition() { return null; } } ``` ## 优化后的代码 ```java package zookeeper; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.apache.commons.lang3.StringUtils; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * @Description: zkTest * @Author: 敖丙 * @date: 2020-04-06 **/ public class Zk implements Lock { private static CountDownLatch cdl = new CountDownLatch(1); private static final String IP_PORT = "127.0.0.1:2181"; private static final String Z_NODE = "/LOCK"; private volatile String beforePath; private volatile String path; private ZkClient zkClient = new ZkClient(IP_PORT); public Zk() { if (!zkClient.exists(Z_NODE)) { zkClient.createPersistent(Z_NODE); } } public void lock() { if (tryLock()) { System.out.println("获得锁"); unlock(); } else { // 尝试加锁 // 进入等待 监听 waitForLock(); // 再次尝试 lock(); } } public synchronized boolean tryLock() { // 第一次就进来创建自己的临时节点 if (StringUtils.isBlank(path)) { path = zkClient.createEphemeralSequential(Z_NODE + "/", "lock"); } // 对节点排序 List children = zkClient.getChildren(Z_NODE); Collections.sort(children); // 当前的是最小节点就返回加锁成功 if (path.equals(Z_NODE + "/" + children.get(0))) { System.out.println(" i am true"); return true; } else { // 不是最小节点 就找到自己的前一个 依次类推 释放也是一样 int i = Collections.binarySearch(children, path.substring(Z_NODE.length() + 1)); beforePath = Z_NODE + "/" + children.get(i - 1); } return false; } public void unlock() { zkClient.delete(path); } public void waitForLock() { IZkDataListener listener = new IZkDataListener() { public void handleDataChange(String s, Object o) throws Exception { } public void handleDataDeleted(String s) throws Exception { System.out.println(Thread.currentThread().getName() + ":监听到节点删除事件!---------------------------"); cdl.countDown(); } }; // 监听 this.zkClient.subscribeDataChanges(beforePath, listener); if (zkClient.exists(beforePath)) { try { System.out.println("加锁失败 等待"); cdl.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // 释放监听 zkClient.unsubscribeDataChanges(beforePath, listener); } public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } public void lockInterruptibly() throws InterruptedException { } public Condition newCondition() { return null; } } ``` ## 分析 一些要注意的点 1. 比如说10个进程,同时执行tryLock(),那么,第一个进来的进程,会创建第一个临时节点,后面9个进程会按次序创建节点 2. 每个进程进来,都会将所有的节点sort一下,当且仅当这个临时节点,是最小的一个节点的时候,就加锁,返回true;其余节点不匹配,就返回false 3. 返回true,执行锁的内容,然后unlock,删除path ~~这里应该有个节点的重定向,要重新选中最小的一个节点,进行加锁~~ 4. 返回false,进入waitForLock,~~先监听path是否删除,删除后,执行cdl~~ 如果存在beforpath,就await,然后释放节点,即断开链接 5. A进程正在操控锁,这个时候A进行挂起了,就会死锁 但由于是临时节点,A进程挂起后,A会自动删除,这个时候就不会死锁了 6. 整个逻辑就是,不停地创建临时节点,然后消逝,总有一个最小的节点存在。 当最小节点存在时,就加锁 当最小节点存在时,其余节点自动创建——>删除 当最小节点挂起时,最后节点自动删除,会新创建节点