Spring Boot 整合kafka:生产者ack机制和消费者AckMode消费模式、手动提交ACK

这篇具有很好参考价值的文章主要介绍了Spring Boot 整合kafka:生产者ack机制和消费者AckMode消费模式、手动提交ACK。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

生产者ack机制

Kafka 生产者的 ACK 机制指的是生产者在发送消息后,对消息副本的确认机制。ACK 机制可以帮助生产者确保消息被成功写入 Kafka 集群中的多个副本,并在需要时获取确认信息。

Kafka 提供了三种 ACK 机制的配置选项,分别是:

  1. acks=0:生产者在成功将消息发送到网络缓冲区后即视为消息已被提交,不等待任何服务器响应。这种配置下,可能会出现消息丢失的情况。

  2. acks=1:生产者在成功将消息发送到主题的分区 leader 后即视为消息已被提交。这种配置下,生产者会收到分区 leader
    的确认,但仍有可能出现消息丢失的情况,例如当 leader 出现故障,而消息尚未复制到其他副本时。

  3. acks=all 或acks=-1:生产者需要等待所有分区副本都成功写入消息后才视为消息已被提交。这种配置下,生产者会等待所有分区副本的确认,确保消息被复制到足够数量的副本后才返回提交确认。这是最安全的确认方式,但也会导致较长的等待时间。

在实际使用中,根据对消息可靠性和延迟的要求,可以选择不同的 ACKs 级别。一般来说,如果对消息的可靠性要求较高,可以选择较高的 ACKs 级别,但需要考虑相应的延迟成本。

我们可以通过spring.kafka.producer.acks来配置ack机制

spring.kafka.producer.acks=1

消费者ack模式

kafka支持的消费模式,在AbstractMessageListenerContainer.AckMode的枚举中,下面就介绍下各个模式的区别

public enum AckMode {

		/**
		 * Commit after each record is processed by the listener.
		 */
		RECORD,

		/**
		 * Commit whatever has already been processed before the next poll.
		 */
		BATCH,

		/**
		 * Commit pending updates after
		 * {@link ContainerProperties#setAckTime(long) ackTime} has elapsed.
		 */
		TIME,

		/**
		 * Commit pending updates after
		 * {@link ContainerProperties#setAckCount(int) ackCount} has been
		 * exceeded.
		 */
		COUNT,

		/**
		 * Commit pending updates after
		 * {@link ContainerProperties#setAckCount(int) ackCount} has been
		 * exceeded or after {@link ContainerProperties#setAckTime(long)
		 * ackTime} has elapsed.
		 */
		COUNT_TIME,

		/**
		 * User takes responsibility for acks using an
		 * {@link AcknowledgingMessageListener}.
		 */
		MANUAL,

		/**
		 * User takes responsibility for acks using an
		 * {@link AcknowledgingMessageListener}. The consumer
		 * immediately processes the commit.
		 */
		MANUAL_IMMEDIATE,

	}

AckMode模式

RECORD:当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
当使用 RECORD 确认模式时,消息监听容器会在每个消息被单独处理后进行确认。这意味着,如果一条消息被成功处理,它将作为单独的记录进行确认;如果处理失败,也会针对该消息进行错误记录。这种确认模式适用于需要精确处理每个消息的应用场景,例如确保每个消息都被正确处理。

BATCH:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
当使用 BATCH 确认模式时,消息监听容器会在批量处理一组消息后进行确认。这意味着,消息监听容器会将多个消息合并为批次,并将它们作为一组进行处理。只有在整个批次都被成功处理后,该批次的所有消息才会被确认。这种确认模式适用于需要提高处理效率的场景,例如批量处理大量消息以减少网络传输和系统调用的开销。

TIME:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交

COUNT:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交

COUNT_TIME:TIME或COUNT 有一个条件满足时提交

MANUAL:这是手动确认模式,消费者需要显式地调用 Acknowledgment.acknowledge() 方法来确认消息。只有当消费者调用 acknowledge() 方法后,才会向 Kafka 服务器发送确认消息。这种模式可以保证消息的可靠性和顺序性,但需要消费者显式地处理确认逻辑。

MANUAL_IMMEDIATE:这是立即手动确认模式,与 MANUAL 模式类似,但消费者在调用 acknowledge() 方法时,会立即向 Kafka 服务器发送确认消息。这种模式可以提高消息处理的速度,但可能会增加重复消费的风险。

MANUAL和MANUAL_IMMEDIATE的区别

MANUAL 和 MANUAL_IMMEDIATE 都是 Kafka 消费者的手动确认模式,它们的区别在于确认的时机不同。

MANUAL 模式下,消费者需要显式地调用 Acknowledgment.acknowledge() 方法来确认消息,在调用该方法之后,消息才会被标记为已消费,并且确认消息会在下次 poll() 时发送到 Kafka 服务器。这种模式的优点是可以保证消息的可靠性和顺序性,但需要消费者显式地处理确认逻辑。

相比之下,MANUAL_IMMEDIATE 模式下,在消费者调用 Acknowledgment.acknowledge() 方法时,会立即向 Kafka 服务器发送确认消息。这种模式可以提高消息处理的速度,但可能会增加重复消费的风险,因为如果消息处理失败,Kafka 不会再次发送该消息,而是认为该消息已经被成功消费了。

在实际使用中,应根据业务需求和性能要求来选择合适的确认模式。如果要求消息的可靠性和顺序性比较高,可以选择 MANUAL 模式;如果要求处理速度比较高,可以选择 MANUAL_IMMEDIATE 模式。

AckMode 可以通过配置文件或代码进行设置。例如,在 Spring Boot 应用中,可以使用以下配置方式指定确认模式:

spring.kafka.listener.ack-mode=manual_immediate

手动提交ACK

kafka默认是自动提交ack的,很多时候,我们都需要手动提交,这就要进行以下配置

1、设置enable-auto-commit=false,禁止自动提交
2、设置ack-mode为manual_immediate

在配置文件进行如下配置

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual_immediate

3、监听方法的入参加入Acknowledgment ack 参数,并在消费完成之后调用acknowledge方法,如下所示文章来源地址https://www.toymoban.com/news/detail-761547.html

	@KafkaListener(topics = "my-topic2",groupId = "myGroup")
    public void  receiveMessage2(String message, Acknowledgment ack){
        log.info("消费消息:"+message);
        //ack确认
        ack.acknowledge();
    }

到了这里,关于Spring Boot 整合kafka:生产者ack机制和消费者AckMode消费模式、手动提交ACK的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring整合RabbitMQ——生产者

    添加依赖坐标,在producer和consumer模块的pom文件中各复制一份。 配置producer的配置文件 配置producer的xml配置文件 编写测试类发送消息

    2024年02月07日
    浏览(32)
  • Kafka 入门到起飞 - 生产者参数详解 ,什么是生产者确认机制? 什么是ISR? 什么是 OSR?

    上回书我们讲了,生产者发送消息流程解析传送门 那么这篇我们来看下,生产者发送消息时几个重要的参数详解 ,什么是生产者确认机制? 什么是ISR? 什么是 OSR? bootstrap.servers : Kafka 集群地址 host1:port1,host2:port2,host3:port3 不需要写Kafka集群中全部的broker地址,但是也不要写

    2024年02月15日
    浏览(27)
  • Spring Kafka生产者实现

    我们需要通过Spring Kafka库,将消息推送给Kafka的topic中。这里假设Kafka的集群和用户我们都有了。这里Kafka认证采取SASL_PLAINTEXT方式接入,SASL 采用 SCRAM-SHA-256 方式加解密。 我这里不需要写版本号,应为我使用的Spring Boot。Spring Boot会自动帮我挑选spring-kafka应该使用哪个版本合适

    2024年02月08日
    浏览(32)
  • Spring整合RabbitMQ——生产者(利用配置类)

    配置RabbitMQ的基本信息,用来创建连接工厂的 编写启动类 编写配置类

    2024年02月07日
    浏览(30)
  • Spring整合RabbitMQ-配制文件方式-1-消息生产者

    Spring-amqp是对AMQP的一些概念的一些抽象,Spring-rabbit是对RabbitMQ操作的封装实现。 主要有几个核心类 RabbitAdmin 、 RabbitTemplate 、 SimpleMessageListenerContainer 等 RabbitAdmin 类完成对Exchange、Queue、Binding的操作,在容器中管理 了 RabbitAdmin 类的时候,可以对Exchange、Queue、Binding进行自动声

    2024年02月09日
    浏览(32)
  • Spring-Kafka生产者源码分析

    本文主要概括Spring Kafka生产者发送消息的主流程 代码准备: SpringBoot项目中maven填加以下依赖 消息发送使用 KafkaTemplate 启动类 KafkaAutoConfiguration 有两个地方需要关注 其中的 ProducerFactory 使用的是 DefaultKafkaProducerFactory 在发送消息之前,Spring Kafka会先创建 Producer ,返回的是 Clos

    2024年02月09日
    浏览(29)
  • Springboot最简单的实战介绍 整合kafka-生产者与消费者(消息推送与订阅获取)

    #spring.kafka.bootstrap-servers=123.xxx.x.xxx:19092,123.xxx.x.xxx:19093,123.xxx.x.xxx:19094 spring.kafka.bootstrap-servers=192.168.x.xxx:9092 #=============== producer生产者 ======================= spring.kafka.producer.retries=0 spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 spring.kafka.producer.key-serializer=org.ap

    2024年04月09日
    浏览(35)
  • 三、Kafka生产者1---Kafka生产者初始化-new KafkaProducer

    概述 本文主要是分享Kafka初始化生产者的 大体过程 初始化过程中会新建很多对象,本文暂先分享部分对象 1.分区器---Partitioner partitioner 2.重试时间---long retryBackoffMs 3.序列化器---SerializerK keySerializer,SerializerV valueSerializer 4.拦截器--- ListProducerInterceptorK, V interceptorList 5.累加器-

    2024年03月14日
    浏览(49)
  • Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解

    kafka尚硅谷视频: 10_尚硅谷_Kafka_生产者_原理_哔哩哔哩_bilibili ​      1. producer初始化:加载默认配置,以及配置的参数,开启网络线程      2. 拦截器拦截      3. 序列化器进行消息key, value序列化      4. 进行分区      5. kafka broker集群 获取metaData      6. 消息缓存到

    2024年02月11日
    浏览(37)
  • Kafka-生产者

    Kafka在实际应用中,经常被用作高性能、可扩展的消息中间件。 Kafka自定义了一套网络协议,只要遵守这套协议的格式,就可以向Kafka发送消息,也可以从Kafka中拉取消息。 在实践生产过程中,一套API封装良好、灵活易用的客户端可以避免开发人员重复劳动,提高开发效率,也

    2024年01月20日
    浏览(27)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包