关于分布式锁.md 7.9 KB

分布式锁之Zookeeper

参考文档

https://mp.weixin.qq.com/s?__biz=MzAwNDA2OTM1Ng==&mid=2453141835&idx=1&sn=ff0867c9f5ecec9ea8187a21ef7edb2c&chksm=8cf2dbc8bb8552def9bb27fc6302e735eccdd68be5344d6e1d51d244b8e0753d56317cfe8bf2&token=1478279203&lang=zh_CN#rd

源代码

https://github1s.com/AobingJava/Thanos/blob/master/laogong/src/main/java/zookeeper/Zk.java

package zookeeper;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.lang3.StringUtils;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @Description: zkTest
 * @Author: 敖丙
 * @date: 2020-04-06
 **/
public class Zk implements Lock {
    private static CountDownLatch cdl = new CountDownLatch(1);

    private static final String IP_PORT = "127.0.0.1:2181";
    private static final String Z_NODE = "/LOCK";

    private volatile String beforePath;
    private volatile String path;

    private ZkClient zkClient = new ZkClient(IP_PORT);

    public Zk() {
        if (!zkClient.exists(Z_NODE)) {
            zkClient.createPersistent(Z_NODE);
        }
    }

    public void lock() {
        if (tryLock()) {
            System.out.println("获得锁");
        } else {
            // 尝试加锁
            // 进入等待 监听
            waitForLock();
            // 再次尝试
            lock();
        }

    }

    public synchronized boolean tryLock() {
        // 第一次就进来创建自己的临时节点
        if (StringUtils.isBlank(path)) {
            path = zkClient.createEphemeralSequential(Z_NODE + "/", "lock");
        }

        // 对节点排序
        List<String> children = zkClient.getChildren(Z_NODE);
        Collections.sort(children);

        // 当前的是最小节点就返回加锁成功
        if (path.equals(Z_NODE + "/" + children.get(0))) {
            System.out.println(" i am true");
            return true;
        } else {
            // 不是最小节点 就找到自己的前一个 依次类推 释放也是一样
            int i = Collections.binarySearch(children, path.substring(Z_NODE.length() + 1));
            beforePath = Z_NODE + "/" + children.get(i - 1);
        }
        return false;
    }

    public void unlock() {
        zkClient.delete(path);
    }

    public void waitForLock() {

        IZkDataListener listener = new IZkDataListener() {
            public void handleDataChange(String s, Object o) throws Exception {
            }

            public void handleDataDeleted(String s) throws Exception {
                System.out.println(Thread.currentThread().getName() + ":监听到节点删除事件!---------------------------");
                cdl.countDown();
            }
        };
        // 监听
        this.zkClient.subscribeDataChanges(beforePath, listener);
        if (zkClient.exists(beforePath)) {
            try {
                System.out.println("加锁失败 等待");
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 释放监听
        zkClient.unsubscribeDataChanges(beforePath, listener);
    }

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    public void lockInterruptibly() throws InterruptedException {

    }

    public Condition newCondition() {
        return null;
    }
}

优化后的代码

package zookeeper;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.lang3.StringUtils;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @Description: zkTest
 * @Author: 敖丙
 * @date: 2020-04-06
 **/
public class Zk implements Lock {
    private static CountDownLatch cdl = new CountDownLatch(1);

    private static final String IP_PORT = "127.0.0.1:2181";
    private static final String Z_NODE = "/LOCK";

    private volatile String beforePath;
    private volatile String path;

    private ZkClient zkClient = new ZkClient(IP_PORT);

    public Zk() {
        if (!zkClient.exists(Z_NODE)) {
            zkClient.createPersistent(Z_NODE);
        }
    }

    public void lock() {
        if (tryLock()) {
            System.out.println("获得锁");
            unlock();
        } else {
            // 尝试加锁
            // 进入等待 监听
            waitForLock();
            // 再次尝试
            lock();
        }

    }

    public synchronized boolean tryLock() {
        // 第一次就进来创建自己的临时节点
        if (StringUtils.isBlank(path)) {
            path = zkClient.createEphemeralSequential(Z_NODE + "/", "lock");
        }

        // 对节点排序
        List<String> children = zkClient.getChildren(Z_NODE);
        Collections.sort(children);

        // 当前的是最小节点就返回加锁成功
        if (path.equals(Z_NODE + "/" + children.get(0))) {
            System.out.println(" i am true");
            return true;
        } else {
            // 不是最小节点 就找到自己的前一个 依次类推 释放也是一样
            int i = Collections.binarySearch(children, path.substring(Z_NODE.length() + 1));
            beforePath = Z_NODE + "/" + children.get(i - 1);
        }
        return false;
    }

    public void unlock() {
        zkClient.delete(path);
    }

    public void waitForLock() {

        IZkDataListener listener = new IZkDataListener() {
            public void handleDataChange(String s, Object o) throws Exception {
            }

            public void handleDataDeleted(String s) throws Exception {
                System.out.println(Thread.currentThread().getName() + ":监听到节点删除事件!---------------------------");
                cdl.countDown();
            }
        };
        // 监听
        this.zkClient.subscribeDataChanges(beforePath, listener);
        if (zkClient.exists(beforePath)) {
            try {
                System.out.println("加锁失败 等待");
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 释放监听
        zkClient.unsubscribeDataChanges(beforePath, listener);
    }

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    public void lockInterruptibly() throws InterruptedException {

    }

    public Condition newCondition() {
        return null;
    }
}

分析

一些要注意的点

  1. 比如说10个进程,同时执行tryLock(),那么,第一个进来的进程,会创建第一个临时节点,后面9个进程会按次序创建节点
  2. 每个进程进来,都会将所有的节点sort一下,当且仅当这个临时节点,是最小的一个节点的时候,就加锁,返回true;其余节点不匹配,就返回false
  3. 返回true,执行锁的内容,然后unlock,删除path 这里应该有个节点的重定向,要重新选中最小的一个节点,进行加锁
  4. 返回false,进入waitForLock,先监听path是否删除,删除后,执行cdl 如果存在beforpath,就await,然后释放节点,即断开链接
  5. A进程正在操控锁,这个时候A进行挂起了,就会死锁 但由于是临时节点,A进程挂起后,A会自动删除,这个时候就不会死锁了
  6. 整个逻辑就是,不停地创建临时节点,然后消逝,总有一个最小的节点存在。 当最小节点存在时,就加锁 当最小节点存在时,其余节点自动创建——>删除 当最小节点挂起时,最后节点自动删除,会新创建节点