The @KafkaListener annotation comes from the spring-kafka package. To consume messages with @KafkaListener, you fill in the Kafka settings in the format spring-kafka expects, and a matching KafkaConsumer instance is created for you through auto-configuration. Note that with this auto-configuration approach, the spring-kafka configuration keys differ slightly in format from the native Kafka client properties. This article therefore walks through the producer and consumer settings commonly used with spring-kafka auto-configuration, for reference:
1. Dependencies
<!-- spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.0</version>
</dependency>
<!-- kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>
<!-- completion hints for configuration properties in the IDE -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>
2. Configuration file
spring:
  kafka:
    producer:
      bootstrap-servers: 172.*.*.1:8423,172.*.*.2:8423,172.*.*.3:8423,172.*.*.4:8423,172.*.*.5:8423
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      ### no effect here: auto.create.topics.enable is a broker-side (server) setting
      # auto.create.topics.enable: false
      # producer client properties
      properties:
        sasl.mechanism: SCRAM-SHA-512
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username='***' password='md5(***)';
    consumer:
      bootstrap-servers: 172.*.*.1:8423,172.*.*.2:8423,172.*.*.3:8423,172.*.*.4:8423,172.*.*.5:8423
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: ***
      # upper limit on records returned per poll (waits up to poll-timeout ms if fewer are available)
      max-poll-records: 200
      # lower limit, in bytes, for a fetch (waits up to fetch-max-wait ms if not reached)
      fetch-min-size: 1
      # maximum wait time when fetch-min-size is not reached
      fetch-max-wait: 5000
      # commit offsets manually
      enable-auto-commit: false
      # offset reset policy: earliest / latest / none
      auto-offset-reset: earliest
      # consumer client properties
      properties:
        sasl.mechanism: SCRAM-SHA-512
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username='***' password='md5(***)';
        # maximum interval between polls (must be longer than the time needed to process one batch)
        max.poll.interval.ms: 600000
        # time after which the group coordinator considers a consumer instance dead and removes it from the group (default 10000)
        session.timeout.ms: 120000
    listener:
      # consume in batches
      type: batch
      # if fewer than max-poll-records records are available, poll() keeps waiting until this timeout (ms) elapses
      poll-timeout: 500
      # number of consumer threads in the listener container, used to increase concurrency (can also be set in code; see the sketch after this configuration)
      # concurrency: 6
      ack-mode: manual_immediate
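The commented-out listener concurrency setting can also be applied in code. Below is a minimal sketch, not taken from the original, that reuses the auto-configured spring.kafka.* settings via Spring Boot's ConcurrentKafkaListenerContainerFactoryConfigurer and only overrides the concurrency; the class name is illustrative:

import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class KafkaListenerConcurrencyConfig {

    // Replaces the default "kafkaListenerContainerFactory" bean while still applying
    // all spring.kafka.* settings from application.yml through the configurer.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, consumerFactory);
        // equivalent of the commented-out spring.kafka.listener.concurrency: 6
        factory.setConcurrency(6);
        return factory;
    }
}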
3. Code example
@Slf4j
@Component
public class XxxKafkaListener {

    @Autowired
    XxxKafkaConsumer xxxKafkaConsumer;

    // @KafkaListener(topics = "#{'${topics.xxx}'.split(',')}", concurrency = "#{'${topics}'.split(',').length}")
    @KafkaListener(topics = "#{'${topics.xxx}'.split(',')}", concurrency = "#{'${concur.xxx}'}")
    public void listenXxx(ConsumerRecords<?, ?> records, Acknowledgment ack) {
        try {
            /// message processing, e.g.:
            /// Iterator<ConsumerRecord<?, ?>> iterator = (Iterator) records.iterator();
            /// while (iterator.hasNext()) {
            ///     JSONObject json = JSON.parseObject((String) iterator.next().value());
            ///     ......
            /// }
            xxxKafkaConsumer.processRecords(records);
        } catch (Exception e) {
            /// once an exception is thrown above, execution jumps straight to this catch block
            /// and the remaining statements in the try block are skipped
            log.error("Failed to process xxx records", e);
        }
        ack.acknowledge();
    }
}
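The XxxKafkaConsumer used above is not shown in the original; below is a minimal sketch of what its processRecords method might look like, assuming (as in the commented-out code) that each record value is a JSON string parsed with fastjson:

@Slf4j
@Component
public class XxxKafkaConsumer {

    // Hypothetical batch handler delegated to by the listener above.
    public void processRecords(ConsumerRecords<?, ?> records) {
        for (ConsumerRecord<?, ?> record : records) {
            // assumption: the value is a JSON string
            JSONObject json = JSON.parseObject((String) record.value());
            // ...business handling of a single message...
            log.debug("topic={}, partition={}, offset={}", record.topic(), record.partition(), record.offset());
        }
    }
}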
4. About the @KafkaListener annotation
In Spring Boot, the @KafkaListener annotation relies on the KafkaMessageListenerContainer class, the message listener container provided by Spring Kafka that listens for and consumes Kafka messages according to the configuration. When a method is annotated with @KafkaListener, Spring Boot automatically creates a KafkaMessageListenerContainer instance and routes incoming messages to the annotated method.
public @interface KafkaListener {
    /// listener id (also used to name the consumer threads)
    String id() default "";
    /// listener container factory
    String containerFactory() default "";
    /// topics to listen to
    String[] topics() default {};
    /// topic pattern (regular expression)
    String topicPattern() default "";
    /// topics & partitions to listen to
    TopicPartition[] topicPartitions() default {};
    /// error handler
    String errorHandler() default "";
    /// consumer group id
    String groupId() default "";
    /// whether to use id as the groupId
    boolean idIsGroup() default true;
}
4.1 containerFactory: the listener container factory
/// myKafkaListenerContainerFactory represents one Kafka cluster
@KafkaListener(
        containerFactory = "myKafkaListenerContainerFactory",
        topics = "#{'${spring.kafka.topics}'.split(',')}",
        groupId = "${spring.kafka.consumer.group}"
)

@Bean(name = "myKafkaListenerContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> myKafkaListenerContainerFactory() {
    return initKafkaListenerContainerFactory(ConfigManager.get("spring.kafka.consumer.brokers", "127.0.0.1:9092"));
}
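The initKafkaListenerContainerFactory helper and ConfigManager referenced above are not part of the original snippet. A minimal sketch of what such a helper might look like is shown below; property values are illustrative, the SASL settings from section 2 would be added to the same map, and imports (org.apache.kafka.clients.consumer, org.springframework.kafka) are omitted to match the style of the other snippets:

private ConcurrentKafkaListenerContainerFactory<String, String> initKafkaListenerContainerFactory(String brokers) {
    // hypothetical helper: builds a container factory bound to the given broker list
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
    factory.setBatchListener(true);   // matches listener.type: batch in section 2
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return factory;
}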
4.2 The listener's topics
Topics can be configured in three ways: topics, topicPattern, and topicPartitions.
(1) topics: one or more topics can be listed
@KafkaListener( topics = {"topic1", "topic2"},   /// or topics = "#{'${spring.kafka.topics}'.split(',')}"
        groupId = "${spring.kafka.consumer.group_id}" )
(2) topicPattern: matches topics with a regular expression
@KafkaListener(topicPattern = "topic_*", concurrency = "6")
public void onMessage(@Payload String data,
                      @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                      @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) ByteBuffer key,
                      Acknowledgment ack,                                     // manual offset commit
                      @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                      @Header(KafkaHeaders.OFFSET) long offSet,
                      Consumer<?, ?> consumer                                 // the underlying consumer
) {
    ...
}
(3) topicPartitions: assigns specific topics and partitions (with optional initial offsets) to the listener
// listen to partitions 0 and 1 of topic1; for topic2, listen to partition 0 and start partition 1 from offset 100
@KafkaListener(id = "thing2", topicPartitions = {
        @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
        @TopicPartition(topic = "topic2", partitions = "0",
                partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void onMessage(ConsumerRecord<?, ?> record) {
    ...
}
4.3 errorHandler: the error handler
errorHandler specifies the bean name of the error handler:
@KafkaListener(
        topics = "#{'${spring.kafka.topics}'.split(',')}",
        groupId = "${spring.kafka.consumer.group_id}",
        errorHandler = "errorHandler"
)
You can handle exceptions with a manual try/catch inside the consumer, or implement KafkaListenerErrorHandler to reuse the error-handling logic:
@Component("errorHandler")
public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        /// handle error ......
        return null;
    }

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
        /// handle error ......
        return null;
    }
}
4.4 groupId: the listener's consumer group
If the groupId attribute is set, it takes the highest priority; otherwise, when idIsGroup is true (the default), the id is used as the consumer group; failing that, the group-id from the consumer configuration applies.
@KafkaListener(id = "consumer-id1", idIsGroup = false, topics = "topic1", groupId = "consumer_group")