Spring Kafka消费消息的模式分为2种模式(对应spring.kafka.listener.type配置):
- single - 每次消费单条记录
- batch - 批量消费消息列表
且每种模式都分为2种提交已消费消息offset的ack模式:
- 自动确认
- 手动确认
接下来依次讲解这两种消费模式及其对应的ack模式的示例配置及代码。
1. 单记录消费listener.type=single
本章节先来讲讲record模式 - 单记录消费,且分为自动确认和手动确认2种方式来提交已消费消息offset。
1.1 单记录消费 - 自动确认
即由Spring Kafak框架按照配置规则自动提交已消费消息offset,无需程序手动编码控制。
需注意如下对应配置:
# ============ 方式1:定时自动提交[不推荐] =====================
# 开启自动提交(按周期)已消费offsetspring.kafka.consumer.enable-auto-commit
: true
# 自动提交已消费offset时间价格(配置enable-auto-commit=true时使用)spring.kafka.consumer.auto-commit-interval
: 1s
# ========= 方式2:通过ack-mode设置自动提交[推荐] =============
# 禁用自动提交(按周期)已消费offsetspring.kafka.consumer.enable-auto-commit
: false
# listener类型为单条记录single类型(默认为single单条消费模式)spring.kafka.listener.type
: single
# offset提交模式为recordspring.kafka.listener.ack-mode
: record
注:
关于消费者提交已消费消息offset的相关配置说明:
- spring.kafka.consumer.enbable-auto-commit
- true 自动提交已消费消息offset
- auto-commit-interval 设置自动提交间隔
- fasle 由程序控制已消费消息offset提交
- spring.kafka.listener.ack-mode 已消费offset提交模式
- true 自动提交已消费消息offset
spring.kafka.listener.ack-mode列表(详细说明参见:SpringKafka - Committing Offsets)
ack-mode模式 | 说明 | 自动提交 |
---|---|---|
RECORD 单记录 |
当每一条记录被消费者监听器(ListenerConsumer)处理之后提交 | ✔️ |
BATCH(默认) 批量 |
当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交 | ✔️ |
TIME 超时 |
当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交 (通过spring.kafka.listener.ack-time设置触发时间) |
✔️ |
COUNT 超过消费数量 |
当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交 (通过spring.kafka.listener.ack-count设置触发数量) |
✔️ |
COUNT_TIME 超时或超数量 |
TIME或COUNT 有一个条件满足时提交 | ✔️ |
MANUAL 手动提交(ack)后同BATCH |
当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交 | ❌ 需要手动使用 Acknowledgment参数提交 |
MANUAL_IMMEDIATE 手动立即提交 |
手动调用Acknowledgment.acknowledge()后立即提交 | ❌ 需要手动使用 Acknowledgment参数提交 |
消费端配置示例1 - 定时自动提交[不推荐]
spring:
kafka:
# 逗号分隔的集群broker列表
bootstrap-servers: localhost:9092
# ====================================================
# ================== 消费者配置 ========================
# ====================================================
consumer:
# 自动提交(按周期)已消费offset
enable-auto-commit: true
# 自动提交已消费offset时间价格(配置enable-auto-commit=true时使用)
auto-commit-interval: 1s
...
# ====================================================
# ============= 消费者监听器(及线程池)配置 ==============
# ====================================================
listener:
# listener类型
# single | batch
type: single
# ====================================================
# ============= 具体业务Kafka定义=======================
# ====================================================
biz1:
topic: topic1
consumer:
group: group1
消费端配置示例2 - 通过ack-mode设置自动提交[推荐]
listener自动提交offsetd的ack-mode
模式包括:RECORD | BATCH | TIME | COUNT | COUNT_TIME
,
且使用相关自动模式不可
在@KafkaListener标注方法中使用Acknowledgment
参数。
spring:
kafka:
# 逗号分隔的集群broker列表
bootstrap-servers: localhost:9092
# ====================================================
# ================== 消费者配置 ========================
# ====================================================
consumer:
# 禁用自动提交(按周期)已消费offset
enable-auto-commit: false
...
# ====================================================
# ============= 消费者监听器(及线程池)配置 ==============
# ====================================================
listener:
# listener类型
# single | batch
type: single
# 已消费offset提交模式(仅在enable-auto-commit=false时才需明确指定)
# 单记录 | 批量 | 超时 | 超过消费数量 | 超时或超过数量 | 手动提交(ack)后同BATCH | 手动立即提交
# RECORD | BATCH(默认) | TIME | COUNT | COUNT_TIME | MANUAL | MANUAL_IMMEDIATE
# https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-offsets
# 注:listener自动提交offset模式包括:RECORD | BATCH | TIME | COUNT | COUNT_TIME ,
# 且使用相关自动模式不可在@KafkaListener标注方法中使用Acknowledgment参数
ack-mode: record
# ====================================================
# ============= 具体业务Kafka定义=======================
# ====================================================
biz1:
topic: topic1
consumer:
group: group1
消费端代码示例
/**
* 定义biz1消息接收者
*
* @param message
* @rabbit.exhange exchange1
* @rabbit.aueue queue1
* @rabbit.bindingKey #
*/
@KafkaListener(
id = "biz1-${spring.kafka.biz1.consumer.group}",
groupId = "${spring.kafka.biz1.consumer.group}",
topics = "${spring.kafka.biz1.topic}")
public void biz1Consumer(String message) {
log.info("[biz1Consumer] RECV MSG: {}", message);
}
1.2 单记录消费 - 手动确认
需注意如下对应配置:
# 禁用自动提交(按周期)已消费offset
spring.kafka.consumer.enable-auto-commit
: false
# listener类型为单条记录single类型(默认为single单条消费模式)spring.kafka.listener.type
: single
# offset提交模式为manual_immediatespring.kafka.listener.ack-mode
: manual_immediate
消费端配置示例
手动提交offset的ack-mode
模式包括:MANUAL | MANUAL_IMMEDIATE
,
且使用相关手动模式需在@KafkaListener标注方法中使用Acknowledgment
参数。
spring:
# profiles:
# active: kafka-origin
kafka:
# 逗号分隔的集群broker列表
bootstrap-servers: localhost:9092
# ====================================================
# ================== 消费者配置 ========================
# ====================================================
consumer:
# 禁用自动提交(按周期)已消费offset
enable-auto-commit: false
...
# ====================================================
# ============= 消费者监听器(及线程池)配置 ==============
# ====================================================
listener:
# listener类型
# single | batch
type: single
# 已消费offset提交模式(仅在enable-auto-commit=false时才需明确指定)
# 单记录 | 批量 | 超时 | 超过消费数量 | 超时或超过数量 | 手动提交(ack)后同BATCH | 手动立即提交
# RECORD | BATCH | TIME | COUNT | COUNT_TIME | MANUAL | MANUAL_IMMEDIATE
# https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-offsets
# 注:手动提交offset模式包括:MANUAL | MANUAL_IMMEDIATE
# 且使用相关手动模式需在@KafkaListener标注方法中使用Acknowledgment参数
ack-mode: manual_immediate
消费端代码示例
参考:SpringKafka - Manual Acknowledgment
/**
* 定义biz1消息接收者
*
* @param message
* @kafka.topic topic1
* @kafka.group group1
*/
@KafkaListener(
id = "biz1-${spring.kafka.biz1.consumer.group}",
groupId = "${spring.kafka.biz1.consumer.group}",
topics = "${spring.kafka.biz1.topic}")
public void biz1Consumer(String message, Acknowledgment ack) {
log.info("[biz1Consumer] RECV MSG: {}", message);
//确认单当前消息(及之前的消息)offset均已被消费完成
ack.acknowledge();
//拒绝当前消息(此方法仅适用于listener.type=single)
//当前poll查询出的剩余消息记录均被抛弃,
//且当前消费线程在阻塞指定sleep(如下3000毫秒)后重新调用poll获取待消费消息(包括之前poll被抛弃的消息)
//ack.nack(3000)
}
2. 批量消费listener.type=batch
在一些需要通过批量处理消息的场景中,SpringKafka支持使用Batch Listeners,即批量处理消息列表。
本章节主要讲解batch模式 - 批量消费,且同样分为自动确认和手动确认2种方式来提交已消费消息offset。
2.1 批量消费 - 自动确认
需注意如下对应配置:
# 禁用自动提交(按周期)已消费offset
spring.kafka.consumer.enable-auto-commit
: false
# 批量消费的单次最大消费记录数spring.kafka.consumer.max-poll-reocrds
: 50
# listener类型为批量batch类型(默认为single单条消费模式)spring.kafka.listener.type
: batch
# offset提交模式为batch(不可使用record - 启动报错)spring.kafka.listener.ack-mode
: batch
配置示例如下
spring:
kafka:
# 逗号分隔的集群broker列表
bootstrap-servers: localhost:9092
# ====================================================
# ================== 消费者配置 ========================
# ====================================================
consumer:
# 禁用自动提交(按周期)已消费offset
enable-auto-commit: false
# 单次poll()调用返回的记录数
max-poll-records: 50
# ====================================================
# ============= 消费者监听器(及线程池)配置 ==============
# ====================================================
listener:
# listener类型
# single | batch
type: batch
# 已消费offset提交模式(仅在enable-auto-commit=false时才需明确指定)
# 单记录 | 批量 | 超时 | 超过消费数量 | 超时或超过数量 | 手动提交(ack)后同BATCH | 手动立即提交
# RECORD | BATCH | TIME | COUNT | COUNT_TIME | MANUAL | MANUAL_IMMEDIATE
# https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-offsets
ack-mode: batch
# ====================================================
# ============= 具体业务Kafka定义=======================
# ====================================================
biz1:
topic: topic1
consumer:
group: group1
biz2:
topic: topic2
consumer:
group: group2
# 分区格式示例:0 | 0,1,2 | 0-3
partitions: 0
消费端代码示例
/**
* 定义biz1消息接收者
* 自动模式(无需手动ack):
* 1. listener.type=batch
* 2. ack-mode=batch
*
* @param messages
* @kafka.topic topic1
* @kafka.group group1
*/
@KafkaListener(
id = "biz1-${spring.kafka.biz1.consumer.group}",
groupId = "${spring.kafka.biz1.consumer.group}",
topics = "${spring.kafka.biz1.topic}")
public void biz1Consumer(List<String> messages) {
log.info("[biz1Consumer] RECV MSG COUNT: {}", messages.size());
log.info("[biz1Consumer] RECV MSG[0]: {}", messages.get(0));
}
/**
* 定义biz2消息接收者
* 自动模式(无需手动ack):
* 1. listener.type=batch
* 2. ack-mode=batch
*
* @param messages
* @kafka.topic topic2
* @kafka.group group2
*/
@KafkaListener(
id = "biz2-${spring.kafka.biz2.consumer.group}",
groupId = "${spring.kafka.biz2.consumer.group}",
//消费指定分区
topicPartitions = {
@TopicPartition(topic = "${spring.kafka.biz2.topic}", partitions = "${spring.kafka.biz2.consumer.partitions}")
})
public void biz2Consumer(List<Message> messages) {
log.info("[biz2Consumer] RECV MSG COUNT: {}", messages.size());
log.info("[biz2Consumer] RECV MSG[0]: {}", messages.get(0));
}
2.2 批量消费 - 手动确认
需注意如下对应配置:
# 禁用自动提交(按周期)已消费offset
spring.kafka.consumer.enable-auto-commit
: false
# 批量消费的单次最大消费记录数spring.kafka.consumer.max-poll-reocrds
: 50
# listener类型为批量batch类型(默认为single单条消费模式)spring.kafka.listener.type
: batch
# offset提交模式为batch(不可使用record - 启动报错)spring.kafka.listener.ack-mode
: manual
配置示例如下
spring:
kafka:
# 逗号分隔的集群broker列表
bootstrap-servers: localhost:9092
# ====================================================
# ================== 消费者配置 ========================
# ====================================================
consumer:
# 禁用自动提交(按周期)已消费offset
enable-auto-commit: false
# 单次poll()调用返回的记录数
max-poll-records: 50
# ====================================================
# ============= 消费者监听器(及线程池)配置 ==============
# ====================================================
listener:
# listener类型
# single | batch
type: batch
# 已消费offset提交模式(仅在enable-auto-commit=false时才需明确指定)
# 单记录 | 批量 | 超时 | 超过消费数量 | 超时或超过数量 | 手动提交(ack)后同BATCH | 手动立即提交
# RECORD | BATCH | TIME | COUNT | COUNT_TIME | MANUAL | MANUAL_IMMEDIATE
# https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-offsets
ack-mode: manual
# ====================================================
# ============= 具体业务Kafka定义=======================
# ====================================================
biz1:
topic: topic1
consumer:
group: group1
biz2:
topic: topic2
consumer:
group: group2
partitions: 0
消费端代码示例文章来源:https://www.toymoban.com/news/detail-403783.html
/**
* 定义biz1消息接收者
* 手动模式(需手动ack):
* 1. listener.type=batch
* 2. ack-mode=manual
*
* @param messages
* @kafka.topic topic1
* @kafka.group group1
*/
@KafkaListener(
id = "biz1-${spring.kafka.biz1.consumer.group}",
groupId = "${spring.kafka.biz1.consumer.group}",
//仅在多partition单个消费者时,用于多线程消费消息(concurrency <= partition数量)
//当存在多个消费者时,即便设置concurrency > 1也仅有唯一消费线程生效
concurrency = "${spring.kafka.biz1.consumer.concurrency}",
topics = "${spring.kafka.biz1.topic}")
public void biz1Consumer(List<String> messages, Acknowledgment ack) {
log.info("[biz1Consumer] RECV MSG COUNT: {}", messages.size());
log.info("[biz1Consumer] RECV MSG[0]: {}", messages.get(0));
//确认单当前消息(及之前的消息)offset均已被消费完成
ack.acknowledge();
//拒绝消息列表中指定index(发生错误的消息index)对应的消息(此方法仅适用于listener.type=batch),
//当前指定index之前的消息会被成功提交,
//当前poll查询出的剩余消息记录(包括当前指定的index)均被抛弃,
//且当前消费线程在阻塞指定sleep(如下3000毫秒)后重新调用poll获取待消费消息(包括当前index及之前poll抛弃的消息)
//如下即确认当前list中前5条消息(0-4),抛弃当前list中后续消息,3秒后再次poll查询未消费消息
//ack.nack(5, 3000);
}
/**
* 定义biz2消息接收者
* 手动模式(需手动ack):
* 1. listener.type=batch
* 2. ack-mode=manual
*
* @param messages
* @kafka.topic topic2
* @kafka.group group2
*/
@KafkaListener(
id = "biz2-${spring.kafka.biz2.consumer.group}",
groupId = "${spring.kafka.biz2.consumer.group}",
//消费指定分区
topicPartitions = {
@TopicPartition(topic = "${spring.kafka.biz2.topic}", partitions = "${spring.kafka.biz2.consumer.partitions}")
})
public void biz2Consumer(List<Message> messages, Acknowledgment ack) {
log.info("[biz2Consumer] RECV MSG COUNT: {}", messages.size());
log.info("[biz2Consumer] RECV MSG[0]: {}", messages.get(0));
//确认单当前消息(及之前的消息)offset均已被消费完成
ack.acknowledge();
}
3. 手动模式下的acknowledge和nack方法
在手动确认模式下,除了支持ack.acknowledge()
方法用于确认单条记录
(对应record
模式)或者批次记录
(对应batch
模式),
还支持nack
方法用于拒绝消息,关于acknowledge
和nack
方法的详细使用见下表:文章来源地址https://www.toymoban.com/news/detail-403783.html
方法 | 说明 | 适用listener.type |
---|---|---|
acknowledge() | 确认单条消息 或 整批消息 | single batch |
nack(sleepMills) | 拒绝确认当前消息, 当前poll查询出的剩余消息记录均被抛弃, 且当前消费线程在阻塞指定sleepMills时间后 会重新调用poll获取待消费消息(包括之前poll被抛弃的消息) |
single |
nack(index, sleepMills) | 拒绝消息列表中指定index(发生错误的消息index)及其之后index对应的消息, 当前指定index之前的消息会被成功提交, 当前poll查询出的剩余消息记录(包括当前指定的index)均被抛弃, 且当前消费线程在阻塞指定sleepMills时间后 会重新调用poll获取待消费消息(包括当前index及之前poll抛弃的消息) |
batch |
到了这里,关于Spring Kafka消费模式(single, batch)及确认模式(自动、手动)示例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!