|
@@ -0,0 +1,260 @@
|
|
|
+# 分布式锁之Zookeeper
|
|
|
+
|
|
|
+## 参考文档
|
|
|
+
|
|
|
+https://mp.weixin.qq.com/s?__biz=MzAwNDA2OTM1Ng==&mid=2453141835&idx=1&sn=ff0867c9f5ecec9ea8187a21ef7edb2c&chksm=8cf2dbc8bb8552def9bb27fc6302e735eccdd68be5344d6e1d51d244b8e0753d56317cfe8bf2&token=1478279203&lang=zh_CN#rd
|
|
|
+
|
|
|
+## 源代码
|
|
|
+
|
|
|
+https://github1s.com/AobingJava/Thanos/blob/master/laogong/src/main/java/zookeeper/Zk.java
|
|
|
+
|
|
|
+```java
|
|
|
+package zookeeper;
|
|
|
+
|
|
|
+import org.I0Itec.zkclient.IZkDataListener;
|
|
|
+import org.I0Itec.zkclient.ZkClient;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.List;
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.locks.Condition;
|
|
|
+import java.util.concurrent.locks.Lock;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @Description: zkTest
|
|
|
+ * @Author: 敖丙
|
|
|
+ * @date: 2020-04-06
|
|
|
+ **/
|
|
|
+public class Zk implements Lock {
|
|
|
+ private static CountDownLatch cdl = new CountDownLatch(1);
|
|
|
+
|
|
|
+ private static final String IP_PORT = "127.0.0.1:2181";
|
|
|
+ private static final String Z_NODE = "/LOCK";
|
|
|
+
|
|
|
+ private volatile String beforePath;
|
|
|
+ private volatile String path;
|
|
|
+
|
|
|
+ private ZkClient zkClient = new ZkClient(IP_PORT);
|
|
|
+
|
|
|
+ public Zk() {
|
|
|
+ if (!zkClient.exists(Z_NODE)) {
|
|
|
+ zkClient.createPersistent(Z_NODE);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void lock() {
|
|
|
+ if (tryLock()) {
|
|
|
+ System.out.println("获得锁");
|
|
|
+ } else {
|
|
|
+ // 尝试加锁
|
|
|
+ // 进入等待 监听
|
|
|
+ waitForLock();
|
|
|
+ // 再次尝试
|
|
|
+ lock();
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ public synchronized boolean tryLock() {
|
|
|
+ // 第一次就进来创建自己的临时节点
|
|
|
+ if (StringUtils.isBlank(path)) {
|
|
|
+ path = zkClient.createEphemeralSequential(Z_NODE + "/", "lock");
|
|
|
+ }
|
|
|
+
|
|
|
+ // 对节点排序
|
|
|
+ List<String> children = zkClient.getChildren(Z_NODE);
|
|
|
+ Collections.sort(children);
|
|
|
+
|
|
|
+ // 当前的是最小节点就返回加锁成功
|
|
|
+ if (path.equals(Z_NODE + "/" + children.get(0))) {
|
|
|
+ System.out.println(" i am true");
|
|
|
+ return true;
|
|
|
+ } else {
|
|
|
+ // 不是最小节点 就找到自己的前一个 依次类推 释放也是一样
|
|
|
+ int i = Collections.binarySearch(children, path.substring(Z_NODE.length() + 1));
|
|
|
+ beforePath = Z_NODE + "/" + children.get(i - 1);
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void unlock() {
|
|
|
+ zkClient.delete(path);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void waitForLock() {
|
|
|
+
|
|
|
+ IZkDataListener listener = new IZkDataListener() {
|
|
|
+ public void handleDataChange(String s, Object o) throws Exception {
|
|
|
+ }
|
|
|
+
|
|
|
+ public void handleDataDeleted(String s) throws Exception {
|
|
|
+ System.out.println(Thread.currentThread().getName() + ":监听到节点删除事件!---------------------------");
|
|
|
+ cdl.countDown();
|
|
|
+ }
|
|
|
+ };
|
|
|
+ // 监听
|
|
|
+ this.zkClient.subscribeDataChanges(beforePath, listener);
|
|
|
+ if (zkClient.exists(beforePath)) {
|
|
|
+ try {
|
|
|
+ System.out.println("加锁失败 等待");
|
|
|
+ cdl.await();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 释放监听
|
|
|
+ zkClient.unsubscribeDataChanges(beforePath, listener);
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void lockInterruptibly() throws InterruptedException {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ public Condition newCondition() {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+```
|
|
|
+
|
|
|
+## 优化后的代码
|
|
|
+
|
|
|
+```java
|
|
|
+package zookeeper;
|
|
|
+
|
|
|
+import org.I0Itec.zkclient.IZkDataListener;
|
|
|
+import org.I0Itec.zkclient.ZkClient;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.List;
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.locks.Condition;
|
|
|
+import java.util.concurrent.locks.Lock;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @Description: zkTest
|
|
|
+ * @Author: 敖丙
|
|
|
+ * @date: 2020-04-06
|
|
|
+ **/
|
|
|
+public class Zk implements Lock {
|
|
|
+ private static CountDownLatch cdl = new CountDownLatch(1);
|
|
|
+
|
|
|
+ private static final String IP_PORT = "127.0.0.1:2181";
|
|
|
+ private static final String Z_NODE = "/LOCK";
|
|
|
+
|
|
|
+ private volatile String beforePath;
|
|
|
+ private volatile String path;
|
|
|
+
|
|
|
+ private ZkClient zkClient = new ZkClient(IP_PORT);
|
|
|
+
|
|
|
+ public Zk() {
|
|
|
+ if (!zkClient.exists(Z_NODE)) {
|
|
|
+ zkClient.createPersistent(Z_NODE);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void lock() {
|
|
|
+ if (tryLock()) {
|
|
|
+ System.out.println("获得锁");
|
|
|
+ unlock();
|
|
|
+ } else {
|
|
|
+ // 尝试加锁
|
|
|
+ // 进入等待 监听
|
|
|
+ waitForLock();
|
|
|
+ // 再次尝试
|
|
|
+ lock();
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ public synchronized boolean tryLock() {
|
|
|
+ // 第一次就进来创建自己的临时节点
|
|
|
+ if (StringUtils.isBlank(path)) {
|
|
|
+ path = zkClient.createEphemeralSequential(Z_NODE + "/", "lock");
|
|
|
+ }
|
|
|
+
|
|
|
+ // 对节点排序
|
|
|
+ List<String> children = zkClient.getChildren(Z_NODE);
|
|
|
+ Collections.sort(children);
|
|
|
+
|
|
|
+ // 当前的是最小节点就返回加锁成功
|
|
|
+ if (path.equals(Z_NODE + "/" + children.get(0))) {
|
|
|
+ System.out.println(" i am true");
|
|
|
+ return true;
|
|
|
+ } else {
|
|
|
+ // 不是最小节点 就找到自己的前一个 依次类推 释放也是一样
|
|
|
+ int i = Collections.binarySearch(children, path.substring(Z_NODE.length() + 1));
|
|
|
+ beforePath = Z_NODE + "/" + children.get(i - 1);
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void unlock() {
|
|
|
+ zkClient.delete(path);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void waitForLock() {
|
|
|
+
|
|
|
+ IZkDataListener listener = new IZkDataListener() {
|
|
|
+ public void handleDataChange(String s, Object o) throws Exception {
|
|
|
+ }
|
|
|
+
|
|
|
+ public void handleDataDeleted(String s) throws Exception {
|
|
|
+ System.out.println(Thread.currentThread().getName() + ":监听到节点删除事件!---------------------------");
|
|
|
+ cdl.countDown();
|
|
|
+ }
|
|
|
+ };
|
|
|
+ // 监听
|
|
|
+ this.zkClient.subscribeDataChanges(beforePath, listener);
|
|
|
+ if (zkClient.exists(beforePath)) {
|
|
|
+ try {
|
|
|
+ System.out.println("加锁失败 等待");
|
|
|
+ cdl.await();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 释放监听
|
|
|
+ zkClient.unsubscribeDataChanges(beforePath, listener);
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void lockInterruptibly() throws InterruptedException {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ public Condition newCondition() {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+```
|
|
|
+
|
|
|
+## 分析
|
|
|
+
|
|
|
+一些要注意的点
|
|
|
+
|
|
|
+1. 比如说10个进程,同时执行tryLock(),那么,第一个进来的进程,会创建第一个临时节点,后面9个进程会按次序创建节点
|
|
|
+2. 每个进程进来,都会将所有的节点sort一下,当且仅当这个临时节点,是最小的一个节点的时候,就加锁,返回true;其余节点不匹配,就返回false
|
|
|
+3. 返回true,执行锁的内容,然后unlock,删除path
|
|
|
+ ~~这里应该有个节点的重定向,要重新选中最小的一个节点,进行加锁~~
|
|
|
+4. 返回false,进入waitForLock,~~先监听path是否删除,删除后,执行cdl~~
|
|
|
+ 如果存在beforpath,就await,然后释放节点,即断开链接
|
|
|
+5. A进程正在操控锁,这个时候A进行挂起了,就会死锁
|
|
|
+ 但由于是临时节点,A进程挂起后,A会自动删除,这个时候就不会死锁了
|
|
|
+6. 整个逻辑就是,不停地创建临时节点,然后消逝,总有一个最小的节点存在。
|
|
|
+ 当最小节点存在时,就加锁
|
|
|
+ 当最小节点存在时,其余节点自动创建——>删除
|
|
|
+ 当最小节点挂起时,最后节点自动删除,会新创建节点
|
|
|
+
|