ZooKeeper 实战(四) Curator Watch事件监听

这篇具有很好参考价值的文章主要介绍了ZooKeeper 实战(四) Curator Watch事件监听。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

ZooKeeper 实战(四) Curator Watch事件监听

0.前言

上一篇博客只介绍了有关Curator中对ZNode的CRUD操作,从本篇起开始逐步介绍更加高级的API操作。

1.Watch 事件监听概念

ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。虽然ZooKeeper原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便,需要开发人员反复注册Watcher,比较繁琐。

而 Curator 引入了Cache 来实现对 ZooKeeper 服务端事件的监听。

Curator 中提供了三种 Cache(Watcher)来监听不同节点变化类型:

  • NodeCache:监听指定的节点。
  • PathChildrenCache:监听指定节点的子节点。
  • TreeCache:监听指定节点及其子孙节点。

2.NodeCache

监听指定的节点,增删改都会监听。

2.1.全参构造器参数

/**
 * @param: client 注册监听的客户端
 * @param: path 节点路径
 * @param: dataIsCompressed 是否开启数据压缩。传递的数据会进行压缩,传递速度快,但取数据时需要把压缩的数据进行转换,默认为false
 */
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed);

2.2.代码DEMO

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。");
        String path = "/ahao/watcher";
        TimeUnit.SECONDS.sleep(3);

        // 创建NodeCache对象
        NodeCache nodeCache = new NodeCache(client,path);
        // 添加监听器
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                ChildData currentData = nodeCache.getCurrentData();
                if (currentData != null){
                    String s = new String(currentData.getData(),StandardCharsets.UTF_8);
                    log.info("监听{}节点发生变化,数据内容:{}",path,s);
                }else {
                    log.info("监听{}节点被删除了",path);
                }
            }
        });
      	// 开启监听
        nodeCache.start();

        TimeUnit.SECONDS.sleep(2);
        // 创建节点
        client.create().creatingParentsIfNeeded().forPath(path,"第一次新增".getBytes(StandardCharsets.UTF_8));
        TimeUnit.SECONDS.sleep(2);
        // 更新节点
        client.setData().forPath(path,"数据修改了".getBytes(StandardCharsets.UTF_8));
        TimeUnit.SECONDS.sleep(2);
        // 删除节点
        client.delete().deletingChildrenIfNeeded().forPath(path);
    }

2.3.日志输出

ZooKeeper 实战(四) Curator Watch事件监听,分布式实战,zookeeper,linux,分布式,java,java-zookeeper,spring boot

3.PathChildrenCache

监听指定节点的子节点。当一个子节点增删改时, PathChildrenCache会包含最新的子节点的数据和状态。

3.1.全参构造器参数

/**
 * @param: client 注册监听的客户端
 * @param: path 节点路径
 * @param: cacheData 是否缓存节点内容(包含节点状态)
 * @param: dataIsCompressed 是否开启数据压缩。传递的数据会进行压缩,传递速度快,但取数据时需要把压缩的数据进行转换,默认为false
 * @param: executorService 用于PathChildrenCache的后台线程的线程池。该线程池应该是单线程的,否则缓存可能会看到不一致的结果
 */
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)

3.2.子节点监听时间类型

public enum Type
{
  	// 子节点添加
    CHILD_ADDED,
  	// 子节点的数据变更
    CHILD_UPDATED,
		// 子节点被删除
    CHILD_REMOVED,
 
  	// 以下三个事件类型表示:当连接断开时,PathChildrenCache将继续保持其断开连接之前的状态,并且在连接恢复后,PathChildrenCache将为断开连接期间发生的所有添加、删除和更新发出正常的子事件。
  	// 当连接状态处于ConnectionState.SUSPENDED。
    CONNECTION_SUSPENDED,
  	// 当连接状态处于ConnectionState.RECONNECTED
    CONNECTION_RECONNECTED,
  	// 当连接状态处于ConnectionState.LOST
    CONNECTION_LOST,
  	
  	// 当通过PathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT)启动监听时,该事件表示PathChildrenCache初始化完成
  This event signals that the initial cache has been populated.
    INITIALIZED
}

3.2.代码DEMO

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。");
        String path = "/ahao/watcher";
        TimeUnit.SECONDS.sleep(3);

        // 创建PathChildrenCache对象
        // 此处的cacheData参数一定要设置为true,不然Curator不会缓存数据当本地,
        // 那么后续pathChildrenCache.getCurrentData()得到的数据都为null
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client,path,true);
        // 添加监听器
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                if (event.getType() == PathChildrenCacheEvent.Type.INITIALIZED){
                    log.info("PathChildrenCache初始化完,事件类型:{}", event.getType());
                }else {
                    ChildData currentData = event.getData();
                    log.info("事件类型:{},监听到的子节点发生变化:{}",event.getType(),currentData.getPath());
                }
            }
        });
        // 开启监听
        pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

        // 创建子节点
        TimeUnit.SECONDS.sleep(2);
        client.create().creatingParentsIfNeeded().forPath(path+"/c1");
        client.create().creatingParentsIfNeeded().forPath(path+"/c2");
        client.create().creatingParentsIfNeeded().forPath(path+"/c3/age");
        // 修改子节点
        TimeUnit.SECONDS.sleep(2);
        client.setData().forPath(path+"/c1","c1更新了".getBytes(StandardCharsets.UTF_8));
        client.setData().forPath(path+"/c2","c2更新了".getBytes(StandardCharsets.UTF_8));
        // 删除子节点
        TimeUnit.SECONDS.sleep(2);
        client.delete().deletingChildrenIfNeeded().forPath(path+"/c3");
    }

3.3.日志输出

可以看出,PathChildrenCache只会监听直属子节点的变化,其非直属子节点的后代节点如/c3/age,没有发布通知。

ZooKeeper 实战(四) Curator Watch事件监听,分布式实战,zookeeper,linux,分布式,java,java-zookeeper,spring boot

4.TreeCache

监听指定节点及其子孙节点。

4.1.构造器参数

/**
 * @param: client 注册监听的客户端
 * @param: path 节点路径
 */
public TreeCache(CuratorFramework client, String path)
  
/**
 * @param: client 注册监听的客户端
 * @param: path 节点路径
 * @param: cacheData 是否缓存节点内容(包含节点状态)
 * @param: dataIsCompressed 是否开启数据压缩。传递的数据会进行压缩,传递速度快,但取数据时需要把压缩的数据进行转换,默认为false
 * @param: maxDepth 最大深度。最深的那个后代节点到path所需要经过的节点数
 * @param: executorService 用于PathChildrenCache的后台线程的线程池。该线程池应该是单线程的,否则缓存可能会看到不一致的结果
 * @param: createParentNodes 是否需要创建父节点。如果父节点不存在泽创建父节点(容器节点)
 * @param: TreeCacheSelector TreeCache选择器。根据指定的策略和条件,选择适合的缓存树来创建和维护TreeCache
 */
TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final ExecutorService executorService, boolean createParentNodes, TreeCacheSelector selector)

4.2.代码DEMO

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。");
        String path = "/ahao/watcher/tree";
        TimeUnit.SECONDS.sleep(3);

        // 创建TreeCache对象,也可通过TreeCache.newBuilder()创建
        TreeCache treeCache = new TreeCache(client,path);
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                if (event.getType() == TreeCacheEvent.Type.INITIALIZED){
                    log.info("TreeCache初始化完,事件类型:{}", event.getType());
                }else {
                    ChildData currentData = event.getData();
                    log.info("事件类型:{},监听到的子节点发生变化:{}",event.getType(),currentData.getPath());
                }
            }
        });
        // 开启监听
        treeCache.start();

        // 创建节点
        TimeUnit.SECONDS.sleep(2);
        client.create().creatingParentsIfNeeded().forPath(path);
        client.create().creatingParentsIfNeeded().forPath(path +"/t1");
        client.create().creatingParentsIfNeeded().forPath(path +"/t2/ccc");
        // 修改子节点
        TimeUnit.SECONDS.sleep(2);
        client.setData().forPath(path,"根节点更新了".getBytes(StandardCharsets.UTF_8));
        client.setData().forPath(path +"/t2/ccc","/t2/ccc更新了".getBytes(StandardCharsets.UTF_8));
        // 删除子节点
        TimeUnit.SECONDS.sleep(2);
        client.delete().deletingChildrenIfNeeded().forPath(path +"/t2");
    }

4.3.日志输出

可以看出TreeCache会监听当前节点和后代节点的变化。

ZooKeeper 实战(四) Curator Watch事件监听,分布式实战,zookeeper,linux,分布式,java,java-zookeeper,spring boot文章来源地址https://www.toymoban.com/news/detail-786701.html

到了这里,关于ZooKeeper 实战(四) Curator Watch事件监听的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Zookeeper实战——分布式锁实现以及原理

    分布式锁是控制分布式系统之间同步访问共享资源的一种方式。分布式锁的实现方式有很多种,比如 Redis 、数据库 、 zookeeper 等。这篇文章主要介绍用 Zookeeper 实现分布式锁。 先说结论: Zookeeper 是基于临时顺序节点以及 Watcher 监听器机制实现分布式锁的 。 (1)ZooKeeper 的每

    2023年04月08日
    浏览(34)
  • 分布式锁原理与实战三:ZooKeeper分布式锁的原理

             目录 ZooKeeper分布式锁的原理 ZooKeeper的每一个节点,都是一个天然的顺序发号器。 ZooKeeper节点的递增有序性,可以确保锁的公平 ZooKeeper的节点监听机制,可以保障占有锁的传递有序而且高效 ZooKeeper的节点监听机制,能避免羊群效应 分布式锁的抢占过程 客户端

    2024年02月08日
    浏览(45)
  • 【ZooKeeper高手实战】ZAB协议:ZooKeeper分布式一致性的基石

    🌈🌈🌈🌈🌈🌈🌈🌈 欢迎关注公众号(通过文章导读关注:【11来了】),及时收到 AI 前沿项目工具及新技术 的推送 发送 资料 可领取 深入理解 Redis 系列文章结合电商场景讲解 Redis 使用场景 、 中间件系列笔记 和 编程高频电子书 ! 文章导读地址:点击查看文章导读!

    2024年02月03日
    浏览(46)
  • Zookeeper 实战 | Zookeeper 和Spring Cloud相结合解决分布式锁、服务注册与发现、配置管理

    专栏集锦,大佬们可以收藏以备不时之需: Spring Cloud 专栏: Python 专栏: Redis 专栏: TensorFlow 专栏: Logback 专栏: 量子计算: 量子计算 | 解密著名量子算法Shor算法和Grover算法 AI机器学习实战: AI机器学习实战 | 使用 Python 和 scikit-learn 库进行情感分析 AI机器学习 | 基于lib

    2024年02月05日
    浏览(90)
  • (五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题

    本节内容使用zookeeper实现分布式锁,完成并发访问“超卖”问题的解决。相对于redis分布式锁,zookeeper能够保证足够的安全性。关于zookeeper的安装内容这里不做介绍,开始本节内容之前先自行安装好zookeeper中间键服务。这里我们利用创建zookeeper路径节点的唯一性实现分布式锁

    2024年02月06日
    浏览(45)
  • uni-app:监听数据变化(watch监听、@input事件)

    方法一:文本框监听,使用 @input 事件 方法二:使用watch监听属性(很好解决了文本框中数据非手输时监听不到数据变化)

    2024年02月10日
    浏览(48)
  • 【Zookeeper】使用Curator操作Zookeeper

    Curator 是 Apache ZooKeeper 的Java客户端库。 Zookeeper现有常见的Java API如:原生JavaAPI、Curator、ZkClient等。 创建连接 创建节点 不指定数据 指定数据 设置节点类型 节点可以分为四大类: PERSISTENT 持久化节点 EPHEMERAL 临时节点,只在当前会话有效 PERSISTENT_SEQUENTIAL 持久化顺序节点 EPH

    2024年02月07日
    浏览(41)
  • 【分布式】Zookeeper

    可以参考:https://zhuanlan.zhihu.com/p/62526102 ZooKeeper 是一个分布式的,开放源码的分布式应用程序协同服务。ZooKeeper 的设计目标是将那些复杂且容易出错的分布式一致性服务封装起来,构成一个高效可靠的原语集,并以一系列简单易用的接口提供给用户使用。 配置管理。 Java微服

    2024年02月11日
    浏览(46)
  • 分布式协调组件Zookeeper

    ZooKeeper 是⼀种 分布式协调组件 ,用于管理大型主机。 在分布式环境中协调和管理服务是一个复杂的过程 。ZooKeeper 通过其简单的架构和 API 解决了这个问题。ZooKeeper 允许开发人员专注于核心应用程序逻辑,而不必担心应用程序的分布式特性。 分布式协调组件 在分布式系统

    2024年02月13日
    浏览(44)
  • Zookeeper 分布式锁案例

    Zookeeper 是一个开源的分布式协调服务,可以用于维护分布式系统中的一致性、顺序性和命名等。其中,Zookeeper 的分布式锁机制可以用于实现分布式系统中的互斥访问,确保在多个节点上对共享资源进行同步访问。 Zookeeper 分布式锁的实现原理是基于 Zookeeper 的临时有序节点和

    2024年02月16日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包