SpringBoot基于Zookeeper实现分布式锁

这篇具有很好参考价值的文章主要介绍了SpringBoot基于Zookeeper实现分布式锁。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

SpringBoot基于Zookeeper实现分布式锁,Zookeeper,分布式,java-zookeeper,spring boot

问题背景

研究分布式锁,基于ZK实现,需要整合到SpringBoot使用

前言

  1. 参考自SpringBoot集成Curator实现Zookeeper基本操作,Zookeeper入门
  2. 本篇的代码笔者有自己运行过,需要注意组件的版本号是否兼容,否则会有比较多的坑

实现

搭建Zookeeper容器

采用Docker compose快速搭建ZK容器,很快,几分钟就好了,而且是集群方式搭建。详情见笔者的Docker搭建zookeeper

引入依赖

需要注意的点:Curator 2.x.x-兼容两个zk 3.4.xzk 3.5.xCurator 3.x.x-兼容兼容zk 3.5,根据搭建的zk的版本使用对应的curator依赖。引入的zk依赖,如果项目中有使用logback日志 ,需要排除zk中的log4j12依赖,详情见下面笔者给出的依赖:

<dependencies>
  <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-client</artifactId>
      <version>2.12.0</version>
  </dependency>

  <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>2.12.0</version>
  </dependency>

  <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <version>2.12.0</version>
  </dependency>

  <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.5.7</version>
      <exclusions>
          <exclusion>
              <artifactId>slf4j-log4j12</artifactId>
              <groupId>org.slf4j</groupId>
          </exclusion>
          <exclusion>
              <artifactId>slf4j-api</artifactId>
              <groupId>org.slf4j</groupId>
          </exclusion>
      </exclusions>
  </dependency>

ZK客户端的配置类

配置ZK的参数,使用@ConfigurationProperties可以令配置热更新,比如搭配Apollo、Nacos,如果使用@Valid则无法热更新,必须重启项目才能生效

@Component
@ConfigurationProperties(prefix = "curator")
@Data
public class ZKClientProps {

    private String connectString;
    private int retryCount;
    private int elapsedTimeMs;
    private int sessionTimeoutMs;
    private int connectionTimeoutMs;
}

对应yml如下:

#curator配置
curator:
  connectString: 192.168.163.128:2181,192.168.163.128:2182,192.168.163.128:2183 # zookeeper 地址
  retryCount: 1 # 重试次数
  elapsedTimeMs: 2000 # 重试间隔时间
  sessionTimeoutMs: 60000 # session超时时间
  connectionTimeoutMs: 10000 # 连接超时时间

ZK客户端的工厂类

定制ZK客户端:

@Component
public class ZKClientFactory {

    @Resource
    private ZKClientProps zkClientProps;
    public CuratorFramework createSimple() {
        //重试策略:第一次重试等待1S,第二次重试等待2S,第三次重试等待4s

        //第一个参数:等待时间的基础单位,单位为毫秒
        //第二个参数:最大重试次数
        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(zkClientProps.getElapsedTimeMs(), zkClientProps.getRetryCount());

        //获取CuratorFramework示例的最简单方式
        //第一个参数:zk的连接地址
        //第二个参数:重试策略
        return CuratorFrameworkFactory.newClient(zkClientProps.getConnectString(), retry);
    }


    public static CuratorFramework createWithOptions(String connectionString, RetryPolicy retryPolicy,
                                                     int connectionTimeoutMs, int sessionTimeoutMs) {
        return CuratorFrameworkFactory.builder().connectString(connectionString)
                .retryPolicy(retryPolicy).connectionTimeoutMs(connectionTimeoutMs).sessionTimeoutMs(sessionTimeoutMs).build();
    }
}

注入bean

创建ZK的客户端,详情如下:

@Component
@Slf4j
public class ZKClient {

    @Resource
    private ZKClientFactory zkClientFactory;
    public static final ZKClient INSTANCE = new ZKClient();

    private ZKClient() {
    }

    public CuratorFramework getClient() {
        return zkClientFactory.createSimple();
    }

    public boolean isNodeExist(String path) {
        CuratorFramework client = getClient();
        try {
            client.start();
            Stat stat = client.checkExists().forPath(path);
            return stat != null;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            CloseableUtils.closeQuietly(client);
        }
        return false;
    }

    public void createNode(String path, byte[] bytes) {
        CuratorFramework client = getClient();
        try {
            // 必须start,否则报错
            client.start();
            client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, bytes);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            CloseableUtils.closeQuietly(client);
        }
    }

    public void deleteNode(String path) {
        CuratorFramework client = getClient();
        try {
            client.start();
            client.delete().forPath(path);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            CloseableUtils.closeQuietly(client);
        }
    }

    public List<String> getChildren(String path) {
        List<String> result = new LinkedList<>();
        CuratorFramework client = getClient();
        try {
            client.start();
            result = client.getChildren().forPath(path);
        } catch (Exception e) {
            log.error("ZKClient getChildren error.");
        }
        return result;
    }

}

构建测试类

测试基类,设置激活环境

@Slf4j
@ActiveProfiles("test")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = GmallZookeeperApplication.class)
@ContextConfiguration
public class BaseTest {

}

创建节点、删除节点、获取节点信息、分布式锁的方法如下:@ActiveProfiles("company")是激活笔者一个application-company.yml文件

application.yml如下:

server:
  port: 8022

spring:
  profiles:
    active: home

application-compay.yml如下:

#curator配置
curator:
  connectString: 192.168.163.128:2181,192.168.163.128:2182,192.168.163.128:2183 # zookeeper 地址
  retryCount: 1 # 重试次数
  elapsedTimeMs: 2000 # 重试间隔时间
  sessionTimeoutMs: 60000 # session超时时间
  connectionTimeoutMs: 10000 # 连接超时时间

创建节点、删除节点、获取节点信息、分布式锁的方法如下:文章来源地址https://www.toymoban.com/news/detail-650480.html

@Slf4j
@ActiveProfiles("company")
public class ZKClientTest extends BaseTest{

    @Resource
    private ZKClient zkClient;
    public static final int THREAD_NUM = 10;

    @Test
    public void distributedLock() throws InterruptedException, BrokenBarrierException {
        String lockPath = "/test/distributed2/lock";
        CuratorFramework client = zkClient.getClient();
        client.start();
        InterProcessMutex lock = new InterProcessMutex(client, lockPath);

        // 阻塞主线程,等待全部子线程执行完
        CyclicBarrier cyclicBarrier = new CyclicBarrier(THREAD_NUM);

        for (int i = 0; i < THREAD_NUM; i++) {
            new Thread(() -> {
                log.info("{}->尝试竞争锁", Thread.currentThread().getName());
                try {
                    lock.acquire(); // 阻塞竞争锁

                    log.info("{}->成功获得锁", Thread.currentThread().getName());
                    Thread.sleep(2000);

                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {
                        lock.release(); //释放锁
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }

            }, "Thread-" + i).start();
        }

        // 目的是为了等子线程抢完锁再结束子线程,否则无法看到日志效果
        cyclicBarrier.await();
        log.info("全部子线程已执行完毕");
    }

    @Test
    public void createNode() {
        // 创建一个ZNode节点
        String data = "hello";
        byte[] payload = data.getBytes(StandardCharsets.UTF_8);
        String zkPath = "/test/CRUD/node-1";

        zkClient.createNode(zkPath, payload);
        log.info("createNode succeeded!");

    }

    @Test
    public void getChildren() {
        String zkPath = "/test/CRUD";

        List<String> children = zkClient.getChildren(zkPath);
        printList(children);
    }

    @Test
    public void deleteNode() {
        String parentPath = "/test";

        log.info("======================Before delete===================");
        List<String> before = zkClient.getChildren(parentPath);
        printList(before);

        String zkPath = "/test/CRUD/node-1";
        zkClient.deleteNode(zkPath);
        log.info("delete node secceeded!");

        log.info("======================After delete===================");
        List<String> after = zkClient.getChildren(parentPath);
        printList(after);
    }

    private void printList(List<String> data) {
        if (!CollectionUtils.isEmpty(data)) {
            for (String datum : data) {
                log.info("datum:{}", data);
            }
        }
    }
}

到了这里,关于SpringBoot基于Zookeeper实现分布式锁的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式锁实现方案-基于zookeeper的分布式锁实现(原理与代码)

    目录 一、基于zookeeper的分布式锁 1.1 基于Zookeeper实现分布式锁的原理 1.1.1 分布式锁特性说明 1.1.1.1 特点分析 1.1.1.2 本质 1.1.2 Zookeeper 分布式锁实现原理 1.1.2.1 Zookeeper临时顺序节点特性 1.1.2.2 Zookeeper满足分布式锁基本要求 1.1.2.3 Watcher机制 1.1.2.3 总结 1.2 分布式锁流程说明 1.2.1

    2024年04月24日
    浏览(41)
  • SpringBoot~ dubbo + zookeeper实现分布式开发的应用

    配置服务名字, 注册中心地址, 扫描被注册的包 server.port=8081 #当前应用名字 dubbo.application.name=provider-server #注册中心地址 dubbo.registry.address=zookeeper://127.0.0.1:2181 #扫描指定包下服务 dubbo.scan.base-packages=com.demo.service 实现一个接口,在接口中完成需求 public interface Translate { String tran

    2024年04月10日
    浏览(52)
  • 3、基于Zookeeper实现分布式锁

    3.1.1 安装启动 如下,说明启动成功: 3.1.2 相关概念 Zookeeper提供一个多层级的节点命名空间(节点称为znode),每个节点都用一个以斜杠(/)分隔的路径表示,而且每个节点都有父节点(根节点除外),非常类似于文件系统。并且每个节点都是唯一的。 1、znode节点有四种类型

    2024年02月15日
    浏览(40)
  • 2.Zookeeper集成springboot操作节点,事件监听,分布式锁实现

    1.Springboot项目中添加zookeeper 已经对应的客户端依赖 ,pom.xml文件如下 2.application.yml 文件中配置zookeeper连接的相关配置信息 3.java配置的方式添加zookeeper相关的配置 4.Zookeeper基础操作服务和分布式锁服务编码 5.watcher机制事件处理抽象封装 6.基本操作的单元测试代码

    2024年03月10日
    浏览(49)
  • 第三章_基于zookeeper实现分布式锁

    实现分布式锁目前有三种流行方案,分别为基于数据库、Redis、Zookeeper的方案。这里主要介绍基于zk怎么实现分布式锁。在实现分布式锁之前,先回顾zookeeper的知识点。 知识点回顾 Zookeeper(业界简称zk)是一种提供配置管理、分布式协同以及命名的中心化服务,这些提供的 功

    2024年02月09日
    浏览(40)
  • 基于 SpringBoot + Redis 实现分布式锁

    大家好,我是余数,这两天温习了下分布式锁,然后就顺便整理了这篇文章出来。 文末附有源码链接,需要的朋友可以自取。 至于什么是分布式锁,这里不做赘述,不了解的可以自行去查阅资料。 1. 使用 Redis 的 Setnx(SET if Not Exists) 命令加锁。 即锁不存在的时候才能加锁成功

    2024年02月05日
    浏览(55)
  • 在Spring中,可以使用不同的方式来实现分布式锁,例如基于数据库、Redis、ZooKeeper等

    在Spring中,可以使用不同的方式来实现分布式锁,例如基于数据库、Redis、ZooKeeper等。下面是两种常见的实现方式: 使用Redis实现分布式锁: 使用自定义注解实现本地锁: 以上是两种常见的在Spring中实现分布式锁的方式。第一种方式使用Redis作为分布式锁的存储介质,通过

    2024年03月17日
    浏览(46)
  • Zookeeper 和 Redis 哪种更好? 为什么使用分布式锁? 1. 利用 Redis 提供的 第二种,基于 ZK 实现分布式锁的落地方案 对于 redis 的分布式锁而言,它有以下缺点:

    关于这个问题,我们 可以从 3 个方面来说: 为什么使用分布式锁? 使用分布式锁的目的,是为了保证同一时间只有一个 JVM 进程可以对共享资源进行操作。 根据锁的用途可以细分为以下两类: 允许多个客户端操作共享资源,我们称为共享锁 这种锁的一般是对共享资源具有

    2024年01月16日
    浏览(49)
  • 分布式锁解决方案_Zookeeper实现分布式锁

    提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加 分布式锁解决方案_Zookeeper实现分布式锁 提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档 提示:这里可以添加本文要记录的大概内容: Zookeeper 是一个开源的分布式协调服务,它

    2024年02月03日
    浏览(42)
  • Zookeeper实现分布式锁

    ZooKeeper是一个分布式协调服务,其中提供的序列化、持久化、有层次的目录结构使得它非常适合用于实现分布式锁。在ZooKeeper中,分布式锁通常通过临时有序节点实现。以下是ZooKeeper分布式锁的详细介绍:  实现方式: 临时有序节点: 当一个客户端需要获取锁时,它在ZooK

    2024年02月02日
    浏览(58)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包