RPC分布式网络通信框架(三)—— 服务配置中心Zookeeper模块

这篇具有很好参考价值的文章主要介绍了RPC分布式网络通信框架(三)—— 服务配置中心Zookeeper模块。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


一、使用Zookeeper的意义

分布式系统存在的问题:
为了支持高并发,每个客户端都保存了一份服务提供者的列表。但是如果列表有更新,想要得到最新的URL列表(rpc服务的ip和端口号),必须要手动更新配置文件,很不方便。
如图所示,实例3挂掉了,但是列表并没有得到更新。
RPC分布式网络通信框架(三)—— 服务配置中心Zookeeper模块,分布式,rpc,zookeeper
故需要动态的更新URL列表,由此引入Zookeeper服务配置中心。

二、Zookeeper基础

zookeeper是为分布式应用提供一致性协调服务的中间件

本质:类似linux的文件系统
调用者真实运行的情况下其实并不知道自已想要调用的函数在哪台机器上(不知道ip和端口)
所以调用者需要在调用前先去服务配置中心去问一下想调用API所在机器上的ip和端口。
同时他还提供全局分布式锁,起到协调控制管理各个分布式节点的功能。

1 文件系统

Zookeeper提供一个多层级的节点命名空间(节点称为znode)。与文件系统不同的是,这些节点都可以设置关联的数据,而文件系统中只有文件节点可以存放数据,目录节点不能存放数据。
Zookeeper为了保证高吞吐和低延迟,在内存中维护了这个树状的目录结构,这种特性使得Zookeeper不能用于存放大量的数据,每个节点的存放数据上限为1M。

一个节点可以存储1Mb的数据
RPC分布式网络通信框架(三)—— 服务配置中心Zookeeper模块,分布式,rpc,zookeeper

znode节点操作指令:

  • ls 罗列节点
  • get 查看节点
  • create 创建节点
  • set 修改节点的值
  • delete 删除节点(要先删子节点)

2 通知机制

client端会对某个znode建立一个watcher事件,当该znode发生变化时,这些client会收到zk的通知,然后client可以根据znode变化来做出业务上的改变等。

zk的watcher机制
客户端通过watcher机制监听zk节点(变化)
客户端维护的map,键是节点的名字,值是节点的内容(ip+端口)
通过通知和回调机制,由zk主动向客户端汇报节点的变化(节点死掉、新节点加入)

3 原生zkclient API存在的问题

1、设置监听的watcher是一次性的
2、znode 节点只存储简单的byte字节数组(如果想存对象,需要转换对象生成字节数组)
注意:
原生zkclient会自动发送心跳消息(维护session),源码会在1/3的Timeout时间发送ping心跳。
抓包验证:
sudo tcpdump -I lo port 2181 // 抓2181这个端口的所有包

4 服务配置中心Zookeeper模块

所以,需要一个项目注册节点中心配置,维护session会话相当于检测tcp连接的心跳消息,以此来确定链接是否断开。
项目中的由每个服务创建的节点为临时性节点
总结:每个rpc服务器端都会向zookeeper服务注册配置中心 传入 ip + port + 服务名字,客户端进程查询获得ip+port
RPC分布式网络通信框架(三)—— 服务配置中心Zookeeper模块,分布式,rpc,zookeeper

三、Zk类实现

封装的ZkClient客户端类头文件

#pragma once

class ZkClient
{
public:
    ZkClient();
    ~ZkClient();
    void Start();  // zkclient启动连接zkserver
    void Create(const char *path, const char *data, int datalen, int state=0); // 在zkserver上根据指定的path创建znode节点
    std::string GetData(const char *path);  // 根据参数指定的znode节点路径,或者znode节点的值
private:
   zhandle_t *m_zhandle;  // zk的客户端句柄
};

构造、析构实现:

ZkClient::ZkClient() : m_zhandle(nullptr)
{
}

ZkClient::~ZkClient()
{
    if (m_zhandle != nullptr)
    {
        zookeeper_close(m_zhandle); // 关闭句柄,释放资源  MySQL_Conn
    }
}

Start方法

首先从配置文件中读取zookeeper客户端ip和port。

std::string host = MprpcApplication::GetInstance().GetConfig().Load("zookeeperip");
std::string port = MprpcApplication::GetInstance().GetConfig().Load("zookeeperport");
std::string connstr = host + ":" + port;

使用zookeeper_mt的多线程版本

zk的客户端提供了三个线程
1、API调用线程:zookeeper_init,直接导致下面两个线程的开辟
2、网络I/O收发线程:pthread_create(底层为poll,且会在1/3的Timeout时间发送ping心跳保持与zkserver的通信)
3、watcher回调线程:pthread_create当zkclient接收zkserver的响应后,zkserver给zkclient通知
zookeeper是异步连接过程,需要绑定一个全局回调函数global_watcher(新线程连接)

m_zhandle = zookeeper_init(connstr.c_str(), global_watcher, 30000, nullptr, nullptr, 0);
之后检查创建的m_zhandle是否为空指针

输入:(127.0.0.1: 2181, 回调函数, session超时时间30s, null, null, 0)

注:

  • zk的端口号2181。
  • 上述代码只是成功创建句柄资源,并不代表zkserver的连接成功与否。
  • 全局回调函数global_watcher决定是否连接成功。

全局的watcher观察器:

// 全局的watcher观察器   zkserver给zkclient的通知
void global_watcher(zhandle_t *zh, int type,
                   int state, const char *path, void *watcherCtx)
{
    if (type == ZOO_SESSION_EVENT)  // 回调的消息类型是和会话相关的消息类型
	{
		if (state == ZOO_CONNECTED_STATE)  // zkclient和zkserver连接成功
		{
			sem_t *sem = (sem_t*)zoo_get_context(zh);
            sem_post(sem);
		}
	}
}

直到收到state == ZOO_CONNECTED_STATE消息才算连接成功。这时sem信号量置1。
也就说sem的值是由全局观察者在连接状态变为已连接时通过调用sem_post()或类似的函数来增加的。

Start方法中等待信号量sem为1,连接成功,打印信息。

sem_t sem;
sem_init(&sem, 0, 0);
zoo_set_context(m_zhandle, &sem);

sem_wait(&sem);  // 阻塞,直到sem为1
std::cout << "zookeeper_init success!" << std::endl;

创建节点、get节点值方法

void ZkClient::Create(const char *path, const char *data, int datalen, int state)
{
    char path_buffer[128];
    int bufferlen = sizeof(path_buffer);
    int flag;
	// 先判断path表示的znode节点是否存在,如果存在,就不再重复创建了
	flag = zoo_exists(m_zhandle, path, 0, nullptr);
	if (ZNONODE == flag) // 表示path的znode节点不存在
	{
		// 创建指定path的znode节点了
		flag = zoo_create(m_zhandle, path, data, datalen,
			&ZOO_OPEN_ACL_UNSAFE, state, path_buffer, bufferlen);
		if (flag == ZOK)
		{
			std::cout << "znode create success... path:" << path << std::endl;
		}
		else
		{
			std::cout << "flag:" << flag << std::endl;
			std::cout << "znode create error... path:" << path << std::endl;
			exit(EXIT_FAILURE);
		}
	}
}

// 根据指定的path,获取znode节点的值
std::string ZkClient::GetData(const char *path)
{
    char buffer[64];
	int bufferlen = sizeof(buffer);
	int flag = zoo_get(m_zhandle, path, 0, buffer, &bufferlen, nullptr);
	if (flag != ZOK)
	{
		std::cout << "get znode error... path:" << path << std::endl;
		return "";
	}
	else
	{
		return buffer;
	}
}

四、框架应用

rpc提供端框架

首先调用Start方法。
之后将当前rpc节点上要发布的服务全部注册到zk上,让rpc调用端可以从zk上发现服务。

  • 先创建永久父节点/UserServiceRpc
  • 再根据提供端维护的rpc方法map表,创建临时子节点/UserServiceRpc/Login(Login方法…),也就是将要发布的服务全部注册到zk上。
// session timeout   30s     zkclient 网络I/O线程  1/3 * timeout 时间发送ping消息
ZkClient zkCli;
zkCli.Start();
// service_name为永久性节点    method_name为临时性节点
for (auto &sp : m_serviceMap) 
{
    // /service_name   /UserServiceRpc
    std::string service_path = "/" + sp.first;
    zkCli.Create(service_path.c_str(), nullptr, 0);
    for (auto &mp : sp.second.m_methodMap)
    {
        // /service_name/method_name   /UserServiceRpc/Login 存储当前这个rpc服务节点主机的ip和port
        std::string method_path = service_path + "/" + mp.first;
        char method_path_data[128] = {0};
        sprintf(method_path_data, "%s:%d", ip.c_str(), port);
        // ZOO_EPHEMERAL表示znode是一个临时性节点
        zkCli.Create(method_path.c_str(), method_path_data, strlen(method_path_data), ZOO_EPHEMERAL);
    }
}

rpc调用端(客户端)框架

首先,同样是调用Start方法:

ZkClient zkCli;
zkCli.Start();

然后,CallMethod中通过zk获取ip:port。也就是说,通过要调用方法的名称(Login)在zk的节点中寻找对应的ip和port。

//  /UserServiceRpc/Login
std::string method_path = "/" + service_name + "/" + method_name;
// 127.0.0.1:8000
std::string host_data = zkCli.GetData(method_path.c_str());

因为读取的地址是host_data = 127.0.0.1:8000,所以将其分离:

if (host_data == "")
{
    controller->SetFailed(method_path + " is not exist!");
    return;
}
int idx = host_data.find(":");
if (idx == -1)
{
    controller->SetFailed(method_path + " address is invalid!");
    return;
}
std::string ip = host_data.substr(0, idx);
uint16_t port = atoi(host_data.substr(idx+1, host_data.size()-idx).c_str()); 

总结

Zookeeper功能如下:

  • master节点选举, 主节点down掉后, 从节点就会接手工作, 并且保证这个节点是唯一的,这也就是所谓首脑模式,从而保证我们集群是高可用的
  • 统一配置文件管理, 即只需要部署一台服务器, 则可以把相同的配置文件同步更新到其他所有服务器, 此操作在云计算中用的特别多(例如修改了redis统一配置)
  • 数据发布与订阅, 类似消息队列MQ
  • 分布式锁,分布式环境中不同进程之间争夺资源,类似于多进程中的锁
  • 集群管理, 保证集群中数据的强一致性

服务配置中心用法:文章来源地址https://www.toymoban.com/news/detail-554934.html

  • 每个rpc服务器端都会向zookeeper服务注册配置中心 传入 ip + port + 服务名字 客户端进程查询获得ip+port

到了这里,关于RPC分布式网络通信框架(三)—— 服务配置中心Zookeeper模块的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式 RPC 框架HSF

    HSF (High-speed Service Framework),高速服务框架,是在阿里巴巴内部广泛使用的分布式 RPC 服务框架。 HSF 作为阿里巴巴的基础中间件,联通不同的业务系统,解耦系统间的实现依赖。HSF 从分布式应用的层面,统一了服务的发布/调用方式,从而帮助用户可以方便、快速的开发分布式

    2024年02月16日
    浏览(43)
  • 分布式RPC框架Dubbo详解

    目录   1.架构演进 1.1 单体架构 1.2  垂直架构 1.3 分布式架构 1.4 SOA架构 1.5 微服务架构 2.RPC框架 2.1 RPC基本概念介绍 2.1.1 RPC协议 2.1.2 RPC框架 2.1.3 RPC与HTTP、TCP/ UDP、Socket的区别 2.1.4 RPC的运行流程  2.1.5 为什么需要RPC 2.2 Dubbo  2.2.1 Dubbo 概述 2.2.2 Dubbo实战   架构演进如下图: 这

    2024年02月07日
    浏览(40)
  • 分布式系统消息通信技术:MOM与RPC

    中间件(Middleware)是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分。人们在使用中间件时,往往是一组中间件集成在一起,构成一个平台(包括开发平台和运行平台),但在这组中间件中必须要有一个通信中间件,即中间件+平台+通信,这

    2024年02月11日
    浏览(41)
  • 【项目实战】分布式计算和通信框架(AKKA)入门介绍

    Akka是一个用于构建高并发、分布式、可容错、事件驱动的应用程序的工具包和运行时。它基于Actor模型,提供了一种高效的并发编程模型,可以轻松地编写出高并发、分布式、可容错的应用程序。Akka还提供了一些常用的组件,如路由、集群、持久化等,可以帮助开发人员更加

    2024年02月08日
    浏览(55)
  • 10 - 网络通信优化之通信协议:如何优化RPC网络通信?

    微服务框架中 SpringCloud 和 Dubbo 的使用最为广泛,行业内也一直存在着对两者的比较,很多技术人会为这两个框架哪个更好而争辩。 我记得我们部门在搭建微服务框架时,也在技术选型上纠结良久,还曾一度有过激烈的讨论。当前 SpringCloud 炙手可热,具备完整的微服务生态,

    2024年02月11日
    浏览(37)
  • 分布式【RPC 常见面试题】

    一、注册中心 策略:服务注册原理、注册中心结构、zookeeper的原理、几个注册中心的区别、分布式算法、分布式事务。 项目细节:服务注册、服务发现、服务注销、监听机制 介绍一下服务注册中心怎么做的? (1)服务发现: 服务注册/反注册:保存服务提供者和服务调用者

    2024年02月03日
    浏览(46)
  • 分布式理论CAP、BASE和RPC

    CAP原则是指当分布式系统遇到网络分区时,只能满足其中两个需求,一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance)。在实际系统中,我们常常会选择在CA、CP或AP三者中做出取舍。 CA模型 CA模型要求分布式系统保持强一致性,即所有节点上的数据都

    2023年04月10日
    浏览(40)
  • 【DDD分布式系统学习笔记】RPC调用以及系统初步搭建

    modelVersion: 模型版本,指定POM模型的版本,目前使用的是Maven 4.0.0版本。 groupId: 项目的组织标识符,通常是组织的域名倒序。在这里是 cn.itedus.lottery。 artifactId: 项目的唯一标识符,通常是项目的名称。在这里是 Lottery。 packaging: 项目的打包方式,这里是 pom,表示这是一个聚合

    2024年01月18日
    浏览(51)
  • 分布式通信方式

      分布式通信是指在分布式系统中,不同节点之间进行消息传递和交互的方式。   以下是常见的分布式通信方式:   使用消息队列作为中间件,节点之间通过发送和接收消息来实现通信。消息队列提供了异步、解耦和可靠性的通信机制,常见的消息队列系统包括Rabb

    2024年02月15日
    浏览(38)
  • ROS:分布式通信

    ROS是一个分布式计算环境。一个运行中的ROS系统可以包含分布在多台计算机上多个节点。根据系统的配置方式,任何节点可能随时需要与任何其他节点进行通信。 因此,ROS对网络配置有某些要求: 所有端口上的所有机器之间必须有完整的双向连接。 每台计算机必须通过所有

    2024年02月12日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包