zookeeper应用场景(二)

这篇具有很好参考价值的文章主要介绍了zookeeper应用场景(二)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

单机环境下可以利用jvm级别的锁,比如synchronized、Lock等来实现锁,如果是多机部署就需要一个共享数据存储区域来实现分布式锁

一、分布式锁实现方式

1、基于数据库实现分布式锁

可以用数据库唯一索引来实现

zookeeper应用场景(二),zookeeper,zookeeper,分布式,云原生

2、基于redis实现分布式锁

redis实现的分布式锁始终会有一些问题,即便使用多数写入,主节点挂了,数据丢失还是会存在加锁问题,就是主节点宕机,客户端无法感知

zookeeper应用场景(二),zookeeper,zookeeper,分布式,云原生

3、基于zookeeper实现分布式锁

1)实现方式一

使用临时节点创建成功获取锁,否则监听临时节点,有个问题,比如1000个线程只有一个会加锁成功,当删除临时节点时999个线程都会去竞争

zookeeper应用场景(二),zookeeper,zookeeper,分布式,云原生

2)实现方式二

公平锁的实现

zookeeper应用场景(二),zookeeper,zookeeper,分布式,云原生

4、Curator可重入分布式锁工作流程

从InterProcessMutex类找到acquire()加锁方法

public void acquire() throws Exception {
        if (!this.internalLock(-1L, (TimeUnit)null)) {
            throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
        }
    }
1)加锁 
private boolean internalLock(long time, TimeUnit unit) throws Exception {
        // 获取当前线程
        Thread currentThread = Thread.currentThread();
        // threadData类型是ConcurrentMap,从threadData中去拿LockData加锁对象
        InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
        // 如果拿到了 证明之前加锁了,lockCount重入次数+1
        if (lockData != null) {
            lockData.lockCount.incrementAndGet();
            return true;
        } else {
            // 没有拿到开始从zookeeper上创建lock节点
            String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
            // 创建成功 加锁成功把对象放到threadData中
            if (lockPath != null) {
                InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
                this.threadData.put(currentThread, newLockData);
                return true;
            } else {
                // 加锁失败
                return false;
            }
        }
    }

private static class LockData {
        // 持有锁的线程
        final Thread owningThread;
        // 在zookeeper的锁路径
        final String lockPath;
        // 线程加锁次数
        final AtomicInteger lockCount;

        private LockData(Thread owningThread, String lockPath) {
            this.lockCount = new AtomicInteger(1);
            this.owningThread = owningThread;
            this.lockPath = lockPath;
        }
    }
 2)创建节点返回路径
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
        long startMillis = System.currentTimeMillis();
        Long millisToWait = unit != null ? unit.toMillis(time) : null;
        byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes;
        // 重试次数
        int retryCount = 0;
        String ourPath = null;
        boolean hasTheLock = false;
        boolean isDone = false;

        while(!isDone) {
            isDone = true;

            try {
                // 创建临时有序节点
                ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
                // 创建的节点是否为最小节点
                hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
            } catch (NoNodeException var14) {
                // 加锁失败 重试设置重试策略
                if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
                    throw var14;
                }

                isDone = false;
            }
        }

        return hasTheLock ? ourPath : null;
    }
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
        String ourPath;
        // 是否要给节点设置属性 创建的都是临时有序节点
        if (lockNodeBytes != null) {
            ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path, lockNodeBytes);
        } else {
            ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path);
        }

        return ourPath;
    }
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
        boolean haveTheLock = false;
        boolean doDelete = false;

        try {
            if (this.revocable.get() != null) {
                ((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);
            }

            while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {
                // 将子节点进行排序
                List<String> children = this.getSortedChildren();
                // 截取创建的节点的序号
                String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);
                // 判断是否为最小序号
                PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, sequenceNodeName, this.maxLeases);
                if (predicateResults.getsTheLock()) {
                    // 加锁成功
                    haveTheLock = true;
                } else {
                    // 拿到上一个节点的路径
                    String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();
                    synchronized(this) {
                        try {
                            // 监听上一个节点
                            ((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);
                            if (millisToWait == null) {
                                // 等待
                                this.wait();
                            } else {
                                millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if (millisToWait > 0L) {
                                    // 超时等待
                                    this.wait(millisToWait);
                                } else {
                                    doDelete = true;
                                    break;
                                }
                            }
                        } catch (NoNodeException var19) {
                        }
                    }
                }
            }
        } catch (Exception var21) {
            ThreadUtils.checkInterrupted(var21);
            doDelete = true;
            throw var21;
        } finally {
            if (doDelete) {
                this.deleteOurPath(ourPath);
            }

        }

        return haveTheLock;
    }
 3)解锁
public void release() throws Exception {
        Thread currentThread = Thread.currentThread();
        InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
        if (lockData == null) {
            // 分布式场景下什么情况都可能有 所以判断一下
            throw new IllegalMonitorStateException("You do not own the lock: " + this.basePath);
        } else {
            // 重入次数减1
            int newLockCount = lockData.lockCount.decrementAndGet();
            if (newLockCount <= 0) {
                if (newLockCount < 0) {
                    throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + this.basePath);
                } else {
                    try {
                        // 这里之前应该还有个大于0的判断 在curator5.1.0&zookeeper 3.9.0版本去掉了
                        this.internals.releaseLock(lockData.lockPath);
                    } finally {
                        this.threadData.remove(currentThread);
                    }

                }
            }
        }
    }

zookeeper应用场景(二),zookeeper,zookeeper,分布式,云原生 

5、总结

优点:Zookeeper分布式锁(如InterProcessMutex),具备高可用、可重入、阻塞锁特性,可解决失效死锁问题

缺点:因为需要频繁的创建和删除节点,性能上不如redis

在高性能、高并发场景下,不建议用Zookeeper的分布式锁。而由于Zookeeper的高可靠性,因此在并发量不是太高的应用场景下,还是推荐使用Zookeeper的分布式锁

二、服务注册与发现

1、设计思路

zookeeper应用场景(二),zookeeper,zookeeper,分布式,云原生

2、实现注册中心的优缺点

优点:

  • 高可用性:ZooKeeper是一个高可用的分布式系统,可以通过配置多个服务器实例来提供容错能力。如果其中一个实例出现故障,其他实例仍然可以继续提供服务。
  • 强一致性:ZooKeeper保证了数据的强一致性。当一个更新操作完成时,所有的服务器都将具有相同的数据视图。这使得ZooKeeper非常适合作为服务注册中心,因为可以确保所有客户端看到的服务状态是一致的。
  • 实时性:ZooKeeper的监视器(Watcher)机制允许客户端监听节点的变化。当服务提供者的状态发生变化时(例如,上线或下线),客户端会实时收到通知。这使得服务消费者能够快速响应服务的变化,从而实现动态服务发现。

缺点:

  • 性能限制:ZooKeeper的性能可能不如一些专为服务注册中心设计的解决方案,如nacos或Consul。尤其是在大量的读写操作或大规模集群的情况下,ZooKeeper可能会遇到性能瓶颈。

3、整合Spring Cloud Zookeeper实现微服务注册中心 

Spring Cloud Zookeeper

第一步:在父pom文件中指定Spring Cloud版本

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.2.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
    <java.version>1.8</java.version>
    <spring-cloud.version>Hoxton.SR8</spring-cloud.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

注意: springboot和springcloud的版本兼容问题

第二步:微服务pom文件中引入Spring Cloud Zookeeper注册中心依赖

<!-- zookeeper服务注册与发现 -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!-- zookeeper client -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.8.0</version>
</dependency>

注意: zookeeper客户端依赖和zookeeper sever的版本兼容问题

Spring Cloud整合Zookeeper注册中心核心源码入口: ZookeeperDiscoveryClientConfiguration

第三步: 微服务配置文件application.yml中配置zookeeper注册中心地址

spring:
  cloud:
    zookeeper:    
      connect-string: localhost:2181
      discovery:
        instance-host: 127.0.0.1

注册到zookeeper的服务实例元数据信息如下:

zookeeper应用场景(二),zookeeper,zookeeper,分布式,云原生

注意:如果address有问题,会出现找不到服务的情况,可以通过instance-host配置指定

第四步:整合feign进行服务调用文章来源地址https://www.toymoban.com/news/detail-725055.html

@RequestMapping(value = "/findOrderByUserId/{id}")
public R  findOrderByUserId(@PathVariable("id") Integer id) {
    log.info("根据userId:"+id+"查询订单信息");
    //feign调用   
    R result = orderFeignService.findOrderByUserId(id);
    return result;
}

到了这里,关于zookeeper应用场景(二)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • ZooKeeper分布式锁的实现与应用

    ZooKeeper是一种分布式应用程序协调服务,它可以管理大规模的集群,并提供可靠的、有序的、高效的数据通信。其中,ZooKeeper提供的分布式锁是一种常见的分布式锁实现,本文将对其进行详细介绍。 在分布式系统中,多个进程或节点可能需要同时访问共享资源。为了确保数据

    2024年02月02日
    浏览(29)
  • 分布式应用程序协调服务 ZooKeeper 详解

    目录 1、ZooKeeper简介 2、ZooKeeper的使用场景 3、ZooKeeper设计目的 4、ZooKeeper数据模型

    2024年02月08日
    浏览(42)
  • 分布式应用:Zookeeper 集群与kafka 集群部署

    目录 一、理论 1.Zookeeper   2.部署 Zookeeper 集群 3.消息队列  4.Kafka 5.部署 kafka 集群 6.Filebeat+Kafka+ELK 二、实验 1.Zookeeper 集群部署 2.kafka集群部署 3.Filebeat+Kafka+ELK 三、问题          1.解压文件异常 2.kafka集群建立失败 3.启动 filebeat报错 4.VIM报错 5. kibana无法匹配 四、总结

    2024年02月14日
    浏览(40)
  • 分布式应用之zookeeper集群+消息队列Kafka

           ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。为分布式框架提供协调服务的

    2024年02月06日
    浏览(46)
  • SpringBoot~ dubbo + zookeeper实现分布式开发的应用

    配置服务名字, 注册中心地址, 扫描被注册的包 server.port=8081 #当前应用名字 dubbo.application.name=provider-server #注册中心地址 dubbo.registry.address=zookeeper://127.0.0.1:2181 #扫描指定包下服务 dubbo.scan.base-packages=com.demo.service 实现一个接口,在接口中完成需求 public interface Translate { String tran

    2024年04月10日
    浏览(41)
  • websocket在分布式场景的应用方案

    websocket简介 WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。它可以在客户端和服务器之间建立持久连接,使得服务器可以主动向客户端推送数据,而不需要客户端不断地向服务器发送请求。 WebSocket 协议的优点包括: 实时性:WebSocket 可以实现实时通信,数据传输的

    2024年02月13日
    浏览(25)
  • Rabbitmq----分布式场景下的应用

    如果单机模式忘记也可以看看这个快速回顾rabbitmq,在做学习 消息队列在使用过程中,面临着很多实际问题需要思考: 消息从发送,到消费者接收,会经理多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消

    2024年02月08日
    浏览(31)
  • Redis在分布式场景下的应用

    缓存的基本作用是在高并发场景下对应服务的保护缓冲 – 基于Redis集群解决单机Redis存在的问题 单机的Redis存在四大问题: redis由于高强度性能采用内存 但是意味着丢失的风险 单结点redis并发能力有限 分布式服务中数据过多 依赖内存的redis 明显单机不能满足 如果发生故障

    2024年02月08日
    浏览(34)
  • 除了上述提到的应用场景,分布式系统在云计算中还有如下一些应用场景

    除了上述提到的应用场景,分布式系统在云计算中还有如下一些应用场景: 大规模视频和图像存储:分布式云存储可以为企业提供大规模视频和图像存储的解决方案,帮助企业存储和管理海量的视频和图像数据,提高数据的可靠性和安全性。 机器学习:在机器学习中,需要

    2024年01月19日
    浏览(35)
  • Kafka 在分布式系统中的 7 大应用场景

    Kafka 是一个开源的分布式流式平台,它可以处理大量的实时数据,并提供高吞吐量,低延迟,高可靠性和高可扩展性。Kafka 的核心组件包括生产者(Producer),消费者(Consumer),主题(Topic),分区(Partition),副本(Replica),日志(Log),偏移量(Offset)和代理(Broker)。

    2024年02月08日
    浏览(27)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包