Kafka-消费者-传递保证语义(Delivery guarantee semantic)

这篇具有很好参考价值的文章主要介绍了Kafka-消费者-传递保证语义(Delivery guarantee semantic)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Kafka服务端并不会记录消费者的消费位置,而是由消费者自己决定如何保存如何记录其消费的offset。

在Kafka服务端中添加了一个名为“__consumer_offsets”的内部Topic,为了便于描述简称“Offsets Topic”。

Offsets Topic可以用来保存消费者提交的offset,当出现消费者上/下线时会触发Consumer Group进行Rebalance操作,对分区进行重新分配,待Rebalance操作完成后,消费者就可以读取Offsets Topic中记录的offset,并从此offset位置继续消费。

当然,使用Offsets Topic记录消费者的offset只是默认选项,开发人员可以根据业务需求将offset记录在别的存储中。

在消费者消费消息的过程中,提交offset的时机显得非常重要,因为它决定了消费者故障重启后的消费位置。

我们通过将enable.auto.commit选项设置为true可以起到自动提交offset的功能,auto.commit.interval.ms选项则设置了自动提交的时间间隔,这是最简单的提交offset方式。

每次在调用KafkaConsumer.poll方法时都会检测是否需要自动提交,并提交上次poll方法返回的最后一个消息的offset。

为了避免消息丢失,建议poll方法之前要处理完上次poll方法拉取的全部消息。

KafkaConsumer中还提供了两个手动提交offset的方法,分别是commitSync()方法和commitAsync()方法,它们都可以指定提交的offset值,区别在于前者是同步提交,后者是异步提交。提交offset的具体原理和实现稍后分析。

下面来介绍消息的传递保证(Delivery guarantee semantic)的相关内容,传递保证语义有以下三个级别。

  • At most once:消息可能会丢,但绝不会重复传递。
  • At least once:消息绝不会丢,但可能会重复传递。
  • Exactly once:每条消息只会被传递一次。

在实践中很少出现对“At most once”的需求,而在很多场景中,“Exactly once”语义才是我们需要的,所以我们详述这种语义。

当然,如果通过Kafka传递的消息是幂等性的(即一条消息被反复消费多次并不会对计算结果产生影响),使用“At least once”语义也是没有问题的。

“Exactly once”语义由生产者和消费者两部分共同决定:首先,生产者要保证不会产生重复的消息;其次,消费者不能重复拉取相同的消息。

先来讨论生产者部分,当生产者向Kafka发送消息,且正常得到响应的时候,可以确保生产者不会产生重复的消息。

但是,如果生产者发送消息后,遇到网络问题,无法获取响应,生产者就无法判断该消息是否成功提交给了Kafka。

我们知道,当出现异常时,会进行消息重传,这就可能出现“At least one”语义。

为了实现“Exactly once”语义,这里提供两个可选方案:

  • 每个分区只有一个生产者写入消息,当出现异常或超时的情况时,生产者就要查询此分区的最后一个消息,用来决定后续操作是消息重传还是继续发送。
  • 为每个消息添加一个全局唯一主键,生产者不做其他特殊处理,按照之前分析方式进行重传,由消费者对消息进行去重,实现“Exactly once”语义。

如果业务数据产生消息可以找到合适的字段作为主键,或是有一个全局ID生成器,可以优先考虑选用第二种方案。
接下来讨论消费者部分。消费者处理消息与提交offset的顺序,在很大程度上决定了消息者是哪个语义。

Kafka-消费者-传递保证语义(Delivery guarantee semantic),队列,kafka,数据库,分布式Kafka-消费者-传递保证语义(Delivery guarantee semantic),队列,kafka,数据库,分布式

在图中展示了两种提交offset不当导致的消息被重复消费(“At least once”语义)以及丢失消息(“At most once”语义)的情况。

在图1的场景中,消费者拉取完消息后,业务逻辑先对消息进行处理,再提交offset。

这种模式下,如果消费者在处理完了消息之后,提交offset之前出现宕机,待消费者重新上线时,还会处理刚刚未提交的那部分消息(即2~7这部分消息),但这些消息已经被处理过了,这就对应于“At least once”语义。

在图2的场景中,消费者拉取消息后,先提交offset后再处理消息。

在提交offset之后,业务逻辑处理消息之前出现宕机,待消费者重新上线时,就无法读到刚刚已提交而未处理的这部分消息(即5~8这部分消息),这就对应于“At most once”语义。

这里仅仅是一个示例,还有很多原因会导致类似的结果,例如Consumer Group Rebalance等。

为了实现消费者的“Exactly once”语义,在这里提供一种方案,供读者参考:

消费者将关闭自动提交offset的功能且不再手动提交offset,这样就不使用Offsets Topic这个内部Topic记录其offset,而是由消费者自己保存offset。

这里利用事务的原子性来实现“Exactlyonce”语义,我们将offset和消息处理结果放在一个事务中,事务执行成功则认为此消息被消费,否则事务回滚需要重新消费。

当出现消费者宕机重启或Rebalance操作时,消费者可以从关系型数据库中找到对应的offset,然后调用KafkaConsumer.seek()方法手动设置消费位置,从此offset处开始继续消费。

到目前为止,消费者并不知道Consumer Group什么时候会发生Rebalance操作,哪个分区分配给了哪个消费者消费。我们可以通过向KafkaConsumer添加ConsumerRebalanceListener接口来解决这个问题。

ConsumerRebalanceListener有两个回调方法。

  • onPartitionsRevoked方法:调用时机是Consumer停止拉取数据之后、Rebalance开始之前,我们可以在此方法中实现手动提交offset,这就避免了Rebalance导致的重复消费的情况。
  • onPartitionsAssigned方法:调用时机是Rebalance完成之后、Consumer开始拉取数据之前,我们可以在此方法中调整或自定义offset的值。

通过ConsumerRebalanceListener接口和seek方法,我们就可以实现从关系型数据库获取offset并手动设置的功能了。

这个方案还有其他的变体,例如,使用assign方法为消费者手动分配TopicPartition,将提供事务保证的存储换成HDFS或其他No-SQL数据库,但是基本原理还是不变的。文章来源地址https://www.toymoban.com/news/detail-803240.html

到了这里,关于Kafka-消费者-传递保证语义(Delivery guarantee semantic)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 分布式 - 消息队列Kafka:Kafka消费者和消费者组

    分布式 - 消息队列Kafka:Kafka消费者和消费者组

    1. Kafka 消费者是什么? 消费者负责订阅Kafka中的主题,并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者

    2024年02月13日
    浏览(7)
  • kafka配置多个消费者groupid kafka多个消费者消费同一个partition(java)

    kafka配置多个消费者groupid kafka多个消费者消费同一个partition(java)

    kafka是由Apache软件基金会开发的一个开源流处理平台。kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 kafka中partition类似数据库中的分表数据,可以起到水平扩展数据的目的,比如有a,b,c,d,e,f 6个数据,某个topic有两个partition,一

    2024年01月22日
    浏览(54)
  • Kafka3.0.0版本——消费者(消费者组详细消费流程图解及消费者重要参数)

    Kafka3.0.0版本——消费者(消费者组详细消费流程图解及消费者重要参数)

    创建一个消费者网络连接客户端,主要用于与kafka集群进行交互,如下图所示: 调用sendFetches发送消费请求,如下图所示: (1)、Fetch.min.bytes每批次最小抓取大小,默认1字节 (2)、fetch.max.wait.ms一批数据最小值未达到的超时时间,默认500ms (3)、Fetch.max.bytes每批次最大抓取大小,默

    2024年02月09日
    浏览(7)
  • 10、Kafka ------ 消费者组 和 消费者实例,分区 和 消费者实例 之间的分配策略

    10、Kafka ------ 消费者组 和 消费者实例,分区 和 消费者实例 之间的分配策略

    形象来说:你可以把主题内的多个分区当成多个子任务、多个子任务组成项目,每个消费者实例就相当于一个员工,假如你们 team 包含2个员工。 同理: 同一主题下,每个分区最多只会分给同一个组内的一个消费者实例 消费者以组的名义来订阅主题,前面的 kafka-console-consu

    2024年01月19日
    浏览(5)
  • Kafka消费者不消费数据

    Kafka消费者不消费数据

    背景: 工作往往是千篇一律,真正能学到点知识都是在上线后。使用Skywalking+Kafka+ES进行应用监控。 现象: 公司使用Skywalking在开发测试环境中Kafka顺利消费数据,到了UAT环境一开始还正常,后面接入了更多的应用后出现了问题:OAP服务正常但是ES里不再有数据。 排查: 通过

    2023年04月14日
    浏览(8)
  • Kafka-消费者组消费流程

    Kafka-消费者组消费流程

    消费者向kafka集群发送消费请求,消费者客户端默认每次从kafka集群拉取50M数据,放到缓冲队列中,消费者从缓冲队列中每次拉取500条数据进行消费。   

    2024年02月12日
    浏览(11)
  • Kafka3.0.0版本——消费者(消费者组原理)

    Kafka3.0.0版本——消费者(消费者组原理)

    1.1、消费者组概述 Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。 注意: (1)、消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。 (2)、消费者组之间互不影响。所有的消费者

    2024年02月09日
    浏览(11)
  • 【Kafka】Kafka消费者

    【Kafka】Kafka消费者

    pull(拉)模式:consumer采用从broker中主动拉取数据。 Kafka采用这种方式。 push(推)模式:Kafka没有采用这种方式,因为由broker决定消息发送速率,很难适应所有的消费者的消费速率。例如推送的速度是50m/s,consumer1和consumer2旧来不及处理消息。 pull模式不足之处是,如果Kafka没有数

    2024年02月13日
    浏览(8)
  • Kafka消费者无法消费数据,解决

    作为一个在项目中边学边用的实习生,真的被昨天还好好的今天就不能消费数据的kafka折磨到了,下面提供一点建议,希望能对大家有所帮助。 //操作前集群都关了 1.首先去kafka-home的config目录下找到server.properties文件, 加入advertised.listeners=PLAINTEXT://ip:9092    如果有配置liste

    2024年02月17日
    浏览(7)
  • 【Kafka】【十七】消费者poll消息的细节与消费者心跳配置

    默认情况下,消费者⼀次会poll500条消息。 代码中设置了⻓轮询的时间是1000毫秒 意味着: 如果⼀次poll到500条,就直接执⾏for循环 如果这⼀次没有poll到500条。且时间在1秒内,那么⻓轮询继续poll,要么到500条,要么到1s 如果多次poll都没达到500条,且1秒时间到了,那么直接执

    2024年02月09日
    浏览(10)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包