Zookeeper入门实战(5)-分布式锁

这篇具有很好参考价值的文章主要介绍了Zookeeper入门实战(5)-分布式锁。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

在分布式环境中,当需要控制对某一资源的不同进程并发访问时就需要使用分布式锁;可以使用 ZooKeeper + Curator 来实现分布式锁,本文主要介绍 Curator 中分布式锁的使用,文中所使用到的软件版本:Java 1.8.0_341、Zookeeper 3.7.1、curator 5.4.0。

1、引入依赖

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.4.0</version>
</dependency>

2、使用样例

2.1、可重入锁

@Test
public void interProcessMutex() throws InterruptedException {
    CuratorFramework cf = getCuratorFramework();
    cf.start();
    InterProcessLock lock = new InterProcessMutex(cf, "/test/lock");
    int size = 5;
    CountDownLatch countDownLatch = new CountDownLatch(size);
    for (int i = 0; i < size; i++) {
        new Thread(() -> {
            try {
                lock.acquire();
                //同一线程中可重复获取
                lock.acquire();
                logger.info(Thread.currentThread().getName() + "获得了锁");
                Thread.sleep(1000 * 3);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //获取了几次就要释放几次
                release(lock);
                release(lock);
                logger.info(Thread.currentThread().getName() + "释放了锁");
            }
            countDownLatch.countDown();
        }).start();
    }
    countDownLatch.await();
    cf.close();
}

2.2、不可重入锁

@Test
public void interProcessSemaphoreMutex() throws InterruptedException {
    CuratorFramework cf = getCuratorFramework();
    cf.start();
    InterProcessLock lock = new InterProcessSemaphoreMutex(cf, "/test/lock2");
    int size = 5;
    CountDownLatch countDownLatch = new CountDownLatch(size);
    for (int i = 0; i < size; i++) {
        new Thread(() -> {
            try {
                lock.acquire();
                //同一线程中不可重复获取
                //lock.acquire();
                logger.info(Thread.currentThread().getName() + "获得了锁");
                Thread.sleep(1000 * 3);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                release(lock);
                logger.info(Thread.currentThread().getName() + "释放了锁");
            }
            countDownLatch.countDown();
        }).start();
    }
    countDownLatch.await();
    cf.close();
}

2.3、读写锁(可重入)

@Test
public void interProcessReadWriteLock() throws InterruptedException {
    CuratorFramework cf = getCuratorFramework();
    InterProcessReadWriteLock lock = new InterProcessReadWriteLock(cf, "/test/lock3");
    InterProcessReadWriteLock.ReadLock readLock = lock.readLock();
    InterProcessReadWriteLock.WriteLock writeLock = lock.writeLock();

    cf.start();
    int size = 5;
    CountDownLatch countDownLatch = new CountDownLatch(size);
    for (int i = 0; i < size; i++) {
        new Thread(() -> {
            try {
                readLock.acquire();
                readLock.acquire();
                logger.info(Thread.currentThread().getName() + "获得了读锁");
                Thread.sleep(1000 * 2);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //获取了几次就要释放几次
                release(readLock);
                release(readLock);
                logger.info(Thread.currentThread().getName() + "释放了读锁");
            }
            try {
                writeLock.acquire();
                logger.info(Thread.currentThread().getName() + "获得了写锁");
                Thread.sleep(1000 * 2);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                release(writeLock);
                logger.info(Thread.currentThread().getName() + "释放了写锁");
            }
            countDownLatch.countDown();
        }).start();
    }
    countDownLatch.await();
    cf.close();
}

2.4、信号量

信号量用于控制对资源同时访问的进程或线程数。

@Test
public void interProcessSemaphoreV2() throws InterruptedException {
    CuratorFramework cf = getCuratorFramework();
    InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(cf, "/test/lock4", 3);

    cf.start();
    int size = 5;
    CountDownLatch countDownLatch = new CountDownLatch(size);
    for (int i = 0; i < size; i++) {
        new Thread(() -> {
            Lease lease = null;
            try {
                //获取一个许可
                lease = semaphore.acquire();
                logger.info(Thread.currentThread().getName() + "获得了许可");
                Thread.sleep(1000 * 3);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //释放一个许可
                semaphore.returnLease(lease);
                logger.info(Thread.currentThread().getName() + "释放了许可");
            }
            countDownLatch.countDown();
        }).start();
    }
    countDownLatch.await();
    cf.close();
}

2.5、多个锁作为单个实体管理

InterProcessMultiLock 主要功能是将多个锁合并为一个对象来操作,简化了代码量。

@Test
public void InterProcessMultiLock() throws InterruptedException {
    CuratorFramework cf = getCuratorFramework();
    InterProcessLock lock = new InterProcessMutex(cf, "/test/lock");
    InterProcessLock lock2 = new InterProcessSemaphoreMutex(cf, "/test/lock2");
    InterProcessMultiLock multiLock = new InterProcessMultiLock(Arrays.asList(lock, lock2));

    cf.start();
    int size = 5;
    CountDownLatch countDownLatch = new CountDownLatch(size);
    for (int i = 0; i < size; i++) {
        new Thread(() -> {
            try {
                //相当于 lock.acquire() 和 lock2.acquire()
                multiLock.acquire();
                logger.info(Thread.currentThread().getName() + "获得了锁");
                Thread.sleep(1000 * 3);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                release(multiLock);
                logger.info(Thread.currentThread().getName() + "释放了锁");
            }
            countDownLatch.countDown();
        }).start();
    }
    countDownLatch.await();
    cf.close();
}

 

分布式锁使用样例的完整代码如下:

Zookeeper入门实战(5)-分布式锁Zookeeper入门实战(5)-分布式锁
package com.abc.demo.general.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;

public class CuratorLockCase {
    private static Logger logger = LoggerFactory.getLogger(CuratorLockCase.class);

    private static String connectString = "10.49.196.33:2181";
    private static int sessionTimeout = 40 * 1000;
    private static int connectionTimeout = 60 * 1000;

    /**
     * 可重入锁
     */
    @Test
    public void interProcessMutex() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        cf.start();
        InterProcessLock lock = new InterProcessMutex(cf, "/test/lock");
        int size = 5;
        CountDownLatch countDownLatch = new CountDownLatch(size);
        for (int i = 0; i < size; i++) {
            new Thread(() -> {
                try {
                    lock.acquire();
                    //同一线程中可重复获取
                    lock.acquire();
                    logger.info(Thread.currentThread().getName() + "获得了锁");
                    Thread.sleep(1000 * 3);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    //获取了几次就要释放几次
                    release(lock);
                    release(lock);
                    logger.info(Thread.currentThread().getName() + "释放了锁");
                }
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        cf.close();
    }

    /**
     * 不可重入锁
     */
    @Test
    public void interProcessSemaphoreMutex() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        cf.start();
        InterProcessLock lock = new InterProcessSemaphoreMutex(cf, "/test/lock2");
        int size = 5;
        CountDownLatch countDownLatch = new CountDownLatch(size);
        for (int i = 0; i < size; i++) {
            new Thread(() -> {
                try {
                    lock.acquire();
                    //同一线程中不可重复获取
                    //lock.acquire();
                    logger.info(Thread.currentThread().getName() + "获得了锁");
                    Thread.sleep(1000 * 3);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    release(lock);
                    logger.info(Thread.currentThread().getName() + "释放了锁");
                }
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        cf.close();
    }

    /**
     * 读写锁(可重入)
     */
    @Test
    public void interProcessReadWriteLock() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        InterProcessReadWriteLock lock = new InterProcessReadWriteLock(cf, "/test/lock3");
        InterProcessReadWriteLock.ReadLock readLock = lock.readLock();
        InterProcessReadWriteLock.WriteLock writeLock = lock.writeLock();

        cf.start();
        int size = 5;
        CountDownLatch countDownLatch = new CountDownLatch(size);
        for (int i = 0; i < size; i++) {
            new Thread(() -> {
                try {
                    readLock.acquire();
                    readLock.acquire();
                    logger.info(Thread.currentThread().getName() + "获得了读锁");
                    Thread.sleep(1000 * 2);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    //获取了几次就要释放几次
                    release(readLock);
                    release(readLock);
                    logger.info(Thread.currentThread().getName() + "释放了读锁");
                }
                try {
                    writeLock.acquire();
                    logger.info(Thread.currentThread().getName() + "获得了写锁");
                    Thread.sleep(1000 * 2);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    release(writeLock);
                    logger.info(Thread.currentThread().getName() + "释放了写锁");
                }
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        cf.close();
    }

    /**
     * 信号量,用于控制对资源同时访问的进程或线程数
     */
    @Test
    public void interProcessSemaphoreV2() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(cf, "/test/lock4", 3);

        cf.start();
        int size = 5;
        CountDownLatch countDownLatch = new CountDownLatch(size);
        for (int i = 0; i < size; i++) {
            new Thread(() -> {
                Lease lease = null;
                try {
                    //获取一个许可
                    lease = semaphore.acquire();
                    logger.info(Thread.currentThread().getName() + "获得了许可");
                    Thread.sleep(1000 * 3);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    //释放一个许可
                    semaphore.returnLease(lease);
                    logger.info(Thread.currentThread().getName() + "释放了许可");
                }
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        cf.close();
    }


    /**
     * 多个锁作为单个实体管理
     */
    @Test
    public void InterProcessMultiLock() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        InterProcessLock lock = new InterProcessMutex(cf, "/test/lock");
        InterProcessLock lock2 = new InterProcessSemaphoreMutex(cf, "/test/lock2");
        InterProcessMultiLock multiLock = new InterProcessMultiLock(Arrays.asList(lock, lock2));

        cf.start();
        int size = 5;
        CountDownLatch countDownLatch = new CountDownLatch(size);
        for (int i = 0; i < size; i++) {
            new Thread(() -> {
                try {
                    //相当于 lock.acquire() 和 lock2.acquire()
                    multiLock.acquire();
                    logger.info(Thread.currentThread().getName() + "获得了锁");
                    Thread.sleep(1000 * 3);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    release(multiLock);
                    logger.info(Thread.currentThread().getName() + "释放了锁");
                }
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        cf.close();
    }

    private CuratorFramework getCuratorFramework() {
        RetryPolicy retryPolicy  = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .sessionTimeoutMs(sessionTimeout)
                .connectionTimeoutMs(connectionTimeout)
                .retryPolicy(retryPolicy)
                .build();
        return cf;
    }

    private void release(InterProcessLock lock) {
        if (lock != null) {
            try {
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
CuratorLockCase

 文章来源地址https://www.toymoban.com/news/detail-478774.html

到了这里,关于Zookeeper入门实战(5)-分布式锁的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • ZooKeeper+HBase分布式集群环境搭建

    安装版本:hadoop-2.10.1、zookeeper-3.4.12、hbase-2.3.1 一、zookeeper 集群搭建与配置 1. 下载zookeeper安装包 2. 解压移动zookeeper 3. 修改配置文件(创建文件夹) 4. 进入conf/ 5. 修改zoo.cfg文件 6. 进入/usr/local/zookeeper-3.4.12/zkdatas/这个路径下创建一个文件,文件名为myid ,文件内容为1 7. 拷贝到

    2024年02月08日
    浏览(44)
  • ZooKeeper 实战(五) Curator实现分布式锁

    1.1.分布式锁概念 分布式锁是一种用于实现分布式系统中的同步机制的技术。它允许在多个进程或线程之间实现互斥访问共享资源,以避免并发访问时的数据不一致问题。分布式锁的主要目的是在分布式系统中提供类似于全局锁的效果,以确保在任何时刻只有一个进程或线程

    2024年01月18日
    浏览(39)
  • 聊聊分布式架构10——Zookeeper入门详解

    目录 01ZooKeeper的ZAB协议 ZAB协议概念 ZAB协议基本模式 消息广播 崩溃恢复 选举出新的Leader服务器 数据同步 02Zookeeper的核心 ZooKeeper 的核心特点 ZooKeeper 的核心组件 选举算法概述 服务器启动时的Leader选举 服务器运行期间的Leader选举 03ZooKeeper的简单使用 04ZooKeeper的应用场景 01Zo

    2024年02月08日
    浏览(51)
  • 【ZooKeeper高手实战】ZAB协议:ZooKeeper分布式一致性的基石

    🌈🌈🌈🌈🌈🌈🌈🌈 欢迎关注公众号(通过文章导读关注:【11来了】),及时收到 AI 前沿项目工具及新技术 的推送 发送 资料 可领取 深入理解 Redis 系列文章结合电商场景讲解 Redis 使用场景 、 中间件系列笔记 和 编程高频电子书 ! 文章导读地址:点击查看文章导读!

    2024年02月03日
    浏览(45)
  • 分布式集群——jdk配置与zookeeper环境搭建

    分布式集群——jdk配置与zookeeper环境搭建 分布式集群——搭建Hadoop环境以及相关的Hadoop介绍 文章目录 系列文章目录 前言 一 zookeeper介绍与环境配置 1.1 zookeeper的学习 1.2 Zookeeper的主要功能 1.2.1 znode的节点类型 1.2.2 zookeeper的实现 1.3 Zookeeper的特征 zookeeper的几种角色? 1.4 关于

    2024年02月10日
    浏览(51)
  • (五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题

    本节内容使用zookeeper实现分布式锁,完成并发访问“超卖”问题的解决。相对于redis分布式锁,zookeeper能够保证足够的安全性。关于zookeeper的安装内容这里不做介绍,开始本节内容之前先自行安装好zookeeper中间键服务。这里我们利用创建zookeeper路径节点的唯一性实现分布式锁

    2024年02月06日
    浏览(44)
  • Zookeeper 实战 | Zookeeper 和Spring Cloud相结合解决分布式锁、服务注册与发现、配置管理

    专栏集锦,大佬们可以收藏以备不时之需: Spring Cloud 专栏: Python 专栏: Redis 专栏: TensorFlow 专栏: Logback 专栏: 量子计算: 量子计算 | 解密著名量子算法Shor算法和Grover算法 AI机器学习实战: AI机器学习实战 | 使用 Python 和 scikit-learn 库进行情感分析 AI机器学习 | 基于lib

    2024年02月05日
    浏览(79)
  • Hadoop3.x完全分布式环境搭建Zookeeper和Hbase

    集群规划 IP地址 主机名 集群身份 192.168.138.100 hadoop00 主节点 192.168.138.101 hadoop01 从节点 192.168.138.102 hadoop02 从节点 Hadoop完全分布式环境搭建请移步传送门 先在主节点上进行安装和配置,随后分发到各个从节点上。 1.1 解压zookeeper并添加环境变量 1)解压zookeeper到/usr/local文件夹

    2024年02月04日
    浏览(43)
  • SpringBoot整合Dubbo和Zookeeper分布式服务框架使用的入门项目实例

    Dubbo是一个 分布式服务框架 ,致力于提供高性能和透明化的RPC远程服务调用方案,以及SOA服务治理方案。简单的说,dubbo就是个服务框架,如果没有分布式的需求,其实是不需要用的,只有在分布式的时候,才有dubbo这样的分布式服务框架的需求。其本质上是个远程服务调用

    2024年01月21日
    浏览(48)
  • 【项目实战】分布式计算和通信框架(AKKA)入门介绍

    Akka是一个用于构建高并发、分布式、可容错、事件驱动的应用程序的工具包和运行时。它基于Actor模型,提供了一种高效的并发编程模型,可以轻松地编写出高并发、分布式、可容错的应用程序。Akka还提供了一些常用的组件,如路由、集群、持久化等,可以帮助开发人员更加

    2024年02月08日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包