伍佰目录 短网址
  当前位置:海洋目录网 » 站长资讯 » 站长资讯 » 文章详细 订阅RssFeed

肝一下ZooKeeper实现分布式锁的方案,附带实例!

来源:本站原创 浏览:142次 时间:2021-08-14

����,����zookeeper客户端选型
原生zookeeper客户端,有watcher一次性、无超时重连机制等一系列问题

ZkClient,解决了原生客户端一些问题,一些存量老系统中还在使用

curator,提供了各种应用场景(封装了分布式锁,计数器等),新项目首选

分布式锁使用场景
在单体项目中jvm中的锁即可完成需要,但是微服务、分布式环境下,同一个服务可能部署在多台服务器上,多个jvm之间无法通过常用的jvm锁来完成同步操作,需要借用分布式锁来完成上锁、释放锁。例如在订单服务中,我们需要根据日期来生成订单号流水,就有可能产生相同的时间日期,从而出现重复订单号。

zookeeper分布式锁实现原理
zookeeper中规定,在同一时刻,不能有多个客户端创建同一个节点,我们可以利用这个特性实现分布式锁。zookeeper临时节点只在session生命周期存在,session一结束会自动销毁。

watcher机制,在代表锁资源的节点被删除,即可以触发watcher解除阻塞重新去获取锁,这也是zookeeper分布式锁较其他分布式锁方案的一大优势。

基于临时节点方案
第一种方案实现较为简单,逻辑就是谁创建成功该节点,谁就持有锁,创建失败的自己进行阻塞,A线程先持有锁,B线程获取失败就会阻塞,同时对/lockPath设置监听,A线程执行完操作后删除节点,触发监听器,B线程此时解除阻塞,重新去获取锁。

临时节点
我们模仿原生jdk的lock接口设计,采用模板方法设计模式来编写分布式锁,这样的好处是扩展性强,我们可以快速切换到redis分布式锁、数据库分布式锁等实现方式。

创建Lock接口

public interface Lock {    /**     * 获取锁     */    void getLock() throws Exception;    /**     * 释放锁     */    void unlock() throws Exception;}

AbstractTemplateLock抽象类

public abstract class AbstractTemplateLock implements Lock {    @Override    public void getLock() {        if (tryLock()) {            System.out.println(Thread.currentThread().getName() + "获取锁成功");        } else {            //等待            waitLock();//事件监听 如果节点被删除则可以重新获取            //重新获取            getLock();        }    }    protected abstract void waitLock();    protected abstract boolean tryLock();    protected abstract void releaseLock();    @Override    public void unlock() {        releaseLock();    }}

zookeeper分布式锁逻辑

@Slf4jpublic class ZkTemplateLock extends AbstractTemplateLock {    private static final String zkServers = "127.0.0.1:2181";    private static final int sessionTimeout = 8000;    private static final int connectionTimeout = 5000;    private static final String lockPath = "/lockPath";    private ZkClient client;    public ZkTemplateLock() {        client = new ZkClient(zkServers, sessionTimeout, connectionTimeout);        log.info("zk client 连接成功:{}",zkServers);    }    @Override    protected void waitLock() {        CountDownLatch latch = new CountDownLatch(1);        IZkDataListener listener = new IZkDataListener() {            @Override            public void handleDataDeleted(String dataPath) throws Exception {                System.out.println("监听到节点被删除");                latch.countDown();            }            @Override            public void handleDataChange(String dataPath, Object data) throws Exception {}        };        //完成 watcher 注册        client.subscribeDataChanges(lockPath, listener);        //阻塞自己        if (client.exists(lockPath)) {            try {                latch.await();            } catch (InterruptedException e) {                e.printStackTrace();            }        }        //取消watcher注册        client.unsubscribeDataChanges(lockPath, listener);    }    @Override    protected boolean tryLock() {        try {            client.createEphemeral(lockPath);            System.out.println(Thread.currentThread().getName()+"获取到锁");        } catch (Exception e) {            log.error("创建失败");            return false;        }        return true;    }    @Override    public void releaseLock() {       client.delete(this.lockPath);    }}

缺点:

每次去竞争锁,都只会有一个线程拿到锁,当线程数庞大时会发生“惊群”现象,zookeeper节点可能会运行缓慢甚至宕机。这是因为其他线程没获取到锁时都会监听/lockPath节点,当A线程释放完毕,海量的线程都同时停止阻塞,去争抢锁,这种操作十分耗费资源,且性能大打折扣。

基于临时顺序节点方案
临时顺序节点与临时节点不同的是产生的节点是有序的,我们可以利用这一特点,只让当前线程监听上一序号的线程,每次获取锁的时候判断自己的序号是否为最小,最小即获取到锁,执行完毕就删除当前节点继续判断谁为最小序号的节点。

序列化节点

临时顺序节点
临时顺序节点操作源码

@Slf4jpublic class ZkSequenTemplateLock extends AbstractTemplateLock {    private static final String zkServers = "127.0.0.1:2181";    private static final int sessionTimeout = 8000;    private static final int connectionTimeout = 5000;    private static final String lockPath = "/lockPath";    private String beforePath;    private String currentPath;    private ZkClient client;    public ZkSequenTemplateLock() {        client = new ZkClient(zkServers);        if (!client.exists(lockPath)) {            client.createPersistent(lockPath);        }        log.info("zk client 连接成功:{}",zkServers);    }    @Override    protected void waitLock() {        CountDownLatch latch = new CountDownLatch(1);        IZkDataListener listener = new IZkDataListener() {            @Override            public void handleDataDeleted(String dataPath) throws Exception {                System.out.println("监听到节点被删除");                latch.countDown();            }            @Override            public void handleDataChange(String dataPath, Object data) throws Exception {}        };        //给排在前面的节点增加数据删除的watcher,本质是启动另一个线程去监听上一个节点        client.subscribeDataChanges(beforePath, listener);        //阻塞自己        if (client.exists(beforePath)) {            try {                System.out.println("阻塞"+currentPath);                latch.await();            } catch (InterruptedException e) {                e.printStackTrace();            }        }        //取消watcher注册        client.unsubscribeDataChanges(beforePath, listener);    }    @Override    protected boolean tryLock() {        if (currentPath == null) {            //创建一个临时顺序节点            currentPath = client.createEphemeralSequential(lockPath + "/", "lock-data");            System.out.println("current:" + currentPath);        }        //获得所有的子节点并排序。临时节点名称为自增长的字符串        List<String> childrens = client.getChildren(lockPath);        //排序list,按自然顺序排序        Collections.sort(childrens);        if (currentPath.equals(lockPath + "/" + childrens.get(0))) {            return true;        } else {            //如果当前节点不是排第一,则获取前面一个节点信息,赋值给beforePath            int curIndex = childrens.indexOf(currentPath.substring(lockPath.length() + 1));            beforePath = lockPath + "/" + childrens.get(curIndex - 1);        }        System.out.println("beforePath"+beforePath);        return false;    }    @Override    public void releaseLock() {        System.out.println("delete:" + currentPath);        client.delete(currentPath);    }}

Curator分布式锁工具
curator提供了以下种类的锁:

  • 共享可重入锁(Shared Reentrant Lock):全局同步锁,同一时间不会有两个客户端持有一个锁

  • 共享锁:与共享可重入锁类似,但是不可重入(有时候会因为这个原因造成死锁)

  • 共享可重入读写锁

  • 共享信号量

  • Multi Shared Lock:管理多种锁的容器实体

我们采用第一种Shared Reentrant Lock中的InterProcessMutex来完成上锁、释放锁的的操作

public class ZkLockWithCuratorTemplate implements Lock {    // zk host地址    private String host = "localhost";    // zk自增存储node    private String lockPath = "/curatorLock";    // 重试休眠时间    private static final int SLEEP_TIME_MS = 1000;    // 最大重试1000次    private static final int MAX_RETRIES = 1000;    //会话超时时间    private static final int SESSION_TIMEOUT = 30 * 1000;    //连接超时时间    private static final int CONNECTION_TIMEOUT = 3 * 1000;        //curator核心操作类    private CuratorFramework curatorFramework;    InterProcessMutex lock;   public ZkLockWithCuratorTemplate() {       curatorFramework = CuratorFrameworkFactory.builder()               .connectString(host)               .connectionTimeoutMs(CONNECTION_TIMEOUT)               .sessionTimeoutMs(SESSION_TIMEOUT)               .retryPolicy(new ExponentialBackoffRetry(SLEEP_TIME_MS, MAX_RETRIES))               .build();       curatorFramework.start();       lock = new InterProcessMutex (curatorFramework, lockPath);    }    @Override    public void getLock() throws Exception {        //5s后超时释放锁         lock.acquire(5, TimeUnit.SECONDS);    }    @Override    public void unlock() throws Exception {        lock.release();    }}

源码以及测试类地址

https://github.com/Motianshi/distribute-tool

  推荐站点

  • At-lib分类目录At-lib分类目录

    At-lib网站分类目录汇集全国所有高质量网站,是中国权威的中文网站分类目录,给站长提供免费网址目录提交收录和推荐最新最全的优秀网站大全是名站导航之家

    www.at-lib.cn
  • 中国链接目录中国链接目录

    中国链接目录简称链接目录,是收录优秀网站和淘宝网店的网站分类目录,为您提供优质的网址导航服务,也是网店进行收录推广,站长免费推广网站、加快百度收录、增加友情链接和网站外链的平台。

    www.cnlink.org
  • 35目录网35目录网

    35目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向35目录推荐、提交优秀网站。

    www.35mulu.com
  • 就要爱网站目录就要爱网站目录

    就要爱网站目录,按主题和类别列出网站。所有提交的网站都经过人工审查,确保质量和无垃圾邮件的结果。

    www.912219.com
  • 伍佰目录伍佰目录

    伍佰网站目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向伍佰目录推荐、提交优秀网站。

    www.wbwb.net