Kafka系列 - 生产者客户端架构以及3个重要参数

这篇具有很好参考价值的文章主要介绍了Kafka系列 - 生产者客户端架构以及3个重要参数。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

整体架构

Kafka系列 - 生产者客户端架构以及3个重要参数,Kafka庖丁解牛,kafka,架构,分布式

整个生产者客户端由两个县城协调运行,这两个线程分别为主线程和Sender线程(发送线程)。

主线程中由KafkaProducer创建消息,然后通过可能的拦截器,序列化器和分区器之后缓存到消息累加器(RecordAccumulator)。Sender线程负责从RecordAccumulator中获取消息并将其发送到kafka中。

RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator 缓存的大小可以通过生产者客户端参数 buffer.memory 配置,默认值为 33554432B,即32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候 KafkaProducer 的 send() 方法调用要么被阻塞,要么抛出异常,这个取决于参数 max.block.ms 的配置,此参数的默认值为60000,即60秒。

主线程中发送过来的消息都会被追加到 RecordAccumulator 的某个双端队列(Deque)中,在 RecordAccumulator 的内部为每个分区都维护了一个双端队列,队列中的内容就是 ProducerBatch,即 Deque。消息写入缓存时,追加到双端队列的尾部;Sender 读取消息时,从双端队列的头部读取。注意 ProducerBatch 不是 ProducerRecord,ProducerBatch 中可以包含一至多个 ProducerRecord。通俗地说,ProducerRecord 是生产者中创建的消息,而 ProducerBatch 是指一个消息批次,ProducerRecord 会被包含在 ProducerBatch 中,这样可以使字节的使用更加紧凑。与此同时,将较小的 ProducerRecord 拼凑成一个较大的 ProducerBatch,也可以减少网络请求的次数以提升整体的吞吐量。

Sender 从 RecordAccumulator 中获取缓存的消息之后,会进一步将原本<分区, Deque< ProducerBatch>> 的保存形式转变成 <Node, List< ProducerBatch> 的形式,其中 Node 表示 Kafka 集群的 broker 节点。对于网络连接来说,生产者客户端是与具体的 broker 节点建立的连接,也就是向具体的 broker 节点发送消息,而并不关心消息属于哪一个分区;而对于 KafkaProducer 的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层面到网络I/O层面的转换。

在转换成 <Node, List> 的形式之后,Sender 还会进一步封装成 <Node, Request> 的形式,这样就可以将 Request 请求发往各个 Node 了,这里的 Request 是指 Kafka 的各种协议请求,对于消息发送而言就是指具体的 ProduceRequest。

请求在从 Sender 线程发往 Kafka 之前还会保存到 InFlightRequests 中,InFlightRequests 保存对象的具体形式为 Map<NodeId, Deque>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId 是一个 String 类型,表示节点的 id 编号)。与此同时,InFlightRequests 还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与 Node 之间的连接)最多缓存的请求数。这个配置参数为 max.in.flight.requests.per.connection,默认值为5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。通过比较 Deque 的 size 与这个参数的大小来判断对应的 Node 中是否已经堆积了很多未响应的消息,如果真是如此,那么说明这个 Node 节点负载较大或网络连接有问题,再继续向其发送请求会增大请求超时的可能。

前面提及的 InFlightRequests 还可以获得 leastLoadedNode,即所有 Node 中负载最小的那一个。这里的负载最小是通过每个 Node 在 InFlightRequests 中还未确认的请求决定的,未确认的请求越多则认为负载越大。对于下图中的 InFlightRequests 来说,图中展示了三个节点 Node0、Node1和Node2,很明显 Node1 的负载最小。也就是说,Node1 为当前的 leastLoadedNode。选择 leastLoadedNode 发送请求可以使它能够尽快发出,避免因网络拥塞等异常而影响整体的进度。leastLoadedNode 的概念可以用于多个应用场合,比如元数据请求、消费者组播协议的交互。

Kafka系列 - 生产者客户端架构以及3个重要参数,Kafka庖丁解牛,kafka,架构,分布式

生产者重要参数

acks

  • acks=1。(默认值):生产者发送消息之后,只要分区的 leader 副本成功写入消息,那么它就会收到来自服务端的成功响应。如果消息无法写入 leader 副本,比如在 leader 副本崩溃、重新选举新的 leader 副本的过程中,那么生产者就会收到一个错误的响应,为了避免消息丢失,生产者可以选择重发消息。如果消息写入 leader 副本并返回成功响应给生产者,且在被其他 follower 副本拉取之前 leader 副本崩溃,那么此时消息还是会丢失,因为新选举的 leader 副本中并没有这条对应的消息。acks 设置为1,是消息可靠性和吞吐量之间的折中方案。
  • acks=0。生产者发送消息之后不需要等待任何服务端的响应。如果在消息从发送到写入 Kafka 的过程中出现某些异常,导致 Kafka 并没有收到这条消息,那么生产者也无从得知,消息也就丢失了。在其他配置环境相同的情况下,acks 设置为0可以达到最大的吞吐量。
  • acks=-1或acks=all。生产者在消息发送之后,需要等待 ISR 中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。在其他配置环境相同的情况下,acks 设置为 -1(all) 可以达到最强的可靠性。但这并不意味着消息就一定可靠,因为ISR中可能只有 leader 副本,这样就退化成了 acks=1 的情况。要获得更高的消息可靠性需要配合 min.insync.replicas 等参数的联动

retries和retry.backoff.ms

retries 参数用来配置生产者重试的次数,默认值为0,即在发生异常的时候不进行任何重试动作。消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常,比如网络抖动、leader 副本的选举等,这种异常往往是可以自行恢复的,生产者可以通过配置 retries 大于0的值,以此通过内部重试来恢复而不是一味地将异常抛给生产者的应用程序。如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。不过并不是所有的异常都是可以通过重试来解决的,比如消息太大,超过 max.request.size 参数配置的值时,这种方式就不可行了。

重试还和另一个参数 retry.backoff.ms 有关,这个参数的默认值为100,它用来设定两次重试之间的时间间隔,避免无效的频繁重试。在配置 retries 和 retry.backoff.ms 之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间大于这个异常恢复时间,以此来避免生产者过早地放弃重试。

Kafka 可以保证同一个分区中的消息是有序的。如果生产者按照一定的顺序发送消息,那么这些消息也会顺序地写入分区,进而消费者也可以按照同样的顺序消费它们。

对于某些应用来说,顺序性非常重要,比如 MySQL 的 binlog 传输,如果出现错误就会造成非常严重的后果。如果将 retries 参数配置为非零值,并且 max.in.flight.requests.per.connection 参数配置为大于1的值,那么就会出现错序的现象:如果第一批次消息写入失败,而第二批次消息写入成功,那么生产者会重试发送第一批次的消息,此时如果第一批次的消息写入成功,那么这两个批次的消息就出现了错序。一般而言,在需要保证消息顺序的场合建议把参数 max.in.flight.requests.per.connection 配置为1,而不是把 retries 配置为0,不过这样也会影响整体的吞吐。文章来源地址https://www.toymoban.com/news/detail-757539.html

到了这里,关于Kafka系列 - 生产者客户端架构以及3个重要参数的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 三、Kafka生产者1---Kafka生产者初始化-new KafkaProducer

    概述 本文主要是分享Kafka初始化生产者的 大体过程 初始化过程中会新建很多对象,本文暂先分享部分对象 1.分区器---Partitioner partitioner 2.重试时间---long retryBackoffMs 3.序列化器---SerializerK keySerializer,SerializerV valueSerializer 4.拦截器--- ListProducerInterceptorK, V interceptorList 5.累加器-

    2024年03月14日
    浏览(60)
  • Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解

    kafka尚硅谷视频: 10_尚硅谷_Kafka_生产者_原理_哔哩哔哩_bilibili ​      1. producer初始化:加载默认配置,以及配置的参数,开启网络线程      2. 拦截器拦截      3. 序列化器进行消息key, value序列化      4. 进行分区      5. kafka broker集群 获取metaData      6. 消息缓存到

    2024年02月11日
    浏览(47)
  • kafka学习-生产者

    目录 1、消息生产流程 2、生产者常见参数配置 3、序列化器 基本概念 自定义序列化器 4、分区器 默认分区规则 自定义分区器 5、生产者拦截器 作用 自定义拦截器 6、生产者原理解析 在Kafka中保存的数据都是字节数组。 消息发送前,需要将消息序列化为字节数组进行发送。

    2024年02月09日
    浏览(39)
  • (三)Kafka 生产者

    创建一个 ProducerRecord 对象,需要包含目标主题和要发送的内容,还可以指定键、分区、时间戳或标头。 在发送 ProducerRecord 对象时,生产者需要先把键和值对象序列化成字节数组,这样才能在网络上传输。 如果没有显式地指定分区,那么数据将被传给分区器。分区器通常会基

    2024年02月09日
    浏览(38)
  • Kafka 生产者

    目录 一、kafka生产者原理 二、kafka异步发送 配置kafka 创建对象,发送数据 带回调函数的异步发送 同步发送   三、kafka生产者分区 分区策略 指定分区:  指定key: 什么都不指定: 自定义分区器 四、生产者提高吞吐量 五、数据的可靠性 ACK应答级别 数据完全可靠条件 可靠性

    2023年04月15日
    浏览(43)
  • 「Kafka」生产者篇

    在消息发送的过程中,涉及到了 两个线程 —— main 线程 和 Sender 线程 。 在 main 线程中创建了 一个 双端队列 RecordAccumulator 。 main线程将消息发送给RecordAccumulator,Sender线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。 main线程创建 Producer 对象,调用 send 函数发送消息,

    2024年01月19日
    浏览(39)
  • Kafka-生产者

    Kafka在实际应用中,经常被用作高性能、可扩展的消息中间件。 Kafka自定义了一套网络协议,只要遵守这套协议的格式,就可以向Kafka发送消息,也可以从Kafka中拉取消息。 在实践生产过程中,一套API封装良好、灵活易用的客户端可以避免开发人员重复劳动,提高开发效率,也

    2024年01月20日
    浏览(34)
  • Kafka(生产者)

    目 前 企 业 中 比 较 常 见 的 消 息 队 列 产 品 主 要 有 Kafka(在大数据场景主要采用 Kafka 作为消息队列。) ActiveMQ RabbitMQ RocketMQ 1.1.1 传统消息队列的应用场景 传统的消息队列的主要应用场景包括: 缓存/消峰 、 解耦 和 异步通信 。 缓冲/消峰: 有助于控制和优化数据流经过

    2024年02月11日
    浏览(44)
  • 三、Kafka生产者

    1 发送原理 在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker 【RecordAccumulator缓冲的结构: 每一个分区对应一

    2024年02月12日
    浏览(37)
  • Kafka生产者

    1.acks 如果acks=0,生产者在成功写入消息之前不会等待任何来自服务器的响应。 缺点:如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息就丢失了 优点:因为生产者不需要等待服务器的响应,所有他可以以网络能够支持的最大速度发送消息,从而

    2024年01月19日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包