@KafkaListener指定kafka集群

这篇具有很好参考价值的文章主要介绍了@KafkaListener指定kafka集群。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

基于@KafkaListener注解的kafka监听代码可以手动指定要消费的kafka集群,这对于需要访问多套kafka集群的程序来说,是有效的解决方案。这里需要注意的是,此时的消费者配置信息需使用原生kafka的配置信息格式(如:ConsumerConfig.MAX_POLL_RECORDS_CONFIG = “max.poll.records”),与自动装载KafkaConsumer时的配置信息格式不同。详情如下:

依赖项(其实spring-kafka包含了kafka-clients)

<!-- spring-kafka --> 
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.0</version>
</dependency>
<!-- kafka-clients --> 
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>

配置文件
配置参数的格式和含义,参见《spring-kafka的配置使用》

生产代码

@Component
@Slf4j
public class KafKaProducer {

	@Autowired
	private KafkaTemplate kafkaTemplate;

	public void sendMessage(String topic, Object object) {

		/*
		 * 这里的 ListenableFuture 类是 spring 对 java 原生 Future 的扩展增强,是一个泛型接口,用于监听异步方法的回调 而对于
		 * kafka send 方法返回值而言,这里的泛型所代表的实际类型就是 SendResult<K, V>,而这里 K,V 的泛型实际上 被用于
		 * ProducerRecord<K, V> producerRecord,即生产者发送消息的 key,value 类型
		 */
		ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);

		future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
			@Override
			public void onFailure(Throwable throwable) {
				log.error("发送消息失败:" + throwable.getMessage());
			}
			@Override
			public void onSuccess(SendResult<String, Object> sendResult){
			    // log.info("发送消息成功:" + sendResult.toString());
			}
		});
	}
}

消费者配置类,其中可配置多个kafka集群,每个kafka集群生成一个KafkaListenerContainerFactory实例

@Data
@Slf4j
@Configuration
public class KafkaConfig {

    @Resource
    Environment environment;

    @Bean
    public KafkaListenerContainerFactory<?> containerFactory() {

        Integer concurrency = environment.getProperty("kafka.concurrency", Integer.class, 1);
        Integer pollTimeout = environment.getProperty("kafka.poll.timeout", Integer.class, 3000);

        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(this.consumerConfigs()));
        containerFactory.setConcurrency(concurrency); // 消费并发数量
        containerFactory.setBatchListener(true);      // 批量监听消息
        containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); // 批量提交偏移
        containerFactory.getContainerProperties().setPollTimeout(pollTimeout); // 消息拉取时限
        return containerFactory;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {

        String servers          = environment.getProperty("kafka.servers", "127.0.0.1:9092");
        String groupId          = environment.getProperty("kafka.groupId", "consumer-group");
        String sessionTimeout   = environment.getProperty("kafka.session.timeout.ms", "60000");
        String maxPollRecords   = environment.getProperty("kafka.max.poll.records", "100");
        String maxPollInterval  = environment.getProperty("kafka.max.poll.interval", "600000");
        String jaasConfig       = environment.getProperty("kafka.sasl.jaas.config");

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        /// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);

        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", jaasConfig);

        return props;
    }
}

消费代码 @KafkaListener注解的containerFactory参数指定了KafkaListenerContainerFactory实例,也就指定了kafka集群文章来源地址https://www.toymoban.com/news/detail-811315.html

@Slf4j
@Component
public class KafkaConsumerListen implements BatchMessageListener<String, String> {

    @Autowired
    private Environment environment;

    @Autowired
    private KafkaMsgHandleService msgHandleService;

    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;

    /************************
     *      接收消息
     ************************/
    @Override
    @KafkaListener( containerFactory = "containerFactory", 
    				groupId = "${kafka.groupId}", 
    				topics = "#{'${kafka.topics}'.split(',')}", 
    				concurrency = "${kafka.concurrency}")
    public void onMessage(List<ConsumerRecord<String, String>> records) {
        try {
            final List<String> msgs = records.stream().map(ConsumerRecord::value).collect(Collectors.toList());
            log.info("收到消息体:size={} content:{}", msgs.size(), JSON.toJSONString(msgs));
            /// 处理消息
            msgs.forEach(this::processRecord);
        } catch (Exception e) {
            log.error("KafkaListener_kafka_consume_error.", e);
        }
    }

    /************************
     *      处理消息
     ************************/
    private void processRecord(String msg) {
        taskExecutor.submit(() -> {
            if (!environment.getProperty("kafka1.switch", Boolean.class,true)) {
                log.warn("KafkaListener_turn_off_drop_message.");
                return;
            }
            msgHandleService.handle(msg);
        });
    }
}

到了这里,关于@KafkaListener指定kafka集群的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • zookeeper+kafka分布式消息队列集群的部署

    目录 一、zookeeper 1.Zookeeper 定义 2.Zookeeper 工作机制 3.Zookeeper 特点 4.Zookeeper 数据结构 5.Zookeeper 应用场景 (1)统一命名服务 (2)统一配置管理 (3)统一集群管理 (4)服务器动态上下线 6.Zookeeper 选举机制 (1)第一次启动选举机制 (2)非第一次启动选举机制 7.部署zookeepe

    2024年02月14日
    浏览(47)
  • kafka @KafkaListener 动态接收topic

    @KafkaListener 里边的 topics 必须是常量,不可以是变量 但是某些业务场景 kafka定义的topic会不同这时候就需要传入变量才可以实现 具体实现方式如下: KafkaListener 监听方法 #{}  这里边是方法名称  这里是获取topic 其实可以在对应的@Bean里边写逻辑方法去处理 这里用到了获取配

    2024年02月13日
    浏览(38)
  • 【简单认识zookeeper+kafka分布式消息队列集群的部署】

    Zookeeper是一个开源的分布式的,为分布式框架提供协调服务的Apache项目。 Zookeeper从设计模式角度来理解:是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就将负责通知已

    2024年02月13日
    浏览(40)
  • springboot kafka消息消费学习 @KafkaListener 使用

    kafka 配置类 用途:定义使用的基本 kafka 配置,以及定义Bean 下面文件是读取本地 spring 的标准配置文件的类,用于一般属性获取等操作 @Data 为其他用于控制get set 方法的,与 此处配置不是强关联,可以没有 实际 kafka 监听消费 @ConditionalOnProperty spring boot 用于判断当前类是否加

    2024年02月08日
    浏览(39)
  • 深入理解Spring Kafka中@KafkaListener注解的参数与使用方式

    Apache Kafka作为一个强大的消息代理系统,与Spring框架的集成使得在分布式应用中处理消息变得更加简单和灵活。Spring Kafka提供了 @KafkaListener 注解,为开发者提供了一种声明式的方式来定义消息监听器。在本文中,我们将深入探讨 @KafkaListener 注解的各种参数以及它们的使用方

    2024年01月16日
    浏览(49)
  • 【项目实战】SpringBoot整合Kafka消息队列(基于KafkaTemplate和@KafkaListener实现)

    Apache Kafka是分布式发布-订阅消息系统。 它最初由LinkedIn公司开发,之后成为Apache项目的一部分。 Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。 Apache Kafka与传统消息系统相比,有以下不同: 它将消息持久化到磁盘,因此可用于批量消

    2023年04月09日
    浏览(42)
  • 多个消费者订阅一个Kafka的Topic(使用@KafkaListener和KafkaTemplate)

    记录 :465 场景 :一个Producer在一个Topic发布消息,多个消费者Consumer订阅Kafka的Topic。每个Consumer指定一个特定的ConsumerGroup,达到一条消息被多个不同的ConsumerGroup消费。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka集群安装 :https://blog.csdn.net/zhangbeizhen18/arti

    2024年02月15日
    浏览(46)
  • Zookeeper+Hadoop+Spark+Flink+Kafka+Hbase+Hive 完全分布式高可用集群搭建(保姆级超详细含图文)

    说明: 本篇将详细介绍用二进制安装包部署hadoop等组件,注意事项,各组件的使用,常用的一些命令,以及在部署中遇到的问题解决思路等等,都将详细介绍。 ip hostname 192.168.1.11 node1 192.168.1.12 node2 192.168.1.13 node3 1.2.1系统版本 1.2.2内存建议最少4g、2cpu、50G以上的磁盘容量 本次

    2024年02月12日
    浏览(50)
  • 分布式 - 消息队列Kafka:Kafka 消费者的消费位移

    01. Kafka 分区位移 对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。偏移量从0开始,每个新消息的偏移量比前一个消息的偏移量大1。 每条消息在分区中的位置信息由一个叫位移(Offset)的数据来表征。分区位移总是从 0 开始,假设一

    2024年02月12日
    浏览(46)
  • 分布式 - 消息队列Kafka:Kafka生产者架构和配置参数

    生产者发送消息流程参考图1: 先从创建一个ProducerRecord对象开始,其中需要包含目标主题和要发送的内容。另外,还可以指定键、分区、时间戳或标头。在发送ProducerRecord对象时,生产者需要先把键和值对象序列化成字节数组,这样才能在网络上传输。 接下来,如果没有显式

    2024年02月13日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包