@KafkaListener的配置使用

这篇具有很好参考价值的文章主要介绍了@KafkaListener的配置使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

@KafkaListener注解来自spring-kafka包。使用@KafkaListener消费消息,需要按照spring-kafka指定的格式填写kafka配置信息,即可自动装配生成相关的KafkaConsumer实例,然后使用@KafkaListener消费消息。这里需要注意,使用自动装载方式生成KafkaConsumer实例时,spring-kafka的配置参数与原生kafka的配置参数在格式上略有不同,因此,本文主要介绍了spring-kafka自动装载方式下生产者、消费者常用的配置参数,供参考使用:

1、依赖项

<!-- spring-kafka --> 
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.0</version>
</dependency>
<!-- kafka-clients --> 
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>

<!-- 配置信息补全提示 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>

2、配置文件

spring:
  kafka:
    producer:
      bootstrap-servers: 172.*.*.1:8423,172.*.*.2:8423,172.*.*.3:8423,172.*.*.4:8423,172.*.*.5:8423
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer
      value-serializer: org.apache.kafka.common.serialization.StringDeserializer
      ### 这里无效,因为这是Kafka服务器的配置
      # auto.create.topics.enable: false
      # 生产者信息
      properties:
        sasl.mechanism: SCRAM-SHA-512
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username='***' password='md5(***)';
    consumer:
      bootstrap-servers: 172.*.*.1:8423,172.*.*.2:8423,172.*.*.3:8423,172.*.*.4:8423,172.*.*.5:8423
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: ***
      # 拉取数据数量上限(不满足时等待poll-timeout毫秒)
      max-poll-records: 200
      # 拉取数据字节下限(不满足时等待fetch-max-wait毫秒)
      fetch-min-size: 1
      # 拉取数据等待上限(不满足fetch-min-size的等待时间)
      fetch-max-wait: 5000
      # 手动提交偏移量
      enable-auto-commit: false
      # 偏移量复位方式 earliest latest none
      auto-offset-reset: earliest
      # 消费者信息
      properties:
        sasl.mechanism: SCRAM-SHA-512
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username='***' password='md5(***)';
    listener:
      # 拉取数据方式: 批量
      type: batch
      # 请求数据小于max-poll-records,poll方法会持续请求,直到超时
      poll-timeout: 500
      # 指定listener容器中的线程数,用于提高并发量(可在代码中配置)
      # concurrency: 6
      ack-mode: manual_immediate
    properties:
      # 拉取数据间隔(须大于消息处理耗时)
      max:
        poll:
          interval:
            ms: 600000
      # group coordinator判定消费实例僵死并踢除的时间阈值
      session:
        timeout:
          ms: 120000  #默认10000

3、代码块

@Slf4j
@Component
public class XxxKafkaListener {

    @Autowired
    XxxKafkaConsumer xxxKafkaConsumer;

    // @KafkaListener(topics = "#{'${topics.xxx}'.split(',')}",concurrency = "#{'${topics}'.split(',').length}")
    @KafkaListener(topics = "#{'${topics.xxx}'.split(',')}",concurrency = "#{'${concur.xxx}'}" )
    public void listenXxx(ConsumerRecords<?, ?> records, Acknowledgment ack){

        try {
		    /// 消息处理
		    /// Iterator<ConsumerRecord<?,?>> iterator = (Iterator)records.iterator();
			/// while(iterator.hasNext()){
			/// 	JSONObject json = JSON.parseObject((String)iterator.next().value());
			/// 	......
			/// }
			
			/// 消息处理
            xxxKafkaConsumer.processRecords(records);
        }catch (Exception e) {
            /// 上述语句抛出异常后,直接运行至切面,不会执行下述语句
            log.error("处理xxx信息异常:{}", e);
        }
        ack.acknowledge();
    }
}

4、关于KafkaListener接口

在Spring Boot中,@KafkaListener 注解主要是依赖于 KafkaMessageListenerContainer 类。该类是Spring Kafka提供的一种消息监听器容器,它可以根据配置信息监听并消费Kafka消息。当我们在方法上添加@KafkaListener注解时,Spring Boot会自动创建 KafkaMessageListenerContainer 实例,并将消息路由到相应的处理方法。

public @interface KafkaListener {
 
	/// 监听器id(可用来命名消费者线程)
	String id() default "";
 
	/// 监听器工厂
	String containerFactory() default "";
 
	/// 监听器主题
	String[] topics() default {};
 
	/// 监听器主题,匹配正则表达式
	String topicPattern() default "";
 
	/// 监听器主题&分区
	TopicPartition[] topicPartitions() default {};
 
	/// 异常处理器
	String errorHandler() default "";
 
	/// 消费组id
	String groupId() default "";
 
	/// 是否使用id作为groupId
	boolean idIsGroup() default true;
}

4.1 containerFactory 监听器工厂

/// myKafkaListenerContainerFactory 代表了一个kafka集群
@KafkaListener(
        containerFactory = "myKafkaListenerContainerFactory",
        topics = "#{'${spring.kafka.topics}'.split(',')}",
        groupId = "${spring.kafka.consumer.group}"
)

@Bean(name = "myKafkaListenerContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> myKafkaListenerContainerFactory() {
    return initKafkaListenerContainerFactory(ConfigManager.get("spring.kafka.consumer.brokers", "127.0.0.1:9092"));
}

4.2 监听器的topic

topic的配置方式有3种,分别是topics、topicPattern、topicPartitions;

(1)topics,可以指定多个topic

@KafkaListener( topics = {"topic1","topic2"}, /// 或 topics = "#{'${spring.kafka.topics}'.split(',')}",
				groupId = "${spring.kafka.consumer.group_id}" )

(2)topicPattern,支持正则表达式

@KafkaListener(topicPattern = "topic_*", concurrency = "6")
public void onMessage( @Payload String data,
					   @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
					   @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) ByteBuffer key,
					   Acknowledgment ack, //手动提交offset
					   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
					   @Header(KafkaHeaders.OFFSET) long offSet,
					   Consumer<?, ?> consumer //消费者 
					  )

(3)topicPartitions,可以为监听器配置主题和分区(及可选的初始偏移量)

// 监听topic1的0,1分区;监听topic2的0分区,1分区从offset为100的开始消费;
@KafkaListener(id = "thing2", topicPartitions =
		{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
		  @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
		})
public void onMessage(ConsumerRecord<?, ?> record) {
	...
}

4.3 errorHandler 异常处理器

errorHandler指定了错误处理器的beanName:

@KafkaListener(
        topics = "#{'${spring.kafka.topics}'.split(',')}",
        groupId = "${spring.kafka.consumer.group_id}",
        errorHandler = "errorHandler"
)

可以在consumer中手动try/catch,也可实现 KafkaListenerErrorHandler 复用异常处理逻辑;

@Component("errorHandler")
public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {
	
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
		/// handle error ......
        return null;
    }
 
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
        /// handle error ......
        return null;
    }
}

4.4 groupId 监听器的消费组

如果配置了属性groupId,则groupId优先级最高文章来源地址https://www.toymoban.com/news/detail-851915.html

 @KafkaListener(id = "consumer-id1", idIsGroup = false, topics = "topic1", groupId = "consumer_group")

到了这里,关于@KafkaListener的配置使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【项目实战】SpringBoot整合Kafka消息队列(基于KafkaTemplate和@KafkaListener实现)

    Apache Kafka是分布式发布-订阅消息系统。 它最初由LinkedIn公司开发,之后成为Apache项目的一部分。 Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。 Apache Kafka与传统消息系统相比,有以下不同: 它将消息持久化到磁盘,因此可用于批量消

    2023年04月09日
    浏览(31)
  • @KafkaListener的配置使用

    @KafkaListener注解来自spring-kafka包。使用@KafkaListener消费消息,需要按照spring-kafka指定的格式填写kafka配置信息,即可自动装配生成相关的KafkaConsumer实例,然后使用@KafkaListener消费消息。这里需要注意,使用自动装载方式生成KafkaConsumer实例时,spring-kafka的配置参数与原生kafka的配

    2024年04月15日
    浏览(16)
  • Spring Boot中KafkaListener的介绍、原理和使用方法

    Kafka是一个高性能的分布式消息队列,它被广泛应用于对实时数据进行处理和分析。在Spring Boot中,我们可以通过 @KafkaListener 注解来监听并处理Kafka消息。本文将介绍Spring Boot中 @KafkaListener 注解的介绍、原理和使用方法。 @KafkaListener 注解是Spring Kafka提供的一种消费消息的方式

    2024年02月10日
    浏览(30)
  • @KafkaListener 注解配置多个 topic

    主要见 @KafkaListener 中 topics 属性的配置 其中 ${xxxx.topic1} 为从springBoot 配置文件中读取的属性值 由于该注解 topic 为字符串数组的类型,所以可以如上加大括号来完成指定

    2024年02月21日
    浏览(24)
  • @KafkaListener注解详解(一)| 常用参数详解

    @KafkaListener 注解提供了许多可配置的参数,以便更灵活地定制 Kafka 消息监听器的行为。 描述: 指定监听的 Kafka 主题,可以是一个字符串数组。这是最基本的参数,它定义了监听器将从哪个或哪些主题接收消息。 例子: @KafkaListener(topics = \\\"my-topic\\\") 描述: 指定 Kafka 消费者组

    2024年02月04日
    浏览(28)
  • Kafka-Java四:Spring配置Kafka消费者提交Offset的策略

    Kafka消费者提交Offset的策略有 自动提交Offset: 消费者将消息拉取下来以后未被消费者消费前,直接自动提交offset。 自动提交可能丢失数据,比如消息在被消费者消费前已经提交了offset,有可能消息拉取下来以后,消费者挂了 手动提交Offset 消费者在消费消息时/后,再提交o

    2024年02月08日
    浏览(34)
  • @KafkaListener 详解及消息消费启停控制

    参考:Kafka参数 (1) id: 默认是每个Listener实例的重要标识。 对于整个日志的排查起着至关重要的作用。如果不指定groupId,那么id将直接作为groupId。可以使用另外一个属性 idIsGroup=false关闭,默认是true。 (2) goupId: 每个消费者所属的组。 每个消费者都有自己所属的组。一个组中可

    2024年01月24日
    浏览(27)
  • 使用Kafka客户端(spring-kafka)的Java API操作Kafka的Topic

    记录 :458 场景 :在Spring Boot微服务集成Kafka客户端spring-kafka-2.8.2操作Kafka的Topic的创建和删除。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka安装 :https://blog.csdn.net/zhangbeizhen18/article/details/129071395 1.微服务中 配置Kafka信息 1.1在pom.xml添加依赖 pom.xml文件: 解析

    2024年02月09日
    浏览(32)
  • 使用spring-kafka的Java API操作Kafka集群的Topic

    记录 :462 场景 :在Spring Boot微服务集成spring-kafka-2.8.2操作Kafka集群的Topic的创建和删除。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka集群安装 :https://blog.csdn.net/zhangbeizhen18/article/details/131156084 1.微服务中 配置Kafka信息 1.1在pom.xml添加依赖 pom.xml文件: 解析:

    2024年02月10日
    浏览(31)
  • Spring boot使用Kafka Java反序列化漏洞 CVE-2023-34040

    背景:公司项目扫描到 Spring-Kafka上使用通配符模式匹配进行的安全绕过漏洞 CVE-2023-20873 中等风险 | 2023年8月23日 | CVE-2023-34040 在Spring for Apache Kafka 3.0.9及更早版本以及2.9.10及更早版本中,存在可能的反序列化攻击向量,但只有在应用了不常见的配置时才会出现。攻击者必须在

    2024年02月07日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包