Kafka 入门到起飞系列 - 生产者发送消息流程解析

这篇具有很好参考价值的文章主要介绍了Kafka 入门到起飞系列 - 生产者发送消息流程解析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Kafka 入门到起飞系列 - 生产者发送消息流程解析,kafka,kafka,生产者,消息有序性,分区策略,消息重试

  • 生产者通过producerRecord 对象封装消息主题、消息的value(内容)、timestamp(时间戳)等

  • 生产者通过send()方法发送消息,send()方法会经过如下几步
    1. 首先将消息交给拦截器(Interceptor)处理, 拦截器对生产者而言,对所有消息都是生效的,拦截器也支持链式编程(责任器链)的效果,拦截器一般将一些通用的功能加进来,通常在消息发送前,producer回调逻辑前对消息做一些定制化需求,消息头部添加消息的属性等
    2. 接下来交给序列化器(Serializer),Key的序列化器和value的序列化器,对消息的key和value进行序列化,序列化为字节数组,
    3. 然后将序列化的结果交给分区器(Partitioner),分区器有3种策略来计算消息应该属于哪个分区,

    • 在producerRecord中直接指定分区,分区器会直接将消息放到指定分区

    • 如果没有指定分区器,但是消息有key,分区器会根据消息的key计算hash值,根据主题分区数量取模,来决定将消息放到哪个分区

    • 如果没有指定分区、也没有指定key,分区器会以轮询(Round Robin)的方式给消息分配分区

      在这里插入图片描述

  • 消息经过以上拦截器->序列化器->分区器 进行加工后,会将消息放到RecordAccumulator缓冲区,对每个分区都会有一个单独的缓冲区,经过分区器计算出分区号之后,不同的消息就会分配给不同的缓冲区,缓冲区里面消息也是有序的,我们可以指定对缓冲区里的消息进行分批次,也可以指定缓冲区大小

  • Kafka 入门到起飞系列 - 生产者发送消息流程解析,kafka,kafka,生产者,消息有序性,分区策略,消息重试

  • 当缓冲区中消息达到条件会按批次发送到broker对应分区上

  • broker将接收到的消息进行刷盘持久化

  • 一个消息发出去之后,服务器(broker)会返回给producer响应,producer再来判断消息是否发送成功,

  • broker返回元数据信息 - > 落盘成功 ->生产者继续发送后面消息

  • broker返回元数据信息 - >落盘失败 - 生产者设置了重试次数 -> producer 会将消息重新放入缓冲区进行排队,等待再次发送,当一个消息发送失败重试需要重发,消息是放到缓冲区队尾,

  • 生产者去缓冲区重试发送


生产者在重试消息时,消息的顺序就错了,那怎么保证消息的有序性呢?

Kafka 入门到起飞系列 - 生产者发送消息流程解析,kafka,kafka,生产者,消息有序性,分区策略,消息重试

针对这种情况,可以做一个配置,
参数:max.in.flight.requests.per.connection表示producer 在收到broker响应之前可以发送多少批消息,默认5,
设置此值是1,表示broker在响应之前producer不能再向同一个broker发送请求,就是我确认一批你再发下一批,这样可以保证消息有序性,对消息顺序要求不高情况可以不考虑


补充:

  • Producer 创建时,会创建一个Sender线程(IO线程)设置为守护线程

  • Producer 创建时,会创建缓冲区

  • Producer 生产消息,内部是一个异步流程,Sender线程不断轮询RecordAccumulator,满足条件后进行真正的网络IO发送消息

  • Kafka 入门到起飞系列 - 生产者发送消息流程解析,kafka,kafka,生产者,消息有序性,分区策略,消息重试

  • RecordAccumulator(缓冲区) 对每一个分区都有一个缓冲区

    • 每个分区的缓冲区中消息也是有序的
    • 可以指定缓冲区中的消息按批次发送
      • 缓冲区大小达到batch.size,默认16KB
      • 在缓冲区等待时间 lingger.ms 达到上限
      • 以上两个条件满足一个即发送一批
    • 可以指定整个缓冲区的大小

批次的概念很好理解,缓冲区就像一辆公交车,有两种发车方式,一是人满了就发车,一是等5分钟就发车,不管是人满了还是到5分钟了,发车,go~
分批发送可以减少网络IO,节省带宽使用,减少网络传输的压力,提升吞吐量文章来源地址https://www.toymoban.com/news/detail-595877.html

  • 一个批次消息发送后,通过网络,发往Kafka指定分区,然后刷盘到broker
  • 如果Producer设置了retries参数值>0,那么允许消息发送失败进行重试,重试机制由客户端Producer内部实现
  • Broker端消息落盘成功,会返回元数据给生产者
    • 通过阻塞直接返回 (同步发送)
    • 通过回调函数返回(异步发送)

到了这里,关于Kafka 入门到起飞系列 - 生产者发送消息流程解析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(32)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的分区策略

    01. Kafka 分区的作用 分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的

    2024年02月13日
    浏览(37)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的3种方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(36)
  • kafka服务端允许生产者发送最大消息体大小

            server.properties中加上的message.max.bytes配置,我目前设置为5242880,即5MB,可以根据实际情况增大。         在生产者端配置max.request.size,这是单个消息最大字节数,根据实际调整,max.request.size 必须小于 message.max.bytes 以及消费者的 max.partition.fetch.bytes。这样消息

    2024年02月15日
    浏览(36)
  • 07、Kafka ------ 消息生产者(演示 发送消息) 和 消息消费者(演示 监听消息)

    简单来说,就是一个数据项。 ▲ 消息就是 Kafka 所记录的数据节点,消息在 Kafka 中又被称为记录(record)或事件(event)。 从存储上来看,消息就是存储在分区文件(有点类似于List)中的一个数据项,消息具有 key、value、时间戳 和 可选的元数据头。 ▲ 下面是一个示例事件

    2024年01月20日
    浏览(30)
  • 【注意】Kafka生产者异步发送消息仍有可能阻塞

    Kafka是常用的消息中间件。在Spring Boot项目中,使用KafkaTemplate作为生产者发送消息。有时,为了不影响主业务流程,会采用 异步 发送的方式,如下所示。 本以为采用异步发送,必然不会影响到主业务流程。但实际使用时发现,在第一次发送消息时,如果Kafka Broker连接失败,

    2023年04月13日
    浏览(68)
  • Kafka中的生产者如何处理消息发送失败的情况?

    在Kafka中,生产者可以通过以下方式处理消息发送失败的情况: 同步发送模式(Sync Mode):在同步发送模式下,生产者发送消息后会阻塞等待服务器的响应。如果发送失败,生产者会抛出异常(例如 ProducerRecord 发送异常)或返回错误信息。开发者可以捕获异常并根据需要进行

    2024年02月06日
    浏览(31)
  • kafka入门,生产者异步发送、回调函数,同步发送(四)

    引入依赖 回调函数会在producer收到ack时调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果Exception为null,说明信息发送失败 注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。 只需在异步发送的基础上,再调用一下 get(

    2024年02月11日
    浏览(35)
  • Kafka系列:查看Topic列表、消息消费情况、模拟生产者消费者

    执行topic删除命令时,出现提示 这条命令其实并不执行删除动作,仅仅是在zookeeper上标记该topic要被删除而已,同时也提醒用户一定要提前打开delete.topic.enable开关,否则删除动作是不会执行的。 解决办法: a)在server.properties中设置delete.topic.enable参数为ture b)如下操作: 1.登

    2023年04月26日
    浏览(37)
  • kafka生产者发送消息报错 Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

    报这个错误是因为kafka里的配置要修改下 在config目录下 server.properties配置文件 这下发送消息就不会一直等待,就可以发送成功了

    2024年02月06日
    浏览(26)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包