2.Zookeeper集成springboot操作节点,事件监听,分布式锁实现

这篇具有很好参考价值的文章主要介绍了2.Zookeeper集成springboot操作节点,事件监听,分布式锁实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.Springboot项目中添加zookeeper 已经对应的客户端依赖 ,pom.xml文件如下

 <!-- Zookeeper组件 -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.9.1</version>
        </dependency>
        <!-- 包含Curator组件 -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-zookeeper</artifactId>
            <version>6.2.2</version>
        </dependency>

2.application.yml 文件中配置zookeeper连接的相关配置信息

zookeeper:
  #服务器地址
  connectString: 127.0.0.1:2181
  #会话超时时间
  sessionTimeoutMs: 3000
  #连接超时时间
  connectionTimeoutMs: 60000
  #最大重试次数
  maxRetries: 3
  #初始休眠时间
  baseSleepTimeMs: 1000

3.java配置的方式添加zookeeper相关的配置

package com.jinyi.up.zk.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

/**
 * @author huangchong
 * @date 2024/3/5 20:48
 * @desc
 */
@Slf4j
@Configuration
public class ZookeeperConfig {

    @Value("${zookeeper.connectString}")
    private String connectString;

    @Value("${zookeeper.baseSleepTimeMs}")
    private int baseSleepTimeMs;

    @Value("${zookeeper.maxRetries}")
    private int maxRetries ;

    @Value("${zookeeper.connectionTimeoutMs}")
    int connectionTimeoutMs ;

    @Value("${zookeeper.sessionTimeoutMs}")
    int sessionTimeoutMs ;

    private static CuratorFramework client = null ;
    /**
     * 初始化
     */
    @PostConstruct
    public void init (){
        // 重试策略
        RetryPolicy policy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
       //通过工厂创建Curator
        client = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .connectionTimeoutMs(connectionTimeoutMs)
                .sessionTimeoutMs(sessionTimeoutMs)
                .retryPolicy(policy).build();
        //开启连接
        client.start();
        log.info("zookeeper 初始化完成...");
    }

    @Bean
    public CuratorFramework getClient (){
        return client ;
    }

 /**
     * 分布式锁bean 注入spring管理中
     */
    @Bean
    public InterProcessMutex distributedLock() throws Exception {
        //使用了Curator提供的InterProcessMutex来创建一个分布式锁。我们使用ZooKeeper的路径/lock来表示锁的资源。
        InterProcessMutex distributedLock = new InterProcessMutex(client, "/lock");
        return distributedLock;
    }
}

4.Zookeeper基础操作服务和分布式锁服务编码

package com.jinyi.up.client.service;

import com.jinyi.up.zk.process.AbstractListenerProcess;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;

/**
 * @author huangchong
 * @date 2024/3/5 21:39
 * @desc
 */
@Slf4j
@Service
public class ZookeeperService {

    @Resource
    private CuratorFramework client;

    /**
     * 查询节点数据
     *
     * @param nodePath 节点
     * @return {@link String}
     */
    public String queryData(String nodePath) {
        try {
            Stat stat = client.checkExists().forPath(nodePath);
            if (stat != null) {
                byte[] bytes = client.getData().forPath(nodePath);
                return new String(bytes, StandardCharsets.UTF_8);
            }
            return null;
        } catch (Exception e) {
            log.error("查询节点数据失败:", e);
            return null;
        }
    }

    /**
     * 创建节点
     *
     * @param mode     节点类型
     * @param nodePath 节点路径
     * @param nodeData 节点数据
     * @return {@link String}
     */
    public String create(CreateMode mode, String nodePath, String nodeData) {
        try {
            Stat stat = client.checkExists().forPath(nodePath);
            if (stat == null) {
                return client.create()
                        .withMode(mode)
                        .forPath(nodePath, nodeData.getBytes());
            } else {
                return null;
            }
        } catch (Exception e) {
            log.error("创建节点失败:", e);
            return null;
        }
    }


    /**
     * 更新节点数据
     *
     * @param nodePath 节点路径
     * @param nodeData 节点数据
     * @return {@link Stat}
     */
    public boolean update(String nodePath, String nodeData) {
        try {
            Stat stat = client.checkExists().forPath(nodePath);
            if (stat != null) {
                stat = client.setData().forPath(nodePath, nodeData.getBytes());
            }
            return stat != null;
        } catch (Exception e) {
            log.error("更新节点失败:", e);
            return false;
        }
    }

    /**
     * 删除节点
     *
     * @param nodePath v
     * @return {@link boolean}
     */
    public boolean delete(String nodePath) {
        try {
            Stat stat = client.checkExists().forPath(nodePath);
            if (stat != null) {
                client.delete().forPath(nodePath);
            }
            return true;
        } catch (Exception e) {
            log.error("删除节点失败:", e);
            return false;
        }

    }

    /**
     * 监听一个节点
     *
     * @param nodePath 被监听节点路径
     * @return {@link }
     */
    public boolean addWatchNodeListener(String nodePath) {
        CuratorCache curatorCache = CuratorCache.builder(client, nodePath).build();
        CuratorCacheListener listener = CuratorCacheListener.builder()
                .forNodeCache(new NodeCacheListener() {
                    @Override
                    public void nodeChanged() throws Exception {
                        log.info("监听到节点变动");
                        //TODO
                    }
                }).build();
        curatorCache.listenable().addListener(listener);
        curatorCache.start();
        return true;
    }


    /**
     * 监听子孙节点 支持子节点的子节点监听
     * TreeCache监听节点自己和所有子节点们
     *
     * @param nodePath  被监听节点路径
     * @param processer 监听后处理
     * @return {@link }
     */
    public boolean addWatchTreeListener(String nodePath, AbstractListenerProcess processer) {
        CuratorCache curatorCache = CuratorCache.builder(client, nodePath).build();
        CuratorCacheListener listener = CuratorCacheListener.builder()
                .forTreeCache(client, new TreeCacheListener() {
                    @Override
                    public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) {
                        log.info("监听到子节点变动,变动类型:{}", treeCacheEvent.getType().toString());
                        processer.process(curatorFramework, treeCacheEvent);
                    }
                }).build();
        curatorCache.listenable().addListener(listener);
        curatorCache.start();
        return true;
    }
}
package com.jinyi.up.zk.service;

import com.jinyi.up.zk.process.AbstractListenerProcess;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;

/**
 * @author huangchong
 * @date 2024/3/5 21:39
 * @desc
 */
@Slf4j
@Service
public class ZookeeperLockService {
    @Resource
    private InterProcessMutex distributedLock;

    public void doProtectedOperation() throws Exception {
        //acquire()方法获取锁
        distributedLock.acquire();
        try {
            // 执行需要保护的代码块
        } finally {
            distributedLock.release();
        }
    }
}

5.watcher机制事件处理抽象封装

package com.jinyi.up.zk.process;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;

/**
 * @author huangchong
 * @date 2024/3/5 21:58
 * @desc
 */
public abstract class AbstractListenerProcess {

    /**
     * 处理监听节点自己和所有子节点们变更事件
     *
     * @param client       zk客户端
     * @param event 子节点事件
     * @return {@link }
     */
    public abstract void process(CuratorFramework client, TreeCacheEvent event);
}
package com.jinyi.up.zk.process;

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;

/**
 * @author huangchong
 * @date 2024/3/5 21:58
 * @desc
 */
@Slf4j
public  class WatcherTreeListenerProcess extends AbstractListenerProcess{
    /**
     * 实际处理监听节点自己和所有子节点们变更事件
     *
     * @param client zk客户端
     * @param event 子节点事件
     * @return {@link }
     */
    @Override
    public void process(CuratorFramework client, TreeCacheEvent event) {
        //事件path
        String path = event.getData().getPath();
        switch (event.getType()) {
            case NODE_ADDED:
                log.info("新增子节点:" + path);
                break;

            case NODE_UPDATED:
                log.info("更新子节点:" + path);
                break;

            case NODE_REMOVED:
                log.info("删除子节点:" + path);
                break;

            default:
                break;
        }
    }
}

6.基本操作的单元测试代码文章来源地址https://www.toymoban.com/news/detail-838216.html

package com.jinyi.zookeeper;

import com.jinyi.up.zk.ZookeeperApplication;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
///此处classes内的内容是@SpringBootApplication入口
@SpringBootTest(classes = {ZookeeperApplication.class})
public abstract class BaseZkBootTest {
}
package com.jinyi.zookeeper;

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;


import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;

/**
 * @author huangchong
 * @date 2024/3/5 21:00
 * @desc
 */
@Slf4j
public class ZookeeperBaseTest extends BaseZkBootTest {
    @Resource
    private CuratorFramework client;

    @Test
    public void testAddPersistentNode() throws Exception {
        // 创建一个持久化节点/persistent_node,断开连接时不会自动删除
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/persistent_node");
    }

    @Test
    public void testZnodeExists() throws Exception {
        // 判断节点是否存在,persistent_node2不存在所以stat2是null
        Stat stat = client.checkExists().forPath("/persistent_node");
        log.info(String.valueOf(stat));
        Stat stat2 = client.checkExists().forPath("/persistent_node2");
        log.info(String.valueOf(stat2));
    }


    @Test
    public void testSetData() throws Exception {
        // 设置节点数据
        client.setData()
                .forPath("/persistent_node", "persistent_node_data".getBytes(StandardCharsets.UTF_8));
    }

    @Test
    public void testCreateAndSet() throws Exception {
        // 创建一个持久化节点并设置节点数据
        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
                .forPath("//persistent_node1", "persistent_node_data1".getBytes(StandardCharsets.UTF_8));
    }

    @Test
    public void testGetData() throws Exception {
        // 查询节点数据
        byte[] data = client.getData().forPath("/persistent_node1");
        log.info(new String(data, StandardCharsets.UTF_8));
    }

    @Test
    public void testDelete() throws Exception {
        // 删除节点
        client.delete()
                .guaranteed()
                .deletingChildrenIfNeeded()
                .forPath("/persistent_node1");
    }

    @Test
    public void testReadLock() throws Exception {
        // 读写锁-读
        InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock-read");
        lock.readLock().acquire();
        log.info("获取-ReadLock");
        lock.readLock().release();
    }

    @Test
    public void testWriteLock() throws Exception {
        // 读写锁-写
        InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock-write");
        lock.writeLock().acquire();
        log.info("获取-WriteLock");
        lock.writeLock().release();
    }
}

到了这里,关于2.Zookeeper集成springboot操作节点,事件监听,分布式锁实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Springboot事件监听

    Springboot事件监听中主要有以下对象: 1、事件(event)可以封装和传递监听器中要处理的参数,如对象或字符串,并作为监听器中监听的目标。 2、监听器(listener)具体根据事件发生的业务处理模块,这里可以接收处理事件中封装的对象或字符串。 3、事件发布者(publisher)

    2024年02月08日
    浏览(40)
  • 如何保证分布式系统中服务的高可用性:应对 ZooKeeper Leader 节点故障的注册处理策略

    作者:zhaokk 在现代分布式系统中,高可用性是一个至关重要的。分布式系统中的各个组件需要保证在各种异常情况下仍然能够正常工作,确保系统的稳定性和可靠性。ZooKeeper(以下简称为zk)作为一种常用的分布式协调服务,为分布式系统中的各种任务提供了基础支持

    2024年02月11日
    浏览(63)
  • springboot自定义事件发布及监听

    自定义线程池 实体类 事件发布 sevice层 serviceImpl层 事件监听 监听器的执行顺序 如果应用程序中有多个事件监听器,可以通过@Order 注解,指定它们的执行顺序。例如: 监听器的条件 只想在特定条件下才执行事件监听器,可以使用 @ConditionalOnProperty 注解: 参考博客:https://bl

    2024年02月10日
    浏览(40)
  • SpringBoot下使用自定义监听事件

    事件机制是Spring的一个功能,目前我们使用了SpringBoot框架,所以记录下事件机制在SpringBoot框架下的使用,同时实现异步处理。事件机制其实就是使用了观察者模式(发布-订阅模式)。 Spring的事件机制经过如下流程: 1、自定义事件,继承org.springframework.context.ApplicationEvent抽象类

    2024年02月14日
    浏览(77)
  • Java操作Zookeeper节点

    引入jar包: zookeeper的权限: ZooKeeper提供了如下几种验证模式(scheme): • digest:Client端由用户名和密码验证,譬如user:password,digest的密码生成方式是Sha1摘要的base64形式 • auth:不使用任何id,代表任何已确认用户。 • ip:Client端由IP地址验证,譬如172.2.0.0/24 • world:固定

    2024年02月10日
    浏览(40)
  • ZooKeeper中节点的操作命令(查看、创建、删除节点)

    天行健,君子以自强不息;地势坤,君子以厚德载物。 每个人都有惰性,但不断学习是好好生活的根本,共勉! 文章均为学习整理笔记,分享记录为主,如有错误请指正,共同学习进步。 ZooKeeper相关文章参考: ZooKeeper下载、安装、配置和使用 ZooKeeper配置文件zoo.cfg参数详解

    2024年02月06日
    浏览(39)
  • 【Java】SpringBoot实现事件监听(异步执行)

            在Spring Boot中,事件监听是一种机制,通过该机制,你可以定义和触发自定义的事件,以及在应用程序中注册监听器来响应这些事件,提供了一种解耦的方式来处理应用程序中的事件。 文末有源码gitee地址!拉取可进行测试。 事件监听的主要组件包括: 事件(E

    2024年04月17日
    浏览(43)
  • zookeeper之节点基本操作(头歌)

    开启ZooKeeper服务器。 使用客户端(zkCli.sh)连接客户端(IP:127.0.0.1,端口号:2181)。 创建/enode临时节点(节点数据为空)。 创建/spnode持久节点(节点数据为空)。 断开客户端(zkCli.sh)与客服端连接。 本关任务是使用命令行,进行以下操作: 开启ZooKeeper服务器。 使用客

    2024年02月03日
    浏览(52)
  • springboot监听Redis 缓存过期(Key 失效)事件

    事件通过 Redis 的订阅与发布功能(pub/sub)来进行分发, 故需要开启 redis 的事件监听与发布 修改 redis.conf 文件(Windows上是redis.windows.conf和redis.windows-service.conf) 通过开启key过期的事件通知,当key过期时,会发布过期事件;我们定义key过期事件的监听器,当key过期时,就能收到

    2024年02月12日
    浏览(36)
  • zookeeper之节点基本操作(二)(头歌)

    第一关: 开启ZooKeeper服务器。 使用客户端(zkCli.sh)连接客户端(IP:127.0.0.1,端口号:2181)。 创建/spnode持久节点(节点数据为空)。 使用get命令监听/spnode。 修改/spnode的节点值(当前客户端修改),观察watch事件。 再次使用get命令监听/spnode。 点击评测(评测中修改节点

    2024年04月11日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包