分布式锁实现方案-基于zookeeper的分布式锁实现(原理与代码)

这篇具有很好参考价值的文章主要介绍了分布式锁实现方案-基于zookeeper的分布式锁实现(原理与代码)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

分布式锁实现方案-基于zookeeper的分布式锁实现(原理与代码),Java技术,分布式,zookeeper,分布式锁

目录

一、基于zookeeper的分布式锁

1.1 基于Zookeeper实现分布式锁的原理

1.1.1 分布式锁特性说明

1.1.1.1 特点分析

1.1.1.2 本质

1.1.2 Zookeeper 分布式锁实现原理

1.1.2.1 Zookeeper临时顺序节点特性

1.1.2.2 Zookeeper满足分布式锁基本要求

1.1.2.3 Watcher机制

1.1.2.3 总结

1.2 分布式锁流程说明

1.2.1 分布式锁流程图

1.2.2 流程说明

1.3 分布式锁代码实现

1.3.1 自己手写,实现Lock接口

1.3.1.1 分布式锁ZookeeperDistributedLock

1.3.1.2 模拟下单处理OrderServiceHandle

1.3.1.3 订单号生成类OrderCodeGenerator

1.3.1.4 分布式锁测试类TestZookeeperDistributedLock

1.3.1.5 测试效果

1.3.2 基于Apache Curator 框架调用

1.3.2.1 maven依赖

1.3.2.2 代码实现

1.3.2.2.1 分布式锁类CuratorDistributeLock

1.3.2.2.2 测试类TestCuratorDistributedLock

1.3.2.3 执行结果


一、基于zookeeper的分布式锁

1.1 基于Zookeeper实现分布式锁的原理

1.1.1 分布式锁特性说明

1.1.1.1 特点分析
  • 每次只能一个占用锁;
  • 可以重复进入锁;
  • 只有占用者才可以解锁;
  • 获取锁和释放锁都需要原子
  • 不能产生死锁
  • 尽量满足性能
1.1.1.2 本质

同步互斥,使得处理任务能够一个一个逐步的过临界资源。

1.1.2 Zookeeper 分布式锁实现原理

1.1.2.1 Zookeeper临时顺序节点特性

zookeeper中有一种临时顺序节点,它具有以下特征:

  • 时效性,当会话结束,节点将自动被删除
  • 顺序性,当多个应用向其注册顺序节点时,每个顺序号将只能被一个应用获取
1.1.2.2 Zookeeper满足分布式锁基本要求
  1. 因为顺序性,可以让最小顺序号的应用获取到锁,从而满足分布式锁的 每次只能一个占用锁,因为只有它一个获取到,所以可以实现 重复进入 ,只要设置标识即可。锁的释放,即删除应用在zookeeper上注册的节点,因为每个节点只被自己注册拥有,所以只有自己才能删除,这样就满足只有占用者才可以解锁
  2. zookeeper的序号分配是原子的,分配后即不会再改变,让最小序号者获取锁,所以获取锁是原子的
  3. 因为注册的是临时节点,在会话期间内有效,所以不会产生死锁
  4. zookeeper注册节点的性能能满足几千,而且支持集群,能够满足大部分情况下的性能
1.1.2.3 Watcher机制

Zookeeper 允许客户端向服务端的某个 Znode 注册一个 Watcher 监听,当服务端的一些指定事
件触发了这个 Watcher,服务端会向指定客户端发送一个事件通知来实现分布式的通知功能,然
后客户端根据 Watcher 通知状态和事件类型做出业务上的改变。

在实现分布式锁的时候,主要利用这个机制,实现释放锁的时候,通知等待锁的线程竞争锁。

1.1.2.3 总结

综上可知,Zookeeper其实是基于临时顺序节点特性实现的分布式锁。当然,还结合了他的Watcher机制,实现释放锁的时候,通知等待锁的线程去竞争锁。

1.2 分布式锁流程说明

1.2.1 分布式锁流程图

分布式锁实现方案-基于zookeeper的分布式锁实现(原理与代码),Java技术,分布式,zookeeper,分布式锁

1.2.2 流程说明

  1. client判断/lock目录是否存在,如果不存在则向其注册/lock的持久节点
  2. client向/lock/目录下注册/lock/Node-前缀的临时顺序节点,并得到顺序号
  3. client获取/lock/目录下的所有临时顺序子节点
  4. client判断临时子节点序号中是否存在比自身的序号小的节点。如果不存在,则获取到锁;如果存在,则对象该临时节点做watch监控
  5. 获得锁的线程,执行业务逻辑,执行完之后,删除临时节点,完成锁的释放。
  6. 等待锁的线程如果收到监控的临时节点被删除的通知,则再重复4、5、6步骤,进入下一个获得锁、释放锁的循环。

1.3 分布式锁代码实现

1.3.1 自己手写,实现Lock接口

1.3.1.1 分布式锁ZookeeperDistributedLock
package com.ningzhaosheng.distributelock.zookeeper;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;


/**
 * @author ningzhaosheng
 * @date 2024/4/17 18:13:38
 * @description 基于zookeeper实现的分布式锁
 */
public class ZookeeperDistributedLock implements Lock {

    private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedLock.class);

    // zookeeper 地址
    private String ZOOKEEPER_IP_PORT = "192.168.152.130:2181";
    // zookeeper 锁目录
    private String LOCK_PATH = "/LOCK";

    // 创建 zookeeper客户端zkClient
    private ZkClient client = null;

    private CountDownLatch cdl;

    // 当前请求的节点前一个节点
    private String beforePath;
    // 当前请求的节点
    private String currentPath;


    /**
     * 初始化客户端和创建LOCK目录
     *
     * @param ZOOKEEPER_IP_PORT
     * @param LOCK_PATH
     */
    public ZookeeperDistributedLock(String ZOOKEEPER_IP_PORT, String LOCK_PATH) {
        this.ZOOKEEPER_IP_PORT = ZOOKEEPER_IP_PORT;
        this.LOCK_PATH = LOCK_PATH;
        client = new ZkClient(ZOOKEEPER_IP_PORT, 4000, 4000, new SerializableSerializer());
        // 判断有没有LOCK目录,没有则创建
        if (!this.client.exists(LOCK_PATH)) {
            this.client.createPersistent(LOCK_PATH);
        }
    }

    @Override
    public void lock() {
        if (!tryLock()) {
            //对次小节点进行监听
            waitForLock();
            lock();
        } else {
            logger.info(Thread.currentThread().getName() + " 获得分布式锁!");
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        // 如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
        if (currentPath == null || currentPath.length() <= 0) {
            // 创建一个临时顺序节点
            currentPath = this.client.createEphemeralSequential(LOCK_PATH + '/', "lock");
            System.out.println("---------------------------->" + currentPath);
        }

        // 获取所有临时节点并排序,临时节点名称为自增长的字符串如:0000000400
        List<String> childrens = this.client.getChildren(LOCK_PATH);
        //由小到大排序所有子节点
        Collections.sort(childrens);
        //判断创建的子节点/LOCK/Node-n是否最小,即currentPath,如果当前节点等于childrens中的最小的一个就占用锁
        if (currentPath.equals(LOCK_PATH + '/' + childrens.get(0))) {
            return true;
        }
        //找出比创建的临时顺序节子节点/LOCK/Node-n次小的节点,并赋值给beforePath
        else {
            int wz = Collections.binarySearch(childrens, currentPath.substring(6));
            beforePath = LOCK_PATH + '/' + childrens.get(wz - 1);
        }

        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    //等待锁,对次小节点进行监听
    private void waitForLock() {
        IZkDataListener listener = new IZkDataListener() {
            public void handleDataDeleted(String dataPath) throws Exception {
                logger.info(Thread.currentThread().getName() + ":捕获到DataDelete事件!---------------------------");
                if (cdl != null) {
                    cdl.countDown();
                }
            }

            public void handleDataChange(String dataPath, Object data) throws Exception {

            }
        };

        // 对次小节点进行监听,即beforePath-给排在前面的的节点增加数据删除的watcher
        this.client.subscribeDataChanges(beforePath, listener);
        if (this.client.exists(beforePath)) {
            cdl = new CountDownLatch(1);
            try {
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.client.unsubscribeDataChanges(beforePath, listener);
    }

    @Override
    public void unlock() {
        // 删除当前临时节点
        client.delete(currentPath);
    }

    @Override
    public Condition newCondition() {
        return null;
    }

}

1.3.1.2 模拟下单处理OrderServiceHandle
package com.ningzhaosheng.distributelock.zookeeper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;

/**
 * @author ningzhaosheng
 * @date 2024/4/17 21:45:46
 * @description 模拟订单处理
 */
public class OrderServiceHandle implements Runnable {
    private static OrderCodeGenerator ong = new OrderCodeGenerator();

    private Logger logger = LoggerFactory.getLogger(OrderServiceHandle.class);

    // 按照线程数初始化倒计数器,倒计数器
    private CountDownLatch cdl = null;

    private Lock lock = null;

    public OrderServiceHandle(CountDownLatch cdl, Lock lock) {
        this.cdl = cdl;
        this.lock = lock;
    }

    // 创建订单
    public void createOrder() {
        String orderCode = null;

        //准备获取锁
        lock.lock();
        try {
            // 获取订单编号
            orderCode = ong.getOrderCode();
        } catch (Exception e) {
            // TODO: handle exception
        } finally {
            //完成业务逻辑以后释放锁
            lock.unlock();
        }

        // ……业务代码

        logger.info("insert into DB使用id:=======================>" + orderCode);
    }


    @Override
    public void run() {
        try {
            // 等待其他线程初始化
            cdl.await();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        // 创建订单
        createOrder();
    }
}

1.3.1.3 订单号生成类OrderCodeGenerator
package com.ningzhaosheng.distributelock.zookeeper;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @author ningzhaosheng
 * @date 2024/4/17 21:44:06
 * @description 生成订单号
 */
public class OrderCodeGenerator {
    // 自增长序列
    private static int i = 0;

    // 按照“年-月-日-小时-分钟-秒-自增长序列”的规则生成订单编号
    public String getOrderCode() {
        Date now = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
        return sdf.format(now) + ++i;
    }
}

1.3.1.4 分布式锁测试类TestZookeeperDistributedLock
package com.ningzhaosheng.distributelock.zookeeper;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;

/**
 * @author ningzhaosheng
 * @date 2024/4/17 21:48:28
 * @description zookeeper分布式锁测试类
 */
public class TestZookeeperDistributedLock {
    public static void main(String[] args) {
        // zookeeper 地址
        String ZOOKEEPER_IP_PORT = "192.168.31.9:2181";
        // zookeeper 锁目录
        String LOCK_PATH = "/LOCK";
        // 线程并发数
        int NUM = 10;

        CountDownLatch cdl = new CountDownLatch(NUM);
        for (int i = 1; i <= NUM; i++) {
            // 按照线程数迭代实例化线程
            Lock lock = new ZookeeperDistributedLock(ZOOKEEPER_IP_PORT, LOCK_PATH);
            new Thread(new OrderServiceHandle(cdl, lock)).start();
            // 创建一个线程,倒计数器减1
            cdl.countDown();
        }
    }
}

1.3.1.5 测试效果

分布式锁实现方案-基于zookeeper的分布式锁实现(原理与代码),Java技术,分布式,zookeeper,分布式锁

分布式锁实现方案-基于zookeeper的分布式锁实现(原理与代码),Java技术,分布式,zookeeper,分布式锁

从上图执行结果中可以看出,在多线程情况下,分布式锁获取和释放正常。

1.3.2 基于Apache Curator 框架调用

1.3.2.1 maven依赖
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>5.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.0</version>
        </dependency>
1.3.2.2 代码实现

这里模拟业务使用分布式锁,还是使用的OrderServiceHandle类,这里只给出分布式锁实现类和测试类,不再给出OrderServiceHandle代码,可以参考上一小节的OrderServiceHandle类。

1.3.2.2.1 分布式锁类CuratorDistributeLock
package com.ningzhaosheng.distributelock.zookeeper.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @author ningzhaosheng
 * @date 2024/4/17 22:03:45
 * @description 实现Lock接口(其实可以不用,可以直接使用InterProcessMutex,这里是为了和jvm的Lock锁保持一致,所以做了一层封装)
 */
public class CuratorDistributeLock implements Lock {

    private CuratorFramework client;
    private InterProcessMutex mutex;

    public CuratorDistributeLock(String connString, String lockPath) {
        this(connString, lockPath, new ExponentialBackoffRetry(3000,5));
    }

    public CuratorDistributeLock(String connString, String lockPath, ExponentialBackoffRetry retryPolicy) {
        try {
            client = CuratorFrameworkFactory.builder()
                    .connectString(connString)
                    .retryPolicy(retryPolicy)
                    .build();
            client.start();

            mutex = new InterProcessMutex(client, lockPath);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void lock() {
        try {
            // 获取锁
            mutex.acquire();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock() {
        try {
            // 释放锁
            mutex.release();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}

1.3.2.2.2 测试类TestCuratorDistributedLock
package com.ningzhaosheng.distributelock.zookeeper.curator;

import com.ningzhaosheng.distributelock.zookeeper.OrderServiceHandle;

import java.util.concurrent.CountDownLatch;

/**
 * @author ningzhaosheng
 * @date 2024/4/17 21:54:33
 * @description 基于 apache curator分布式锁测试类
 */
public class TestCuratorDistributedLock {
    private static final String ZK_ADDRESS = "192.168.31.9:2181";
    private static final String LOCK_PATH = "/distributed_lock";

    public static void main(String[] args) {

        int NUM = 10;
        CountDownLatch cdl = new CountDownLatch(NUM);
        for (int i = 1; i <= NUM; i++) {
            // 按照线程数迭代实例化线程
            /** 创建CuratorDistributeLock
             * 其实可以不用,可以直接使用InterProcessMutex,这里是为了和jvm的Lock锁保持一致,所以做了一层封装
             */
            CuratorDistributeLock curatorDistributeLock = new CuratorDistributeLock(ZK_ADDRESS,LOCK_PATH);
            new Thread(new OrderServiceHandle(cdl, curatorDistributeLock)).start();
            // 创建一个线程,倒计数器减1
            cdl.countDown();
        }
    }
}

1.3.2.3 执行结果

分布式锁实现方案-基于zookeeper的分布式锁实现(原理与代码),Java技术,分布式,zookeeper,分布式锁

从执行结果可以看出,基于apche curator框架实现zookeeper锁,它也是按照临时顺序节点的顺序获取锁的,每次获得锁的节点都是最小顺序节点,然后等待锁的线程,会基于watcher机制,每次给最小临时顺序节点加回调,监听节点的变更(即释放锁的线程会删除节点),然后再重新判断最小临时顺序节点,最小的获得锁执行,依次循环完成。

好了,本次内容就分享到这,欢迎关注本博主。如果有帮助到大家,欢迎大家点赞+关注+收藏,有疑问也欢迎大家评论留言!文章来源地址https://www.toymoban.com/news/detail-856880.html

到了这里,关于分布式锁实现方案-基于zookeeper的分布式锁实现(原理与代码)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 3、基于Zookeeper实现分布式锁

    3.1.1 安装启动 如下,说明启动成功: 3.1.2 相关概念 Zookeeper提供一个多层级的节点命名空间(节点称为znode),每个节点都用一个以斜杠(/)分隔的路径表示,而且每个节点都有父节点(根节点除外),非常类似于文件系统。并且每个节点都是唯一的。 1、znode节点有四种类型

    2024年02月15日
    浏览(40)
  • SpringBoot基于Zookeeper实现分布式锁

    研究分布式锁,基于ZK实现,需要整合到SpringBoot使用 参考自SpringBoot集成Curator实现Zookeeper基本操作,Zookeeper入门 本篇的代码笔者有自己运行过,需要注意组件的版本号是否兼容,否则会有比较多的坑 采用Docker compose快速搭建ZK容器,很快,几分钟就好了,而且是集群方式搭建

    2024年02月12日
    浏览(47)
  • 第三章_基于zookeeper实现分布式锁

    实现分布式锁目前有三种流行方案,分别为基于数据库、Redis、Zookeeper的方案。这里主要介绍基于zk怎么实现分布式锁。在实现分布式锁之前,先回顾zookeeper的知识点。 知识点回顾 Zookeeper(业界简称zk)是一种提供配置管理、分布式协同以及命名的中心化服务,这些提供的 功

    2024年02月09日
    浏览(40)
  • 分布式锁原理与实战三:ZooKeeper分布式锁的原理

             目录 ZooKeeper分布式锁的原理 ZooKeeper的每一个节点,都是一个天然的顺序发号器。 ZooKeeper节点的递增有序性,可以确保锁的公平 ZooKeeper的节点监听机制,可以保障占有锁的传递有序而且高效 ZooKeeper的节点监听机制,能避免羊群效应 分布式锁的抢占过程 客户端

    2024年02月08日
    浏览(45)
  • 聊聊Seata分布式解决方案AT模式的实现原理

    Seata是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。为用户提供了AT、TCC、SAGA和XA事务模式,为用户打造一站式的分布式解决方案。 AT模式目前来看是Seata框架独有的一种模式,其它的分布式框架上并没有此种模式的实现。其是由二阶段提

    2024年02月05日
    浏览(42)
  • 从原理到实践,分析 Redisson 分布式锁的实现方案(二)

            上篇讲解了如何用 Redis 实现分布式锁的方案,它提供了简单的原语来实现基于Redis的分布式锁。然而,Redis作为分布式锁的实现方式也存在一些缺点。本文将引入Redisson来实现分布式锁。         Redisson是一个基于Redis的分布式Java框架。它提供了丰富的功能和工

    2024年02月15日
    浏览(47)
  • Zookeeper分布式锁的概念及原理

    分布式锁的概念图如下:一种演变过程。 在我们进行单机应用程序开发时,往往会涉及到并发同步的问题,一般都会采用synchronized或者Lock锁的方式来解决多线程间的代码同步问题,这些多线程都是运行在同一个JVM之下,是没有任何问题的。 场景:当有一个请求数据的线程进

    2024年02月12日
    浏览(41)
  • 在Spring中,可以使用不同的方式来实现分布式锁,例如基于数据库、Redis、ZooKeeper等

    在Spring中,可以使用不同的方式来实现分布式锁,例如基于数据库、Redis、ZooKeeper等。下面是两种常见的实现方式: 使用Redis实现分布式锁: 使用自定义注解实现本地锁: 以上是两种常见的在Spring中实现分布式锁的方式。第一种方式使用Redis作为分布式锁的存储介质,通过

    2024年03月17日
    浏览(46)
  • ZooKeeper 分布式协调服务: 概述及原理, 安装配置, 基本操作

    作者:禅与计算机程序设计艺术 Apache Zookeeper 是 Apache Hadoop 的子项目之一,是一个开源的分布式协调服务。它负责存储和维护关于网络中各个节点的数据。Zookeeper 提供了以下功能:配置维护、域名服务、同步和共享、软/硬件负载均衡、集群管理、Master 选举等。它的架构使得

    2024年02月08日
    浏览(41)
  • Zookeeper实现分布式锁

    ZooKeeper是一个分布式协调服务,其中提供的序列化、持久化、有层次的目录结构使得它非常适合用于实现分布式锁。在ZooKeeper中,分布式锁通常通过临时有序节点实现。以下是ZooKeeper分布式锁的详细介绍:  实现方式: 临时有序节点: 当一个客户端需要获取锁时,它在ZooK

    2024年02月02日
    浏览(58)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包