Kafka核心原理之精准一次性投递

这篇具有很好参考价值的文章主要介绍了Kafka核心原理之精准一次性投递。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

精准一次性投递

在Kafka中,精准一次性投递(Exactly Once)=至少投递一次(At Least Once)+幂等性。

至少投递一次(At Least Once):将生产端参数acks设置为-1(all),可以保证生产端发送到Broker的消息不会丢失,即:至少投递一次(At Least Once)。

幂等性:

  • 幂等生产者幂保证单分区单会话内精准一次性投递消息。
  • 事务生产者保证跨分区跨会话精准一次性投递消息。

幂等生产者

幂等生产者指的是当发送同一条消息时,消息在Broker端只会被持久化一次,消息不丢失也不重复,它在底层设计架构中引入了ProducerID和SequenceNumber:

  • ProducerID:每个生产者启动时,Kafka会为生产者分配一个唯一标识ProducerID(注意:生产者重启时会重新分配ProducerID);
  • SequenceNumber :针对每个生产者(即:每个ProducerID),发送到指定Topic中某个分区内的消息都会为其生成一个从0开始单调递增的SequenceNumber值。

幂等性配置:生产端设置参数enable.idempotence为true。

注意:开启幂等性后,retries(重试次数)配置将默认为Integer.MAX_VALUE,acks配置将默认为-1,如果此时手动将acks设置为0,会报错。

幂等生产者实现原理

生产者在初始化时会被分配一个唯一的ProducerID(凡是开启幂等性生产者都需要生成全局唯一的ProducerID,如开启事务,则由TransactionCoordinator节点生成ProducerID,未开启事务,则由Broker自身生成ProducerID),该ProducerID对用户完全透明。

Leader所在的Broker会按<ProducerID, Topic, Partition>维度缓存SequenceNumber,对于接收的每条消息:

  • 如果消息序列号比Broker缓存中的序列号大1,则接受该消息,否则丢弃该消息。
  • 如果消息序列号比Broker缓存中的序列号大1以上,说明中间有数据尚未写入(即:乱序),此时Broker拒绝该消息。
  • 如果消息序列号小于等于Broker缓存中的序列号,说明该消息已被保存(即:重复消息),Broker丢弃该消息。

注意:生产端是以批次的方式发送消息,每个批次包含了多条消息,因此,生产端发送消息时,只会设置该批次第一条消息的序列号,该批次中其他消息的序列号是根据第一条消息的序列号计算得来。

ProducerID生产机制:

  • ProducerID通过ProducerIdManager的generateProducerId()方法获取,每个Broker都会在本地维护一个ProducerID段,当本地的ProducerID段用完后,会去Zookeeper上申请(节点:/latest_producer_id_block,每次申请1000个)。
  • 当Broker向Zookeeper申请ProducerID段时,会先获取Zookeeper中保存的当前ProducerID段最大值,然后尝试将要申请的ProducerID段写回ZK,这样其他Broker在尝试申请时,不会出现ProducerID段重复的情况。
  • 若开启事务,则由TransactionCoordinator节点生成全局唯一的ProducerID。

重新选举Leader后生产者幂等性是否有效?

有效。因为Leader分区所在的Broker缓存了PID、SequenceNumber等信息,而Leader分区中所有消息都会被复制到Followers中,当ISR中某个Follower被选举为新的Leader时,新Leader内部的消息中已经存储了PID、SequenceNumber等信息信息,因此,新Leader接收生产者消息时仍能实现消息幂等。

事务生产者

Kafka的幂等生产者只能保证单分区单会话内精准一次性投递消息,不能解决跨会话跨分区的问题。Kafka0.11版本开始引入了事务生产者,事务生产者保证Kafka在幂等生产者的基础上,实现跨分区跨会话精准一次性投递消息。

事务场景

事务生产者使用场景:

  • Producer发送多条消息组成一个事务,这些消息需要对Consumer同时可见或者同时不可见 。
  • Producer可能会给多个Topic、多个Partition发消息,这些消息需要能放在一个事务中,这是典型的分布式事务场景。
  • Kafka的应用场景经常是需要先消费一个Topic,处理后再发到另一个Topic,这个Consume-Transform-Produce过程需要放到一个事务中,比如:在消息处理或者发送的过程中失败,消费偏移量不能提交。
  • Producer或者Producer所在的应用可能会挂掉,新的Producer启动以后需要处理之前未完成的事务 。
  • 流式处理的拓扑可能会比较深,如果下游只有等上游消息事务提交以后才能读到,可能会导致RT非常长吞吐量也随之下降很多,所以需要实现Read Committed和Read Uncommitted两种事务隔离级别。

注意:当事务中仅存在Consumer消费消息的操作时,消费事务消息和Consumer手动提交Offset并没有区别。因此单纯的消费消息并不是Kafka引入事务机制的原因,单纯的消费消息也没有必要存在于一个事务中。

事务生产者实现原理

Kafka为了支持事务特性,引入一个新的组件:Transaction Coordinator,主要负责为生产者分配PID,记录事务状态等操作。

如图所示:

kafka at least once,kafka,分布式

生产者事务执行流程:

  • 1)初始化事务:生产者向事务协调器发送InitProducerIdRequest请求,为生产者生成对应的PID。
  • 2)开启事务:生产者调用KafkaProducer#beginTransaction()方法开启事务。
  • 3)发送事务消息:
    • 生产者发送消息前,向事务协调器发送AddPartitionsToTxnRequest请求,增量记录事务关联的TopicPartition列表。
    • 生产者向Broker发送事务消息。
    • 生产者向事务协调器发送AddOffsetsToTxnRequest请求,增量记录Offsets提交前,Group Coordinator(消费者组协调器)对应的TopicPartition列表。
    • 生产者向消费者组协调器发送TxnOffsetCommitRequest请求,将生产者提交的Offsets放置在事务性待确认列表。
  • 4)提交或回滚事务:生产者向事务协调器发送EndTxnRequest请求:
    • 将Prepare消息写入 __transaction_state。
    • 事务协调器向事务关联的TopicPartitions主副本和__transaction_state中写入EndTransactionMarker标记。
  • 5)超时事务中止:事务协调器以10s/次的频率轮询检测进行中的事务是否已超时,若事务已超时,则推进Epoch、中止当前事务。后续再次收到老的生产者(Epoch小于当前Epoch)消息将抛出ProducerFencedException异常。

开启事务配置

对于生产者,通过KafkaProducer的beginTransaction()方法可以开启一个事务,调用该方法后,生产者本地会标记已经开启了一个新的事务,只有在生产者发送第一条消息之后TransactionCoordinator才会认为该事务已经开启。同时,需要设置transactional.id属性,设置了transactional.id属性后,enable.idempotence属性会自动设置为true。

对于消费者,需要设置isolation.level = read_committed,这样Consumer只会读取已经提交了事务的消息。另外,需要设置enable.auto.commit = false来关闭自动提交Offset功能。

生产者事务

为了实现跨分区跨会话的事务,需要引入一个全局唯一的TransactionID,并将生产者的PID与TransactionID绑定,这样当生产者重启后,就可以通过生产者绑定的TransactionID获得原来的PID。

为了管理事务,Kafka引入了一个新的组件Transaction Coordinator(事务协调器,事务协调器为Leader分区所在的Broker节点),生产者就是通过与Transaction Coordinator交互获得TransactionID对应的任务状态,Transaction Coordinator负责将事务操作写入Kafka的一个内部Topic(__transaction_state(默认分区数:50)),这样即使Transaction Coordinator宕机重启,由于事务状态得到保存,未完成的事务状态可以得到恢复。

消费者事务

对于消费者而言,事务能保证的语义相对较弱,无法保证已提交事务中的消息都能被消费。原因如下:

  • 对采用日志压缩策略的主题而言,事务中的某些消息有可能被清理(相同Key的消息,后写入的消息会覆盖前面写入的消息)。
  • 事务中的消息可能分布在同一个分区的多个日志分段(Segment)中,当旧的日志分段被删除时,对应的消息可能会丢失。
  • 消费者可以通过seek()方法访问任意offset的消息,从而可能遗漏事务中的部分消息。
  • 消费者在消费时可能没有分配到事务内的所有分区,因此不能读取事务中的所有消息。

消费端有一个isolation.level参数,该参数用来配置消费者的事务隔离级别(类型为字符串),有效值为read_uncommitted和read_committed,表示消费者消费到的位置,如果设置为read_committed,那么消费者会忽略事务未提交的消息,即:只能消费到LSO(Last Stable Offset,即:最后稳定偏移)的位置。默认值为read_uncommitted,即:可以消费到HW处的位置,注意:Follower副本的事务隔离级别为read_uncommitted,且不可修改。

在开启Kafka事务时,生产者发送了message1、message2、message3、message4四条消息到Broker中,如果生产者没有提交事务,对于read_committed隔离级别的消费者而言,是看不到这些消息的,设置为read_uncommitted隔离级别则可以看到。事务中第一条消息的位置标记为firstUnstableOffset(即:message1的位置)。

LSO会影响Kafka消息滞后量(Kafka Lag,即:消息堆积量)的计算。如图所示:

kafka at least once,kafka,分布式

图中:在read_uncommitted(默认)隔离级别下,Consumer Offset表示当前的消费位置(针对普通消息的情况),Kafka Lag = HW - Consumer Offset,即:Kafka Lag计算方式不受影响(即使引入事务消息)。在read_committed隔离级别下,如果引入事务消息,Kafka Lag计算需要引入LSO来进行计算。如图所示:

kafka at least once,kafka,分布式

图中:对于未完成的事务,LSO的值等于事务中第一条消息的位置(即:firstUnstableOffset),对于已完成的事务,LSO的值与HW相同,因此,LSO<=HW<=LEO。如下图所示:

kafka at least once,kafka,分布式

图中:在read_committed隔离级别下,Kafka Lag = LSO - Consumer Offset。注:ControlBatch为控制消息,控制消息一共有两种类型:COMMIT和ABORT,分别用来表示事务已经成功提交或已经被成功终止。KafkaConsumer可以通过控制消息来判断对应的事务已提交或已中止,再结合参数isolation.level配置的隔离级别来决定是否将相应的消息返回给消费端。

事务恢复保证

为了实现有状态的应用也可以保证重启后从断点处继续处理(即:事务恢复)。应用程序必须提供一个稳定的(重启后不变)唯一的ID(即:Transaction ID)。Transactin ID与PID可能一一对应。区别在于Transaction ID由用户提供,而PID是内部的实现对用户透明。

另外,为了保证新的生产者启动后,旧的具有相同Transaction ID的生产者失效,生产者通过Transaction ID拿到PID的同时,还会获取一个单调递增的epoch。由于旧的生产者的epoch比新生产者的epoch小,Broker会拒绝其请求。

有了Transaction ID和epoch后,Kafka可保证:

  • 跨Session的数据幂等发送。当具有相同Transaction ID的新的生产者实例被创建且工作时,旧的且拥有相同Transaction ID的生产者将不再工作。
  • 跨Session的事务恢复。如果某个应用实例宕机,新的实例可以保证任何未完成的旧的事务要么Commit要么Abort,使得新实例从一个正常状态开始工作。

事务原子性保证

事务原子性是指生产者将多条消息作为一个事务批量发送,要么全部成功要么全部失败。 引入了一个服务器端的模块,名为Transaction Coordinator,用于管理生产者发送的消息的事务性。

该Transaction Coordinator维护Transaction Log,该Log存于一个内部的Topic内(__transaction_state(默认分区数:50))。由于Topic数据具有持久性,因此事务的状态也具有持久性。

生产者并不直接读写Transaction Log,它与Transaction Coordinator通信,然后由Transaction Coordinator将该事务的状态插入相应的Transaction Log。

Transaction Log的设计与Offset Log用于保存Consumer的Offset类似。

事务中Offset提交保证

在Kafka Stream应用中同时包含Consumer和Producer(即Consumer-Transform-Producer),前者负责从Kafka中获取消息,后者负责将处理完的数据写回Kafka的其它Topic中。

为了实现该场景下的事务原子性,Kafka需要保证对Consumer Offset的Commit与Producer对发送消息的Commit包含在同一个事务中。否则,如果在二者Commit中间发生异常,根据二者Commit的顺序可能会造成数据丢失和数据重复:

  • 如果先Commit Producer发送数据的事务,再Commit Consumer的Offset,即:At Least Once语义,可能造成数据重复。
  • 如果先Commit Consumer的Offset,再Commit Producer数据发送事务,即At Most Once语义,可能造成数据丢失。

拒绝僵尸实例(Zombie fencing)

在分布式系统中,一个实例宕机或失联,集群会启动一个新的实例来代替该实例。此时若旧实例恢复,集群中就会存在两个相同的实例(即:TransactionID和PID相同的实例),此时旧实例就被称为“僵尸实例”。

在Kafka中,两个相同的生产者实例同时处理消息并生产重复消息,这就是僵尸实例问题。

Kafka事务特性通过TransactionID属性来解决僵尸实例问题。所有具有相同TransactionID的生产者会分配相同的PID,同时,每个生产者还会分配一个递增的epoch(纪元)。Broker收到事务提交请求时,如果检查当前事务提交者的epoch不是最新值,就会拒绝该生产者的请求,从而达成拒绝僵尸实例的目标。

用于事务特性的控制型消息

为了区分写入Partition的消息被Commit还是Abort,Kafka引入了一种特殊类型的消息(即:Control Message)。该类消息的Value内不包含任何应用相关的数据,并且不会暴露给应用程序。它只用于Broker与Client间的内部通信。

对于Producer端事务,Kafka以Control Message的形式引入一系列的Transaction Marker。Consumer即可通过该标记判定对应的消息被Commit了还是Abort了,然后结合该Consumer配置的隔离级别决定是否应该将该消息返回给应用程序。

注意:

Kafka事务的回滚,并不是删除已写入的数据,而是将写入数据的事务标记为Rollback/Abort,从而在读数据时过滤该数据。文章来源地址https://www.toymoban.com/news/detail-858049.html

到了这里,关于Kafka核心原理之精准一次性投递的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • python 一次性删除列表(list)的空白元素(空内容) 或者 一次性删除列表(list)中的指定元素

    看看下述代码: 输出: 当你遇见这种情况,有哪些方法来去除里面的空内容呢(即 \\\'\\\' )? 1.1 删除空内容(方法一) : 输出: 1.2 删除空内容(方法二) : 需要 配合 lambda 表达式 一起使用! 输出: 2.3 删除指定内容 : 输出: 注 :此方法既可以删除空元素,也可以删除指

    2024年02月03日
    浏览(45)
  • 《一次性分割一切》阅读笔记

    目录 0 体验 1 摘要 2 十个问题 参考文献 体验地址 :SEEM - a Hugging Face Space by xdecoder 体验结果 : 将哈士奇和汽车人从图片中分割出来。 尽管对于交互式人工智能系统的需求不断增长,但在视觉理解(例如分割)中的人工智能交互方面,很少有全面的研究。本文受到基于提示的

    2024年02月01日
    浏览(50)
  • 公众号一次性订阅消息

    洛塔服务号回复007获取代码。 之前发布通知,要用订阅通知替代一次性订阅消息,不知道是被骂的太惨还是技术原因,一次性订阅消息还是一直能用。 和模板消息不同的是,一次性订阅消息无需用户关注公众号,但是必须用户点击同意发送才能接收消息。 模板消息:需要关

    2024年02月09日
    浏览(49)
  • 一次性打包学透 Spring

    不知从何时开始,Spring 这个词开始频繁地出现在 Java 服务端开发者的日常工作中,很多 Java 开发者从工作的第一天开始就在使用 Spring Framework,甚至有人调侃“不会 Spring 都不好意思自称是个 Java 开发者”。 之所以出现这种局面,源于 Spring 是一个极为优秀的一站式集成框架

    2023年04月19日
    浏览(41)
  • Python:一次性输出多个量

    有的时候我们在输入一个字符串时,需要在中间加一个int类型变量时,如果一段一段输出就要写三个print,非常麻烦。今天bug君就给大家讲讲如何在Python里一次性输出多个量。 粽所粥汁,在Python里输出需要写 print(\\\"输出内容\\\") ,而输出一个变量则需要写 print(变量名) 。 注意:

    2024年02月04日
    浏览(91)
  • flink实现kafka、doris精准一次说明

    前言说明:本文档只讨论数据源为kafka的情况实现kafka和doris的精准一次写入 flink的kafka连接器已经实现了自动提交偏移量到kafka,当flink中的数据写入成功后,flink会将这批次数据的offset提交到kafka,程序重启时,kafka中记录了当前groupId消费的offset位置,开始消费时将会从上一次

    2024年02月08日
    浏览(33)
  • charles证书安装,一次性说明白

    windows上安装好charles后,需要给软件安装证书。 1、点击help - SSL proxying,选择第二个install Charles Root Certificate证书安装   2、如果以前安装过证书,但是过期了(有效期一般1年),证书界面会显示过期字样,此时就要先点击一下Reset Charles Root Certificate,然后再点击第一步中的

    2024年02月05日
    浏览(84)
  • 如何一次性启动多个SpringBoot项目

    在做微服务这块的架构设计的时候,当微服务数量越来越多的时候,本地启动各个服务的时候,可能得手动启动每个启动类。这样就很麻烦,因此记录一下如何在 idea 里面一键启动所有的项目。 比如我项目里面有5个微服务:那么就对应了5个启动类。 1.项目右上角编辑: 2.点

    2024年02月16日
    浏览(35)
  • uniapp上传图片、一次性上传多张图片

    uniapp官方文档 上传多张图片 上传单张

    2024年02月12日
    浏览(48)
  • Git仓库实现:一次性提交多个文件

    首先是建立服务器与仓库的连接,不会操作的小伙伴可以看这一篇文章,链接: Git仓库 保姆级教程 选择文件夹,使用git clone上传的仓库,点击进去 输入命令行: git commit -m \\\" \\\" ,双引号一般填入描述或者版本信息,但是若要提交多个文件,则写入 \\\"add more files\\\" 有问题欢迎随时

    2024年02月12日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包