kafka如何避免消息重复消费

这篇具有很好参考价值的文章主要介绍了kafka如何避免消息重复消费。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Kafka 避免消息重复消费通常依赖于以下策略和机制:

1. Consumer Group ID

Kafka使用Consumer Group ID来跟踪每个消费者所读取的消息。确保每个消费者都具有唯一的Group ID。如果多个消费者属于同一个Group ID,那么它们将共享消息,但每个分区的消息只能由一个消费者处理。

// 创建一个消费者并设置Group ID
Properties props = new Properties();
props.put("bootstrap.servers", "your-kafka-server:9092");
props.put("group.id", "unique-consumer-group-id");

// 创建 Kafka 消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

2. 提交消费位移(Offset Commit)

Kafka会记录每个消费者组消费的偏移量(Offset)。一旦消费者成功处理了消息,就会将偏移量提交给Kafka。当消费者重新启动时,它会从最后提交的偏移量处继续消费消息。

// 手动提交偏移量
consumer.commitSync();

3. 自动提交和手动提交

Kafka支持自动和手动提交偏移量。自动提交会定期提交偏移量,而手动提交需要在适当的时候手动调用提交方法。手动提交能够更好地控制偏移量的提交时机,避免重复消费。

// 开启自动提交位移
props.put("enable.auto.commit", "true");
// 设置自动提交的时间间隔
props.put("auto.commit.interval.ms", "1000");

4. 保证消息处理的幂等性

        应用程序层面可以保证消息的处理是幂等的,即使消息被重复处理也不会产生副作用。这可以通过唯一标识符或其他手段来识别和避免重复消息的影响。

        在分布式消息系统中,保证消息处理的幂等性是至关重要的。幂等性是指无论对同一条消息进行多少次处理,最终结果都是相同的。以下是一些保证消息处理幂等性的方法:

  • 唯一标识符

        为每条消息分配唯一的标识符(例如消息 ID),并在处理消息时检查该标识符是否已经处理过。可以利用数据库的唯一索引或分布式缓存(如Redis)来记录已经处理过的消息 ID。

// 假设 msgId 是消息的唯一标识符
if (!processedMessages.contains(msgId)) {
    // 处理消息的逻辑
    processedMessages.add(msgId);
}
  • 数据库事务

        在处理消息时,使用数据库事务来确保消息的处理操作是原子性的,并且如果相同消息被处理多次,只会产生一次结果变更。

  • 乐观锁机制

        在更新数据库或状态时,使用乐观锁机制确保只有第一个到达的处理请求会成功,后续重复的请求会被拒绝或忽略。

  • 版本控制

        对于每条消息,使用版本号来追踪状态的变化,确保相同的消息不会再次触发相同的状态变更。

  •  重试机制

        实现重试机制来处理消息处理失败的情况。当消息处理失败时,确保能够安全地重试,而不会产生重复的影响。

  • 幂等性接口

        设计接口时,考虑使其具有幂等性。例如,针对相同的请求多次调用接口不会对系统产生额外的影响,或者对相同请求的多次调用只会产生一次效果。

以上方法中,结合使用适合自身业务场景的机制,可以有效确保消息处理的幂等性。

5. 消息去重

        Kafka本身并不提供内置的消息去重机制,因此需要在消费者端实现消息去重的逻辑。下面是几种常见的去重方法:

  • 通过数据库或缓存存储消费记录

        在消费消息时,将消费记录存储在数据库或缓存中,并在消费前检查记录,如果已经消费过相同的消息,则不再进行处理。

// 假设 messageId 是消息的唯一标识符
if (!consumedMessages.contains(messageId)) {
    // 处理消息的逻辑
    consumedMessages.add(messageId);
}
  • 使用唯一标识符进行消息去重

        对于每条消息,可以利用消息的唯一标识符(例如消息 ID)进行去重,类似于上述的处理方式。

  • 使用消息的业务键进行去重

        如果消息包含业务键,可以根据业务键来进行去重。将业务键作为索引或键值存储在数据库或缓存中,在处理消息前检查是否存在相同的业务键。

  • 基于时间窗口的消息去重

        可以设置一个时间窗口,在此时间内的相同消息将被视为重复消息并被丢弃。

  • 使用 Kafka Streams 或 KSQL 进行去重

        Kafka Streams 或 KSQL 可以处理 Kafka 中的消息并进行去重、聚合等操作,可以针对数据流进行去重操作。

以上方法都是在消费者端进行消息去重的常见方式,需要根据业务场景和需求选择合适的方法。

常见问题

1、消费端程序跟不上/或者上游生产者数据量突增,导致下游kakfa数据堆积,消费不过来

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

kafka如何避免消息重复消费,kafka,分布式

解决方法:文章来源地址https://www.toymoban.com/news/detail-791491.html

  •  增加topic分区,同时代码端设置应用程序消费最大并发数
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
	ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
	factory.setConsumerFactory(consumerFactory);
	factory.setConcurrency(4);
	factory.setBatchListener(true);
	factory.getContainerProperties().setPollTimeout(3000);
	//当使用手动提交时必须设置ackMode为MANUAL,否则会报错No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.
	factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
	return factory;
}
  • 优化消费者程序代码逻辑,提高处理效率
  • 降低每次处理消息条数max-poll-records,加大max.poll.interval.ms参数,这个值并不是越大越好,要根据实际情况调整到一个相对均衡的状态,才能起明显的作用
    spring:
     kafka: # kafka相关配置
        bootstrap-servers: 192.168.101.34:9092,192.168.101.35:9092,192.168.101.36:9092
        consumer:
          auto-offset-reset: latest
          group-id: refiner-tjw
          max-poll-records: 200   #单次拉取消息条数
        properties :
          max:
            poll:
              interval:
                ms: 18000 #单次消息最大处理时间

到了这里,关于kafka如何避免消息重复消费的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式 - 消息队列Kafka:Kafka 消费者消费位移的提交方式

    最简单的提交方式是让消费者自动提交偏移量,自动提交 offset 的相关参数: enable.auto.commit:是否开启自动提交 offset 功能,默认为 true; auto.commit.interval.ms:自动提交 offset 的时间间隔,默认为5秒; 如果 enable.auto.commit 被设置为true,那么每过5秒,消费者就会自动提交 poll() 返

    2024年02月12日
    浏览(31)
  • 分布式消息队列Kafka(四)- 消费者

    1.Kafka消费方式 2.Kafka消费者工作流程 (1)总体工作流程 (2)消费者组工作流程 3.消费者API (1)单个消费者消费 实现代码 (2)单个消费者指定分区消费 代码实现: (3)消费者组消费 复制上面CustomConsumer三个,同时去订阅统一个主题,消费数据,发现一个分区只能被一个

    2023年04月26日
    浏览(35)
  • 分布式 - 消息队列Kafka:Kafka消费者的分区分配策略

    Kafka 消费者负载均衡策略? Kafka 消费者分区分配策略? 1. 环境准备 创建主题 test 有5个分区,准备 3 个消费者并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。 ① 创建主题 test,该主题有5个分区,2个副本: ② 创建3个消费者CustomConsu

    2024年02月13日
    浏览(30)
  • 分布式 - 消息队列Kafka:Kafka消费者分区再均衡(Rebalance)

    01. Kafka 消费者分区再均衡是什么? 消费者群组里的消费者共享主题分区的所有权。当一个新消费者加入群组时,它将开始读取一部分原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它将离开群组,原本由它读取的分区将由群组里的其他消费者读取。 分区

    2024年02月12日
    浏览(25)
  • 如何避免重复消费消息

    博主介绍: ✌全网粉丝3W+,全栈开发工程师,从事多年软件开发,在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战,博主也曾写过优秀论文,查重率极低,在这方面有丰富的经验✌ 博主作品: 《Java项目案例》主要基于SpringBoot+MyBatis/MyBatis

    2024年02月10日
    浏览(77)
  • 【消息队列】聊一下如何避免消息的重复消费

    一条消息在传输过程中,为了保证消息的不丢失,可能会多少量的消息进行重试,这样就可能导致Broker接受到的消息出现重复,如果说下游系统没有针对业务上的处理,那么可能导致同一笔借款或者支付订单出现重复扣款或者重复还款的情况。业务上是不允许出现的。 在MQ

    2024年02月10日
    浏览(28)
  • 分布式消息服务kafka

    什么是消息中间件? 消息中间件是分布式系统中重要的组件,本质就是一个具有接收消息、存储消息、分发消息的队列,应用程序通过读写队列消息来通信。 例如:在淘宝购物时,订单系统处理完订单后,把订单消息发送到消息中间件中,由消息中间件将订单消息分发到下

    2024年02月01日
    浏览(32)
  • 【分布式技术】消息队列Kafka

    目录 一、Kafka概述 二、消息队列Kafka的好处 三、消息队列Kafka的两种模式 四、Kafka 1、Kafka 定义 2、Kafka 简介 3、Kafka 的特性 五、Kafka的系统架构 六、实操部署Kafka集群  步骤一:在每一个zookeeper节点上完成kafka部署 ​编辑 步骤二:传给其他节点 步骤三:启动3个节点 kafka管理

    2024年01月23日
    浏览(38)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(31)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的分区策略

    01. Kafka 分区的作用 分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的

    2024年02月13日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包