汇总Kafka手动提交与自动提交

这篇具有很好参考价值的文章主要介绍了汇总Kafka手动提交与自动提交。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

自动提交

程序拉取消息后,满足要求后自动提交,无需程序开发者介入。

  1. 配置

@Bean("kafkaContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        //并发数量
        factory.setConcurrency(concurrency);
        //批量获取
        factory.setBatchListener(true);
        //不自动启动
        factory.setAutoStartup(false);
        factory.getContainerProperties().setPollTimeout(1500);
        //rebalance监听
        factory.getContainerProperties().setConsumerRebalanceListener(new RebalanceListener());
        return factory;

    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        //设置自动提交
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        //自动提交时间间隔
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //重置位移 从最新的或最旧的消息开始消费
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
        return propsMap;
    }

2.重复消费与丢消息的情景

2.1丢消息

在consumer拉取消息之后,达到提交时间AUTO_COMMIT_INTERBAL_MS_CONFIG之后位移(例如1-10)已经自动提交,若此时消息尚未消费完成时,且消费者挂掉了,此时尚未消费的消息会丢失。因为位移1-10已经提交,下次拉取消息会从位移10开始消费。注:提交的位移是下次消费的起点。

2.2重复消费

此情况多发生于AUTO_COMMIT_INTERBAL_MS_CONFIG时间配置过长的情况下。在consumer拉取消息之后,消费者已经完成消费,但此时还未达到AUTO_COMMIT_INTERBAL_MS_CONFIG,位移尚未提交,但消费者挂掉了。下次会重新拉取此批消息,重新处理,导致重复消费。

手动提交

手动提交,程序设计者自行控制提交位移的时间。可由spring自动提交或主动调用提交位移的方法主动提交。

1.配置

@Bean("kafkaContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        //并发数量
        factory.setConcurrency(concurrency);
        //批量获取
        factory.setBatchListener(true);
        factory.setAutoStartup(false);
        factory.getContainerProperties().setPollTimeout(1500);
        //手动提交方式 手动调用api接口后马上提交
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        //手动同步提交
        factory.getContainerProperties().setSyncCommits(true);
        factory.getContainerProperties().setConsumerRebalanceListener(new RebalanceListener());
        return factory;

    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        //自动提交设置为false
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
        return propsMap;
    }

2.手动提交方式及参数介绍

2.1同步手动提交

同步阻塞,提交函数有返回后继续执行;失败自动重试;

2.2异步手动提交

异步提交无需等待提交方法的返回;无失败重试机制。

原因:位移异步提交,可能出现位移2提交失败,位移3提交成功,若位移2尝试重新提交并成功了,会把位移从3更新成2,导致下次重新拉取消息3造成重复消费。

2.3手动提交参数

spring kafka:

  1. ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 将自动提交设置为false;

  1. factory.getContainerProperties().setSyncCommits(true); 设置手动同步提交,或异步提交;

  1. factory.getContainerProperties().setAckMode (AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

ackMode参数:

* RECORD:当消息处理器返回处理结果后立即提交offset。
* BATCH:当poll()方法返回的所有消息都被处理后提交offset。(默认值)
* TIME:当poll()方法返回的所有消息都被处理且距离上一次提交offset的时间已经超过了ackTime设置的时间,则提交offset。
* COUNT:当poll()方法返回的所有消息都被处理且自上一次提交偏移量以来接收到了ackCount条记录,则提交offset。
* COUNT_TIME:与TIME和COUNT类似,但如果两种情况中有任意一种满足,则提交offset。
* MANUAL:消息监听器负责手动提交Acknowledgment对象。此后,与BATCH相同的语义将被应用。
* MANUAL_IMMEDIATE:当消息监听器调用Acknowledgment.acknowledge()方法时立即提交offset;

3.重复消费的情景

3.1重复消费

在消费者消费完消息1-消息10后,尚未调用提交位移的方法时,消费者挂掉了。当rebalance之后,此分区分配给新的消费者,会重复拉取消息1-消息10进行消费,从而导致重复消费问题。文章来源地址https://www.toymoban.com/news/detail-523774.html

到了这里,关于汇总Kafka手动提交与自动提交的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka为什么尽量使用手动提交

    在 Kafka 中,消费者可以使用手动提交和自动提交两种方式来管理消费偏移量(offset)。它们之间的区别如下: 1. 手动提交 offset:    - 消费者通过调用 `commitSync()` 或 `commitAsync()` 方法手动提交消费偏移量。    - 手动提交 offset 需要显式地指定要提交的分区和偏移量。    

    2024年02月15日
    浏览(24)
  • Kafka入门,手动提交offset,同步提交,异步提交,指定 Offset 消费(二十三)

    虽然offset十分遍历,但是由于其是基于时间提交的,开发人员难以把握offset提交的实际。因此Kafka还提供了手动提交offset的API 手动提交offset的方法有两种:分别commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交:不同点是

    2024年02月11日
    浏览(35)
  • Kafka3.0.0版本——消费者(手动提交offset)

    1.1、手动提交offset的两种方式 commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。 commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。 1.2、手动提交offset两种方式的区别 相同点:都会将本次提交的一批数据最高的偏移量提交。 不

    2024年02月09日
    浏览(36)
  • 为什么kafka 需要 subscribe 的 group.id?我们是否需要使用 commitSync 手动提交偏移量?

    消费概念: Kafka 使用消费者组的概念来实现主题的并行消费 - 每条消息都将在每个消费者组中传递一次,无论该组中实际有多少个消费者。所以 group 参数是强制性的,如果没有组,Kafka 将不知道如何对待订阅同一主题的其他消费者。 偏移量 : 每当我们启动一个消费者时,

    2024年02月12日
    浏览(44)
  • Spring Boot 整合kafka:生产者ack机制和消费者AckMode消费模式、手动提交ACK

    Kafka 生产者的 ACK 机制指的是生产者在发送消息后,对消息副本的确认机制。ACK 机制可以帮助生产者确保消息被成功写入 Kafka 集群中的多个副本,并在需要时获取确认信息。 Kafka 提供了三种 ACK 机制的配置选项,分别是: acks=0:生产者在成功将消息发送到网络缓冲区后即视

    2024年02月04日
    浏览(37)
  • Spring Kafka消费模式(single, batch)及确认模式(自动、手动)示例

    Spring Kafka消费消息的模式分为2种模式(对应spring.kafka.listener.type配置): single - 每次消费单条记录 batch - 批量消费消息列表 且每种模式都分为2种提交已消费消息offset的ack模式: 自动确认 手动确认 接下来依次讲解这两种消费模式及其对应的ack模式的示例配置及代码。 本章节

    2023年04月08日
    浏览(28)
  • Kafka3.0.0版本——消费者(自动提交 offset)

    官网文档 参数解释 参数 描述 enable.auto.commi 默认值为 true,消费者会自动周期性地向服务器提交偏移量。 auto.commit.interval.ms 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。 图解分析 消费者自动提交 offset代码 消费者自动提交

    2024年02月09日
    浏览(30)
  • 手动部署Kraft模式Kafka集群

    IP地址 Hostname Release Kafka-Version 172.29.145.157 iamdemo1 Centos7.9 kafka_2.12-3.5.1 172.29.145.182 iamdemo2 Centos7.9 kafka_2.12-3.5.1 172.29.145.183 iamdemo3 Centos7.9 kafka_2.12-3.5.1 下载安装包 kafka安装包官网下载 下载完成后上传到服务器/opt目录下解压 生成集群随机uuid 配置kafka集群的kraft模式参数 使用集群

    2024年02月05日
    浏览(34)
  • springboot集成kafka消费手动启动停止

    在月结,或者某些时候,我们需要停掉kafka所有的消费端,让其暂时停止消费,而后等月结完成,再从新对消费监听恢复,进行消费,此动作不需要重启服务,最后源码下载 1.通过定时任务自动触发,通过@Scheduled,在某个时间点暂停kafka某个监听的消费,也可以在某个时间点

    2024年02月06日
    浏览(33)
  • 手动配置 kafka 用户密码,认证方式等的方式

    部分场景会指定使用某一kafka 来提高安全性,这里就不得不使用用户密码认证方式等来控制

    2024年02月02日
    浏览(23)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包