Spring Kafka消费模式(single, batch)及确认模式(自动、手动)示例

这篇具有很好参考价值的文章主要介绍了Spring Kafka消费模式(single, batch)及确认模式(自动、手动)示例。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Spring Kafka消费消息的模式分为2种模式(对应spring.kafka.listener.type配置):

  • single - 每次消费单条记录
  • batch - 批量消费消息列表

且每种模式都分为2种提交已消费消息offset的ack模式:

  • 自动确认
  • 手动确认

接下来依次讲解这两种消费模式及其对应的ack模式的示例配置及代码。

1. 单记录消费listener.type=single

本章节先来讲讲record模式 - 单记录消费,且分为自动确认和手动确认2种方式来提交已消费消息offset。

1.1 单记录消费 - 自动确认

即由Spring Kafak框架按照配置规则自动提交已消费消息offset,无需程序手动编码控制。

需注意如下对应配置:

# ============ 方式1:定时自动提交[不推荐] =====================
# 开启自动提交(按周期)已消费offset
spring.kafka.consumer.enable-auto-commit: true
# 自动提交已消费offset时间价格(配置enable-auto-commit=true时使用)
spring.kafka.consumer.auto-commit-interval: 1s


# ========= 方式2:通过ack-mode设置自动提交[推荐] =============
# 禁用自动提交(按周期)已消费offset
spring.kafka.consumer.enable-auto-commit: false
# listener类型为单条记录single类型(默认为single单条消费模式)
spring.kafka.listener.type: single
# offset提交模式为record
spring.kafka.listener.ack-mode: record

注:
关于消费者提交已消费消息offset的相关配置说明:

  • spring.kafka.consumer.enbable-auto-commit
    • true 自动提交已消费消息offset
      • auto-commit-interval 设置自动提交间隔
    • fasle 由程序控制已消费消息offset提交
      • spring.kafka.listener.ack-mode 已消费offset提交模式

spring.kafka.listener.ack-mode列表(详细说明参见:SpringKafka - Committing Offsets)

ack-mode模式 说明 自动提交
RECORD
单记录
当每一条记录被消费者监听器(ListenerConsumer)处理之后提交 ✔️
BATCH(默认)
批量
当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交 ✔️
TIME
超时
当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
(通过spring.kafka.listener.ack-time设置触发时间)
✔️
COUNT
超过消费数量
当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
(通过spring.kafka.listener.ack-count设置触发数量)
✔️
COUNT_TIME
超时或超数量
TIME或COUNT 有一个条件满足时提交 ✔️
MANUAL
手动提交(ack)后同BATCH
当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
需要手动使用
Acknowledgment参数提交
MANUAL_IMMEDIATE
手动立即提交
手动调用Acknowledgment.acknowledge()后立即提交
需要手动使用
Acknowledgment参数提交

消费端配置示例1 - 定时自动提交[不推荐]

spring:
  kafka:
    # 逗号分隔的集群broker列表
    bootstrap-servers: localhost:9092
    # ====================================================
    # ================== 消费者配置 ========================
    # ====================================================
    consumer:
      # 自动提交(按周期)已消费offset
      enable-auto-commit: true
      # 自动提交已消费offset时间价格(配置enable-auto-commit=true时使用)
      auto-commit-interval: 1s
      ...
    # ====================================================
    # ============= 消费者监听器(及线程池)配置 ==============
    # ====================================================
    listener:
      # listener类型
      # single | batch
      type: single
    # ====================================================
    # ============= 具体业务Kafka定义=======================
    # ====================================================
    biz1:
      topic: topic1
      consumer:
        group: group1

消费端配置示例2 - 通过ack-mode设置自动提交[推荐]
listener自动提交offsetd的ack-mode模式包括:RECORD | BATCH | TIME | COUNT | COUNT_TIME
且使用相关自动模式不可在@KafkaListener标注方法中使用Acknowledgment参数。

spring:
  kafka:
    # 逗号分隔的集群broker列表
    bootstrap-servers: localhost:9092
    # ====================================================
    # ================== 消费者配置 ========================
    # ====================================================
    consumer:
      # 禁用自动提交(按周期)已消费offset
      enable-auto-commit: false
      ...
    # ====================================================
    # ============= 消费者监听器(及线程池)配置 ==============
    # ====================================================
    listener:
      # listener类型
      # single | batch
      type: single
      # 已消费offset提交模式(仅在enable-auto-commit=false时才需明确指定)
      # 单记录  | 批量         | 超时  | 超过消费数量 | 超时或超过数量 | 手动提交(ack)后同BATCH | 手动立即提交
      # RECORD | BATCH(默认) | TIME | COUNT      | COUNT_TIME   | MANUAL                | MANUAL_IMMEDIATE
      # https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-offsets
      # 注:listener自动提交offset模式包括:RECORD | BATCH | TIME | COUNT | COUNT_TIME ,
      #    且使用相关自动模式不可在@KafkaListener标注方法中使用Acknowledgment参数
      ack-mode: record
    # ====================================================
    # ============= 具体业务Kafka定义=======================
    # ====================================================
    biz1:
      topic: topic1
      consumer:
        group: group1

消费端代码示例

/**
 * 定义biz1消息接收者
 *
 * @param message
 * @rabbit.exhange exchange1
 * @rabbit.aueue queue1
 * @rabbit.bindingKey #
 */
@KafkaListener(
        id = "biz1-${spring.kafka.biz1.consumer.group}",
        groupId = "${spring.kafka.biz1.consumer.group}",
        topics = "${spring.kafka.biz1.topic}")
public void biz1Consumer(String message) {
    log.info("[biz1Consumer] RECV MSG: {}", message);
}

1.2 单记录消费 - 手动确认

需注意如下对应配置:

# 禁用自动提交(按周期)已消费offset
spring.kafka.consumer.enable-auto-commit: false
# listener类型为单条记录single类型(默认为single单条消费模式)
spring.kafka.listener.type: single
# offset提交模式为manual_immediate
spring.kafka.listener.ack-mode: manual_immediate

消费端配置示例
手动提交offset的ack-mode模式包括:MANUAL | MANUAL_IMMEDIATE
且使用相关手动模式需在@KafkaListener标注方法中使用Acknowledgment参数。

spring:
#  profiles:
#    active: kafka-origin
  kafka:
    # 逗号分隔的集群broker列表
    bootstrap-servers: localhost:9092
    # ====================================================
    # ================== 消费者配置 ========================
    # ====================================================
    consumer:
      # 禁用自动提交(按周期)已消费offset
      enable-auto-commit: false
      ...
    # ====================================================
    # ============= 消费者监听器(及线程池)配置 ==============
    # ====================================================
    listener:
      # listener类型
      # single | batch
      type: single
      # 已消费offset提交模式(仅在enable-auto-commit=false时才需明确指定)
      # 单记录  | 批量   | 超时  | 超过消费数量 | 超时或超过数量 | 手动提交(ack)后同BATCH | 手动立即提交
      # RECORD | BATCH | TIME | COUNT      | COUNT_TIME   | MANUAL                | MANUAL_IMMEDIATE
      # https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-offsets
      # 注:手动提交offset模式包括:MANUAL | MANUAL_IMMEDIATE
      #    且使用相关手动模式需在@KafkaListener标注方法中使用Acknowledgment参数
      ack-mode: manual_immediate

消费端代码示例
参考:SpringKafka - Manual Acknowledgment

/**
 * 定义biz1消息接收者
 *
 * @param message
 * @kafka.topic topic1
 * @kafka.group group1
 */
@KafkaListener(
        id = "biz1-${spring.kafka.biz1.consumer.group}",
        groupId = "${spring.kafka.biz1.consumer.group}",
        topics = "${spring.kafka.biz1.topic}")
public void biz1Consumer(String message, Acknowledgment ack) {
    log.info("[biz1Consumer] RECV MSG: {}", message);
    //确认单当前消息(及之前的消息)offset均已被消费完成
    ack.acknowledge();
    //拒绝当前消息(此方法仅适用于listener.type=single)
    //当前poll查询出的剩余消息记录均被抛弃,
    //且当前消费线程在阻塞指定sleep(如下3000毫秒)后重新调用poll获取待消费消息(包括之前poll被抛弃的消息)
    //ack.nack(3000)
}

2. 批量消费listener.type=batch

在一些需要通过批量处理消息的场景中,SpringKafka支持使用Batch Listeners,即批量处理消息列表。
本章节主要讲解batch模式 - 批量消费,且同样分为自动确认和手动确认2种方式来提交已消费消息offset。

2.1 批量消费 - 自动确认

需注意如下对应配置:

# 禁用自动提交(按周期)已消费offset
spring.kafka.consumer.enable-auto-commit: false
# 批量消费的单次最大消费记录数
spring.kafka.consumer.max-poll-reocrds: 50
# listener类型为批量batch类型(默认为single单条消费模式)
spring.kafka.listener.type: batch
# offset提交模式为batch(不可使用record - 启动报错)
spring.kafka.listener.ack-mode: batch

配置示例如下

spring:
  kafka:
    # 逗号分隔的集群broker列表
    bootstrap-servers: localhost:9092
    # ====================================================
    # ================== 消费者配置 ========================
    # ====================================================
    consumer:
      # 禁用自动提交(按周期)已消费offset
      enable-auto-commit: false
      # 单次poll()调用返回的记录数
      max-poll-records: 50
    # ====================================================
    # ============= 消费者监听器(及线程池)配置 ==============
    # ====================================================
    listener:
      # listener类型
      # single | batch
      type: batch
      # 已消费offset提交模式(仅在enable-auto-commit=false时才需明确指定)
      # 单记录  | 批量   | 超时  | 超过消费数量 | 超时或超过数量 | 手动提交(ack)后同BATCH | 手动立即提交
      # RECORD | BATCH | TIME | COUNT      | COUNT_TIME   | MANUAL                | MANUAL_IMMEDIATE
      # https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-offsets
      ack-mode: batch
    # ====================================================
    # ============= 具体业务Kafka定义=======================
    # ====================================================
    biz1:
      topic: topic1
      consumer:
        group: group1
    biz2:
      topic: topic2
      consumer:
        group: group2
        # 分区格式示例:0 | 0,1,2 | 0-3 
        partitions: 0

消费端代码示例

/**
 * 定义biz1消息接收者
 * 自动模式(无需手动ack):
 * 1. listener.type=batch
 * 2. ack-mode=batch
 *
 * @param messages
 * @kafka.topic topic1
 * @kafka.group group1
 */
@KafkaListener(
        id = "biz1-${spring.kafka.biz1.consumer.group}",
        groupId = "${spring.kafka.biz1.consumer.group}",
        topics = "${spring.kafka.biz1.topic}")
public void biz1Consumer(List<String> messages) {
    log.info("[biz1Consumer] RECV MSG COUNT: {}", messages.size());
    log.info("[biz1Consumer] RECV MSG[0]: {}", messages.get(0));
}

/**
 * 定义biz2消息接收者
 * 自动模式(无需手动ack):
 * 1. listener.type=batch
 * 2. ack-mode=batch
 * 
 * @param messages
 * @kafka.topic topic2
 * @kafka.group group2
 */
@KafkaListener(
        id = "biz2-${spring.kafka.biz2.consumer.group}",
        groupId = "${spring.kafka.biz2.consumer.group}",
        //消费指定分区
        topicPartitions = {
                @TopicPartition(topic = "${spring.kafka.biz2.topic}", partitions = "${spring.kafka.biz2.consumer.partitions}")
        })
public void biz2Consumer(List<Message> messages) {
    log.info("[biz2Consumer] RECV MSG COUNT: {}", messages.size());
    log.info("[biz2Consumer] RECV MSG[0]: {}", messages.get(0));
}

Spring Kafka消费模式(single, batch)及确认模式(自动、手动)示例

2.2 批量消费 - 手动确认

需注意如下对应配置:

# 禁用自动提交(按周期)已消费offset
spring.kafka.consumer.enable-auto-commit: false
# 批量消费的单次最大消费记录数
spring.kafka.consumer.max-poll-reocrds: 50
# listener类型为批量batch类型(默认为single单条消费模式)
spring.kafka.listener.type: batch
# offset提交模式为batch(不可使用record - 启动报错)
spring.kafka.listener.ack-mode: manual

配置示例如下

spring:
  kafka:
    # 逗号分隔的集群broker列表
    bootstrap-servers: localhost:9092
    # ====================================================
    # ================== 消费者配置 ========================
    # ====================================================
    consumer:
      # 禁用自动提交(按周期)已消费offset
      enable-auto-commit: false
      # 单次poll()调用返回的记录数
      max-poll-records: 50
    # ====================================================
    # ============= 消费者监听器(及线程池)配置 ==============
    # ====================================================
    listener:
      # listener类型
      # single | batch
      type: batch
      # 已消费offset提交模式(仅在enable-auto-commit=false时才需明确指定)
      # 单记录  | 批量   | 超时  | 超过消费数量 | 超时或超过数量 | 手动提交(ack)后同BATCH | 手动立即提交
      # RECORD | BATCH | TIME | COUNT      | COUNT_TIME   | MANUAL                | MANUAL_IMMEDIATE
      # https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-offsets
      ack-mode: manual
    # ====================================================
    # ============= 具体业务Kafka定义=======================
    # ====================================================
    biz1:
      topic: topic1
      consumer:
        group: group1
    biz2:
      topic: topic2
      consumer:
        group: group2
        partitions: 0

消费端代码示例

/**
 * 定义biz1消息接收者
 * 手动模式(需手动ack):
 * 1. listener.type=batch
 * 2. ack-mode=manual
 *
 * @param messages
 * @kafka.topic topic1
 * @kafka.group group1
 */
@KafkaListener(
        id = "biz1-${spring.kafka.biz1.consumer.group}",
        groupId = "${spring.kafka.biz1.consumer.group}",
        //仅在多partition单个消费者时,用于多线程消费消息(concurrency <= partition数量)
        //当存在多个消费者时,即便设置concurrency > 1也仅有唯一消费线程生效
        concurrency = "${spring.kafka.biz1.consumer.concurrency}",
        topics = "${spring.kafka.biz1.topic}")
public void biz1Consumer(List<String> messages, Acknowledgment ack) {
    log.info("[biz1Consumer] RECV MSG COUNT: {}", messages.size());
    log.info("[biz1Consumer] RECV MSG[0]: {}", messages.get(0));
    //确认单当前消息(及之前的消息)offset均已被消费完成
    ack.acknowledge();

    //拒绝消息列表中指定index(发生错误的消息index)对应的消息(此方法仅适用于listener.type=batch),
    //当前指定index之前的消息会被成功提交,
    //当前poll查询出的剩余消息记录(包括当前指定的index)均被抛弃,
    //且当前消费线程在阻塞指定sleep(如下3000毫秒)后重新调用poll获取待消费消息(包括当前index及之前poll抛弃的消息)
    //如下即确认当前list中前5条消息(0-4),抛弃当前list中后续消息,3秒后再次poll查询未消费消息
    //ack.nack(5, 3000);
}


/**
 * 定义biz2消息接收者
 * 手动模式(需手动ack):
 * 1. listener.type=batch
 * 2. ack-mode=manual
 * 
 * @param messages
 * @kafka.topic topic2
 * @kafka.group group2
 */
@KafkaListener(
        id = "biz2-${spring.kafka.biz2.consumer.group}",
        groupId = "${spring.kafka.biz2.consumer.group}",
        //消费指定分区
        topicPartitions = {
                @TopicPartition(topic = "${spring.kafka.biz2.topic}", partitions = "${spring.kafka.biz2.consumer.partitions}")
        })
public void biz2Consumer(List<Message> messages, Acknowledgment ack) {
    log.info("[biz2Consumer] RECV MSG COUNT: {}", messages.size());
    log.info("[biz2Consumer] RECV MSG[0]: {}", messages.get(0));
    //确认单当前消息(及之前的消息)offset均已被消费完成
    ack.acknowledge();
}

3. 手动模式下的acknowledge和nack方法

在手动确认模式下,除了支持ack.acknowledge()方法用于确认单条记录(对应record模式)或者批次记录(对应batch模式),
还支持nack方法用于拒绝消息,关于acknowledgenack方法的详细使用见下表:文章来源地址https://www.toymoban.com/news/detail-403783.html

方法 说明 适用listener.type
acknowledge() 确认单条消息 或 整批消息 single
batch
nack(sleepMills) 拒绝确认当前消息,
当前poll查询出的剩余消息记录均被抛弃,
且当前消费线程在阻塞指定sleepMills时间后
会重新调用poll获取待消费消息(包括之前poll被抛弃的消息)
single
nack(index, sleepMills) 拒绝消息列表中指定index(发生错误的消息index)及其之后index对应的消息,
当前指定index之前的消息会被成功提交,
当前poll查询出的剩余消息记录(包括当前指定的index)均被抛弃,
且当前消费线程在阻塞指定sleepMills时间后
会重新调用poll获取待消费消息(包括当前index及之前poll抛弃的消息)
batch

到了这里,关于Spring Kafka消费模式(single, batch)及确认模式(自动、手动)示例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • springboot整合rabbitmq的发布确认,消费者手动返回ack,设置备用队列,以及面试题:rabbitmq确保消息不丢失

    目录 1.生产者发消息到交换机时候的消息确认 2.交换机给队列发消息时候的消息确认 3.备用队列 3.消费者手动ack   rabbitmq的发布确认方式,可以有效的保证我们的数据不丢失。   消息正常发送的流程是:生产者发送消息到交换机,然后交换机通过路由键把消息发送给对应的队

    2024年02月09日
    浏览(72)
  • Kafka:消费者手动提交

    虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。 两种手动提交方式: commitSync(同步提交): 必须等待offset提交完毕,再去消费下一批数据。 同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素

    2024年02月11日
    浏览(40)
  • springboot集成kafka消费手动启动停止

    在月结,或者某些时候,我们需要停掉kafka所有的消费端,让其暂时停止消费,而后等月结完成,再从新对消费监听恢复,进行消费,此动作不需要重启服务,最后源码下载 1.通过定时任务自动触发,通过@Scheduled,在某个时间点暂停kafka某个监听的消费,也可以在某个时间点

    2024年02月06日
    浏览(46)
  • 7.5.tensorRT高级(2)-RAII接口模式下的生产者消费者多batch实现

    杜老师推出的 tensorRT从零起步高性能部署 课程,之前有看过一遍,但是没有做笔记,很多东西也忘了。这次重新撸一遍,顺便记记笔记。 本次课程学习 tensorRT 高级-RAII 接口模式下的生产者消费者多 batch 实现 课程大纲可看下面的思维导图 这节课我们利用上节课学到的 RAII

    2024年02月12日
    浏览(39)
  • Kafka3.0.0版本——消费者(手动提交offset)

    1.1、手动提交offset的两种方式 commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。 commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。 1.2、手动提交offset两种方式的区别 相同点:都会将本次提交的一批数据最高的偏移量提交。 不

    2024年02月09日
    浏览(47)
  • Kafka入门,手动提交offset,同步提交,异步提交,指定 Offset 消费(二十三)

    虽然offset十分遍历,但是由于其是基于时间提交的,开发人员难以把握offset提交的实际。因此Kafka还提供了手动提交offset的API 手动提交offset的方法有两种:分别commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交:不同点是

    2024年02月11日
    浏览(48)
  • 手动部署Kraft模式Kafka集群

    IP地址 Hostname Release Kafka-Version 172.29.145.157 iamdemo1 Centos7.9 kafka_2.12-3.5.1 172.29.145.182 iamdemo2 Centos7.9 kafka_2.12-3.5.1 172.29.145.183 iamdemo3 Centos7.9 kafka_2.12-3.5.1 下载安装包 kafka安装包官网下载 下载完成后上传到服务器/opt目录下解压 生成集群随机uuid 配置kafka集群的kraft模式参数 使用集群

    2024年02月05日
    浏览(42)
  • 汇总Kafka手动提交与自动提交

    程序拉取消息后,满足要求后自动提交,无需程序开发者介入。 2.1丢消息 在consumer拉取消息之后,达到提交时间AUTO_COMMIT_INTERBAL_MS_CONFIG之后位移(例如1-10)已经自动提交,若此时消息尚未消费完成时,且消费者挂掉了,此时尚未消费的消息会丢失。因为位移1-10已经提交,下

    2024年02月12日
    浏览(57)
  • Spring Boot+Kafka实战生产级Kafka消费组

    作者:禅与计算机程序设计艺术 Kafka是一个开源分布式消息系统,最初由LinkedIn开发,之后成为Apache项目的一部分。Kafka主要用于大数据实时流处理,具有低延迟、高吞吐量等特点。本文将会从基本概念、术语说明、原理及应用场景三个方面对Kafka进行详细介绍。 Kafka作为一个

    2024年02月10日
    浏览(35)
  • Spring kafka源码分析——消息是如何消费的

    本文主要从Spring Kafka的源码来分析, 消费端 消费流程;从spring容器启动到消息被拉取下来,再到执行客户端自定义的消费逻辑,大致概括为以下4个部分: 源码分析主要也是从以上4个部分进行分析; 环境准备 maven依赖如下: 消费端代码: 参数配置使用默认配置 KafkaAutoConfi

    2024年02月13日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包