自定义kafka客户端消费topic

这篇具有很好参考价值的文章主要介绍了自定义kafka客户端消费topic。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

自定义kafka客户端消费topic

结论

使用自定义的KafkaConsumer给spring进行管理,之后在注入topic的set方法中,开单线程主动订阅和读取该topic的消息。

1 背景

后端服务不需要启动时就开始监听消费,而是根据启动的模块或者用户自定义监听需要监听或者停止的topic

2 spring集成2.1.8.RELEASE版本不支持autoStartup属性

使用的spring集成2.1.8.RELEASE的版本,在@KafkaListener注解中没有找到可以直接配置属性autoStartup = "false"来手动启动topic,可能是版本低的原因,如果有可以支持的版本,也可以打在评论区,我去验证一下。

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.8.RELEASE</version>
</dependency>
@KafkaListener(topics = "<Kafka主题>", autoStartup = "false") 
public void receive(String message) {    
	// 处理接收到的消息 
}

3 自定义kafka客户端消费topic

3.1 yml配置

spring:
  kafka:
      bootstrap-servers: 19.125.105.6:9092,19.125.105.7,19.125.105.8:9092
      consumer:
        group-id: data-dev
        enable-auto-commit: true
        auto-offset-reset: latest
        auto-commit-interval: 1000
      topic:
        costomTopic: costomData

3.2 KafkaConfig客户端配置

kafka其他配置项和原有的kafka客户端配置一样,只有额外增加了一个cutomConsumer让spring来管理,方便手动启动客户端来使用

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    //    @Value("${spring.kafka.listener.concurrency}")
//    private Integer concurrency;
    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private Integer autoCommitInterval;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // concurrency
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return props;
    }

    @Bean
    public KafkaConsumer cutomConsumer() {
        // 新建一个自定义启动消费者
        KafkaConsumer consumer = new KafkaConsumer<>(consumerConfigs());
        return consumer;
    }
}

3.3 手动启动消费客户端

这里手动启动消费客户端只有在配置了costomTopic才开始启动,如果需要动态指定启停topic

@Component
public class CutomKafkaConsumer {

    // 使用cutomConsumer实例消费
    @Autowired
    private KafkaConsumer cutomConsumer;

    @Value("${spring.kafka.topic.costomTopic:}")
    public void setCostomTopic(String costomTopic) {
        // 手动启动消费类,防止下级模块默认不配置costomTopic导致启动报错
        if (StringUtils.isEmpty(costomTopic)) {
            return;
        }
        // 使这个消费者订阅对应话题
        cutomConsumer.subscribe(Collections.singleton(costomTopic));
        // 单线程拉取消息
        ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();
        consumerExecutor.submit(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    ConsumerRecords<String, String> records = cutomConsumer.poll(3000);
                    if (!records.iterator().hasNext()) {
                        continue;
                    }
                    try {
                        // 捕获异常,防止顶级消费循环被异常中断
                        records.forEach(record -> operate(record));
                    } catch (Exception e) {
                        log.error("消费数据失败,失败原因: {}", e.getMessage(), e);
                    }
                    // 通过异步的方式提交位移
                    cutomConsumer.commitAsync(((offsets, exception) -> {
                        if (exception == null) {
                            offsets.forEach((topicPartition, metadata) -> {
                                System.out.println(topicPartition + " -> offset=" + metadata.offset());
                            });
                        } else {
                            exception.printStackTrace();
                            // 如果出错了,同步提交位移
                            cutomConsumer.commitSync(offsets);
                        }
                    }));
                }
            }
        });
    }
}    

public void operate(ConsumerRecord<String, String> record) {
    log.info("kafkaTwoContainerFactory.operate start. key: {}, value : {}", record.key(), record.value());
}

参考:
Kafka消费者——API开发
Kafka Consumer如何实现精确一次消费数据
Apache Kafka - 灵活控制Kafka消费_动态开启/关闭监听实现
@KafkaListener 详解及消息消费启停控制
kafka多个消费者消费一个topic_kafka消费者组与重平衡机制,了解一下
kafka学习(五):消费者分区策略(再平衡机制)
Kafka 3.0 源码笔记(3)-Kafka 消费者的核心流程源码分析文章来源地址https://www.toymoban.com/news/detail-784786.html

到了这里,关于自定义kafka客户端消费topic的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka-客户端使用

    Kafka提供了两套客户端API,HighLevel API和LowLevel API。 HighLevel API 封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。 LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。这层API功能更灵活,但是使用起来

    2024年02月22日
    浏览(53)
  • SpringCloud系列篇:核心组件之声明式HTTP客户端组件【远程消费】

    接下来看看由辉辉所写的关于SpringCloud的相关操作吧 目录 🥳🥳Welcome Huihui\\\'s Code World ! !🥳🥳 一. 远程消费组件是什么 二. 远程消费组件的详解 场景模拟 代码实操 1.生产者 2.消费者 3.复杂参数的处理 DTO 属性赋值          声明式HTTP客户端组件是一种用于简化HTTP请求的

    2024年02月02日
    浏览(37)
  • kafka 02——三个重要的kafka客户端

    请参考下面的文章: Kafka 01——Kafka的安装及简单入门使用. AdminClient API: 允许管理和检测Topic、Broker以及其他Kafka对象。 Producer API: 发布消息到一个或多个API。 Consumer API: 订阅一个或多个Topic,并处理产生的消息。 如下: 完整的pom 关于配置,可参考官网: https://kafka.apa

    2024年02月13日
    浏览(47)
  • kafka客户端应用参数详解

    Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可: 1、消息发送者主流程  然后可以使用Kafka提供的Producer类,快速发送消息。 ​ 整体来说,构建Producer分为三个步骤: 设置Producer核心属性  :Producer可选的属性都可以由ProducerConfig类管理。比如ProducerConfig.BOOTST

    2024年02月07日
    浏览(54)
  • kafka客户端工具(Kafka Tool)的安装

    官方下载 根据不同的系统下载对应的版本,点击下载后双击,如何一直下一步,安装 kafka环境搭建请参考:CentOS 搭建Kafka集群 (1)连接kafka (2)简单使用  

    2024年04月23日
    浏览(76)
  • kafka之java客户端实战

            Kafka提供了两套客户端API, HighLevel API和LowLevel API 。 HighLevel API封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。这层API功能更灵活,

    2024年01月17日
    浏览(61)
  • python-kafka客户端封装

    本文对python的kafka包做简单封装,方便kafka初学者使用。包安装: kafka_helper.py kafka_test.py Kafka入门,这一篇就够了(安装,topic,生产者,消费者)

    2024年02月09日
    浏览(41)
  • kafka:java集成 kafka(springboot集成、客户端集成)

    摘要 对于java的kafka集成,一般选用springboot集成kafka,但可能由于对接方kafka老旧、kafka不安全等问题导致kafak版本与spring版本不兼容,这个时候就得自己根据kafka客户端api集成了。 一、springboot集成kafka 具体官方文档地址:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/

    2023年04月22日
    浏览(62)
  • c#客户端Kafka的使用方法

    Apache Kafka是一个分布式流处理平台,最初由LinkedIn开发,现在是Apache软件基金会的顶级项目之一。Kafka能够处理大规模的实时数据流,支持高可靠性、高可扩展性、低延迟和高吞吐量。它主要用于构建实时数据管道和流式处理应用程序。 Kafka的核心概念包括:Producer(生产者)

    2024年02月12日
    浏览(63)
  • 【Kafka】Kafka客户端认证失败:Cluster authorization failed.

    kafka客户端是公司内部基于spring-kafka封装的 spring-boot版本:3.x spring-kafka版本:2.1.11.RELEASE 集群认证方式:SASL_PLAINTEXT/SCRAM-SHA-512 经过多年的经验,以及实际验证,配置是没问题的,但是业务方反馈用相同的配置,还是报错! 封装的kafka客户端版本过低,高版本的配置项:secu

    2024年01月17日
    浏览(53)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包