kafka生产端TCP连接管理

这篇具有很好参考价值的文章主要介绍了kafka生产端TCP连接管理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

前言:

Kafka生产者程序 

 Kafka生产者客户端如何创建TCP连接

 Kafka生产者客户端如何关闭TCP连接

总结: 

参考资料 


前言:

     在网络层协议中,TCP作用在第四层传输层、Http协议作用在第七层最上层应用层,一个完整的网络传输,信息会优先到达第四层,然后在往上传输到第七层,TCP协议相比于Http协议提供更好的连接稳定性及TCP提供的多路复用请求及可靠的消息交付语义保证,如自动重传丢失的报文等,kafka在设计上使用TCP协议作为所有请求通信的底层协议。

Kafka生产者程序 

kafka的Java生产者API主要的对象就是KafkaProducer。通常我们开发一个生产者的步骤有4步。

第1步:构造生产者对象所需的参数对象。

第2步:利用第1步的参数对象,创建KafkaProducer对象实例。

第3步:使用KafkaProducer的send方法发送消息。

第4步:调用KafkaProducer的close方法关闭生产者并释放各种系统资源。

上面这4步写成Java代码的话大概是这个样子:

Properties props = new Properties ();
props.put(“参数1”, “参数1的值”);
props.put(“参数2”, “参数2的值”);
……
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<String, String>(……), callback);
	……
}

   当我们开发一个Producer应用时,生产者会向Kafka集群中指定的主题(Topic)发送消息,这必然涉及与Kafka Broker创建TCP连接。那么,Kafka的Producer客户端是如何管理这些TCP连接的呢?

 Kafka生产者客户端如何创建TCP连接

 要回答上面这个问题,我们首先要弄明白生产者代码是什么时候创建TCP连接的。就上面的那段代码而言,可能创建TCP连接的地方有两处:Producer producer = new KafkaProducer(props)和producer.send(msg, callback)。你觉得连向Broker端的TCP连接会是哪里创建的呢?前者还是后者,抑或是两者都有?请先思考5秒钟,然后我给出我的答案。

 首先,生产者应用在创建KafkaProducer实例时是会建立与Broker的TCP连接的。其实这种表述也不是很准确,应该这样说:在创建KafkaProducer实例时,生产者应用会在后台创建并启动一个名为Sender的线程,该Sender线程开始运行时首先会创建与Broker的连接。我截取了一段测试环境中的日志来说明这一点:

[2018-12-09 09:35:45,620] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9093 (id: -2 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)

[2018-12-09 09:35:45,622] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9093 (id: -2 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)

[2018-12-09 09:35:45,814] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9092 (id: -1 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)

[2018-12-09 09:35:45,815] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)

[2018-12-09 09:35:45,828] DEBUG [Producer clientId=producer-1] Sending metadata request (type=MetadataRequest, topics=) to node localhost:9093 (id: -2 rack: null) (org.apache.kafka.clients.NetworkClient:1068)

 你也许会问:怎么可能是这样?如果不调用send方法,这个Producer都不知道给哪个主题发消息,它又怎么能知道连接哪个Broker呢?难不成它会连接bootstrap.servers参数指定的所有Broker吗?嗯,是的,Java Producer目前还真是这样设计的。

 我在这里稍微解释一下bootstrap.servers参数。它是Producer的核心参数之一,指定了这个Producer启动时要连接的Broker地址。请注意,这里的“启动时”,代表的是Producer启动时会发起与这些Broker的连接。因此,如果你为这个参数指定了1000个Broker连接信息,那么很遗憾,你的Producer启动时会首先创建与这1000个Broker的TCP连接。

 在实际使用过程中,我并不建议把集群中所有的Broker信息都配置到bootstrap.servers中,通常你指定3~4台就足以了。因为Producer一旦连接到集群中的任一台Broker,就能拿到整个集群的Broker信息,故没必要为bootstrap.servers指定所有的Broker。

 从上面这段日志中,我们可以发现,在KafkaProducer实例被创建后以及消息被发送前,Producer应用就开始创建与两台Broker的TCP连接了。当然了,在我的测试环境中,我为bootstrap.servers配置了localhost:9092、localhost:9093来模拟不同的Broker,但是这并不影响后面的讨论。另外,日志输出中的最后一行也很关键:它表明Producer向某一台Broker发送了METADATA请求,尝试获取集群的元数据信息——这就是前面提到的Producer能够获取集群所有信息的方法。

纵然KafkaProducer是线程安全的,我也不赞同创建KafkaProducer实例时启动Sender线程的做法。写了《Java并发编程实践》的那位布赖恩·格茨(Brian Goetz)大神,明确指出了这样做的风险:在对象构造器中启动线程会造成this指针的逃逸。理论上,Sender线程完全能够观测到一个尚未构造完成的KafkaProducer实例。当然,在构造对象时创建线程没有任何问题,但最好是不要同时启动它。 

 针对TCP连接何时创建的问题,目前我们的结论是这样的:TCP连接是在创建KafkaProducer实例时建立的那么,我们想问的是,它只会在这个时候被创建吗?

当然不是!TCP连接还可能在两个地方被创建:一个是在更新元数据后,另一个是在消息发送时。为什么说是可能?因为这两个地方并非总是创建TCP连接。当Producer更新了集群的元数据信息之后,如果发现与某些Broker当前没有连接,那么它就会创建一个TCP连接。同样地,当要发送消息时,Producer发现尚不存在与目标Broker的连接,也会创建一个。 

 接下来,我们来看看Producer更新集群元数据信息的两个场景。 

场景一:当Producer尝试给一个不存在的主题发送消息时,Broker会告诉Producer说这个主题不存在。此时Producer会发送METADATA请求给Kafka集群,去尝试获取最新的元数据信息。

场景二:Producer通过metadata.max.age.ms参数定期地去更新元数据信息。该参数的默认值是300000,即5分钟,也就是说不管集群那边是否有变化,Producer每5分钟都会强制刷新一次元数据以保证它是最及时的数据。

讲到这里,我们可以“挑战”一下社区对Producer的这种设计的合理性。目前来看,一个Producer默认会向集群的所有Broker都创建TCP连接,不管是否真的需要传输请求。这显然是没有必要的。再加上Kafka还支持强制将空闲的TCP连接资源关闭,这就更显得多此一举了。

试想一下,在一个有着1000台Broker的集群中,你的Producer可能只会与其中的3~5台Broker长期通信,但是Producer启动后依次创建与这1000台Broker的TCP连接。一段时间之后,大约有995个TCP连接又被强制关闭。这难道不是一种资源浪费吗?很显然,这里是有改善和优化的空间的。

 Kafka生产者客户端如何关闭TCP连接

Producer端关闭TCP连接的方式有两种:一种是用户主动关闭;一种是Kafka自动关闭

我们先说第一种。这里的主动关闭实际上是广义的主动关闭,甚至包括用户调用kill -9主动“杀掉”Producer应用。当然最推荐的方式还是调用producer.close()方法来关闭

第二种是Kafka帮你关闭,这与Producer端参数connections.max.idle.ms的值有关。默认情况下该参数值是9分钟,即如果在9分钟内没有任何请求“流过”某个TCP连接,那么Kafka会主动帮你把该TCP连接关闭。用户可以在Producer端设置connections.max.idle.ms=-1禁掉这种机制。一旦被设置成-1,TCP连接将成为永久长连接。当然这只是软件层面的“长连接”机制,由于Kafka创建的这些Socket连接都开启了keepalive,因此keepalive探活机制还是会遵守的。

值得注意的是,在第二种方式中,TCP连接是在Broker端被关闭的,但其实这个TCP连接的发起方是客户端,因此在TCP看来,这属于被动关闭的场景,即passive close。被动关闭的后果就是会产生大量的CLOSE_WAIT连接,因此Producer端或Client端没有机会显式地观测到此连接已被中断。

总结: 

  1. KafkaProducer实例创建时启动Sender线程,从而创建与bootstrap.servers中所有Broker的TCP连接。
  2. KafkaProducer实例首次更新元数据信息之后,还会再次创建与集群中所有Broker的TCP连接。
  3. 如果Producer端发送消息到某台Broker时发现没有与该Broker的TCP连接,那么也会立即创建连接。
  4. 如果设置Producer端connections.max.idle.ms参数大于0,则步骤1中创建的TCP连接会被自动关闭;如果设置该参数=-1,那么步骤1中创建的TCP连接将无法被关闭,从而成为“僵尸”连接。

参考资料 

极客时间课程《Kafka核心技术与实战》

13.Java生产者是如何管理TCP连接的文章来源地址https://www.toymoban.com/news/detail-595775.html

到了这里,关于kafka生产端TCP连接管理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【网络】传输层——UDP | TCP(协议格式&&确认应答&&超时重传&&连接管理)

    🐱作者:一只大喵咪1201 🐱专栏:《网络》 🔥格言: 你只管努力,剩下的交给时间! 现在是传输层,在应用层中的报文(报头 + 有效载荷)就不能被叫做报文了,而是叫做 数据段 (报头 + 有效载荷),传输层的有效载荷就是应用层的完整报文。 端口号(port):标识了一个主机上

    2024年02月13日
    浏览(34)
  • 计算机网络:思科实验【8-运输层端口、TCP的运输连接管理、动态主机配置协议DHCP的作用】

    🌈个人主页:godspeed_lucip 🔥 系列专栏:Cisco Packet Tracer实验 本文对应的实验报告源文件请关注微信公众号 程序员刘同学 ,回复 思科 获取下载链接。 1 验证运输层 TCP / IP 端口号的作用 2 验证 TCP 使用三报文握手建立连接 3 验证 TCP 使用四报文挥手释放连接 Cisco Packet Tracer 模

    2024年03月13日
    浏览(45)
  • 【计算机网络】深入理解TCP协议二(连接管理机制、WAIT_TIME、滑动窗口、流量控制、拥塞控制)

    正常情况下,TCP需要经过三次握手建立连接+四次挥手断开链接,下面看一个图: 服务器的状态变化: [CLOSED - LISTEN] 服务器端调用listen后进入LISTEN状态, 等待客户端连接; [LISTEN - SYN_RCVD] 一旦监听到连接请求(同步报文段), 就将该连接放入内核等待队列中, 并向客户端发送SYN确认

    2024年02月07日
    浏览(44)
  • Linux telnet命令详解:通过TCP/IP网络连接与管理远程机器(附实例教程和注意事项)

    telnet 命令,全称为teletype network,是一个使用telnet网络协议来连接并管理远程机器的命令。它通过TCP/IP网络使用端口23来建立连接,并提供了一种使用命令行界面(CLI)管理远程系统的方式。虽然 telnet 与SSH相似,但两者有所不同,因为SSH使用了加密,而 telnet 则是以明文形式

    2024年02月04日
    浏览(36)
  • 网络编程——TCP/IP协议族(IP协议、TCP协议和UDP协议……)

    1、IP协议简介 IP协议又称 网际协议 特指为实现在一个相互连接的网络系统上从源地址到目的地传输数据包(互联网数据包)所提供必要功能的协议,是网络层中的协议。 2、特点 不可靠 :它不能保证IP数据包能成功地到达它的目的地,仅提供尽力而为的传输服务 无连接 :IP 并不

    2024年02月13日
    浏览(44)
  • 【网络协议】TCP/IP 协议

    1、TCP/IP 模型 TCP/IP 协议模型,包含了一系列构成互联网基础的网络协议,是 Internet 的核心协议。 基于 TCP/IP 协议栈可分为四层或五层,转换为 OSI 参考模型,可以分为七层,分别如下图所示: 通常我们所说的都是基于 TCP/TP 五层模型。 2、TCP/IP 协议栈每一层功能 应用层:H

    2024年02月12日
    浏览(35)
  • 网络:TCP/IP协议

    1. OSI七层参考模型        应用层         表示层         会话层         传输层         网络层         数据链路层         物理层 2. TCP/IP模型         应用层         传输层         网络层         数据链路层         物理层 3. 各链路层对应的名称    

    2024年02月15日
    浏览(51)
  • Java中网络的基本介绍。网络通信,网络,ip地址,域名,端口,网络通信协议,TCP/IP传输过程,网络通信协议模型,TCP协议,UDP协议

    - 网络通信 概念:网络通信是指 通过计算机网络进行信息传输的过程 ,包括数据传输、语音通话、视频会议等。在网络通信中,数据被分成一系列的数据包,并通过网络传输到目的地。在数据传输过程中,需要确保数据的完整性、准确性和安全性。常见的网络通信协议有T

    2024年02月10日
    浏览(53)
  • 【网络原理】TCP/IP协议

    目录 1.应用层 2.传输层(核心问题) 2.1 UDP协议 2.1.2 UDP的特点 2.1.3 基于UDP的应用层协议 2.2 TCP协议(重点内容) 2.2.1 TCP/IP 协议含义 2.2.2 TCP协议端格式: 2.2.3 TCP的特点 2.3 TCP原理 2.4 确认应答机制(安全机制) 2.5 超时重传机制(安全机制) 2.5.1 数据直接丢了,接收方没

    2023年04月13日
    浏览(36)
  • TCP协议IP网络音柱

    SV-704CT TCP协议I P网络音柱 一、描述  SV-704CT是深圳锐科达电子有限公司的一款壁挂式 IP网络有源音柱 ,具有10/100M以太网接口,可将网络音源通过自带的功放和喇叭输出播放,其采用防水设计,功率可以从60W到120W。SV-704CT作为网络广播播放系统的终端,可用于需要广播播放的

    2024年02月07日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包