Kafka 避免消息重复消费通常依赖于以下策略和机制:
1. Consumer Group ID
Kafka使用Consumer Group ID来跟踪每个消费者所读取的消息。确保每个消费者都具有唯一的Group ID。如果多个消费者属于同一个Group ID,那么它们将共享消息,但每个分区的消息只能由一个消费者处理。
// 创建一个消费者并设置Group ID
Properties props = new Properties();
props.put("bootstrap.servers", "your-kafka-server:9092");
props.put("group.id", "unique-consumer-group-id");
// 创建 Kafka 消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
2. 提交消费位移(Offset Commit)
Kafka会记录每个消费者组消费的偏移量(Offset)。一旦消费者成功处理了消息,就会将偏移量提交给Kafka。当消费者重新启动时,它会从最后提交的偏移量处继续消费消息。
// 手动提交偏移量
consumer.commitSync();
3. 自动提交和手动提交
Kafka支持自动和手动提交偏移量。自动提交会定期提交偏移量,而手动提交需要在适当的时候手动调用提交方法。手动提交能够更好地控制偏移量的提交时机,避免重复消费。
// 开启自动提交位移
props.put("enable.auto.commit", "true");
// 设置自动提交的时间间隔
props.put("auto.commit.interval.ms", "1000");
4. 保证消息处理的幂等性
应用程序层面可以保证消息的处理是幂等的,即使消息被重复处理也不会产生副作用。这可以通过唯一标识符或其他手段来识别和避免重复消息的影响。
在分布式消息系统中,保证消息处理的幂等性是至关重要的。幂等性是指无论对同一条消息进行多少次处理,最终结果都是相同的。以下是一些保证消息处理幂等性的方法:
-
唯一标识符
为每条消息分配唯一的标识符(例如消息 ID),并在处理消息时检查该标识符是否已经处理过。可以利用数据库的唯一索引或分布式缓存(如Redis)来记录已经处理过的消息 ID。
// 假设 msgId 是消息的唯一标识符
if (!processedMessages.contains(msgId)) {
// 处理消息的逻辑
processedMessages.add(msgId);
}
-
数据库事务
在处理消息时,使用数据库事务来确保消息的处理操作是原子性的,并且如果相同消息被处理多次,只会产生一次结果变更。
-
乐观锁机制
在更新数据库或状态时,使用乐观锁机制确保只有第一个到达的处理请求会成功,后续重复的请求会被拒绝或忽略。
-
版本控制
对于每条消息,使用版本号来追踪状态的变化,确保相同的消息不会再次触发相同的状态变更。
-
重试机制
实现重试机制来处理消息处理失败的情况。当消息处理失败时,确保能够安全地重试,而不会产生重复的影响。
-
幂等性接口
设计接口时,考虑使其具有幂等性。例如,针对相同的请求多次调用接口不会对系统产生额外的影响,或者对相同请求的多次调用只会产生一次效果。
以上方法中,结合使用适合自身业务场景的机制,可以有效确保消息处理的幂等性。
5. 消息去重
Kafka本身并不提供内置的消息去重机制,因此需要在消费者端实现消息去重的逻辑。下面是几种常见的去重方法:
-
通过数据库或缓存存储消费记录
在消费消息时,将消费记录存储在数据库或缓存中,并在消费前检查记录,如果已经消费过相同的消息,则不再进行处理。
// 假设 messageId 是消息的唯一标识符
if (!consumedMessages.contains(messageId)) {
// 处理消息的逻辑
consumedMessages.add(messageId);
}
-
使用唯一标识符进行消息去重
对于每条消息,可以利用消息的唯一标识符(例如消息 ID)进行去重,类似于上述的处理方式。
-
使用消息的业务键进行去重
如果消息包含业务键,可以根据业务键来进行去重。将业务键作为索引或键值存储在数据库或缓存中,在处理消息前检查是否存在相同的业务键。
-
基于时间窗口的消息去重
可以设置一个时间窗口,在此时间内的相同消息将被视为重复消息并被丢弃。
-
使用 Kafka Streams 或 KSQL 进行去重
Kafka Streams 或 KSQL 可以处理 Kafka 中的消息并进行去重、聚合等操作,可以针对数据流进行去重操作。
以上方法都是在消费者端进行消息去重的常见方式,需要根据业务场景和需求选择合适的方法。
常见问题
1、消费端程序跟不上/或者上游生产者数据量突增,导致下游kakfa数据堆积,消费不过来
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
文章来源:https://www.toymoban.com/news/detail-791491.html
解决方法:文章来源地址https://www.toymoban.com/news/detail-791491.html
- 增加topic分区,同时代码端设置应用程序消费最大并发数
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(4);
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(3000);
//当使用手动提交时必须设置ackMode为MANUAL,否则会报错No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
- 优化消费者程序代码逻辑,提高处理效率
- 降低每次处理消息条数max-poll-records,加大max.poll.interval.ms参数,这个值并不是越大越好,要根据实际情况调整到一个相对均衡的状态,才能起明显的作用
spring: kafka: # kafka相关配置 bootstrap-servers: 192.168.101.34:9092,192.168.101.35:9092,192.168.101.36:9092 consumer: auto-offset-reset: latest group-id: refiner-tjw max-poll-records: 200 #单次拉取消息条数 properties : max: poll: interval: ms: 18000 #单次消息最大处理时间
到了这里,关于kafka如何避免消息重复消费的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!