springboot kafka消费者启动/停止监听控制,启动时只消费此时之后的数据

这篇具有很好参考价值的文章主要介绍了springboot kafka消费者启动/停止监听控制,启动时只消费此时之后的数据。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.业务需求

在springboot项目中,使用spring-kafka消费kafka数据。希望能够控制消费者(KafkaConsumer)启动或停止消费,并且在启动消费时只消费当前时刻以后生产的数据(最新生产的数据),也就是说,启动消费之前未消费的数据不再消费。

2.实现

2.1.创建消费监听

按照官方文档创建一个监听。
官方文档地址

KafkaConsumer.java

@Slf4j
@Component
public class KafkaConsumer {
	@KafkaListener(
            id = "consumer-id",
            topics = {"topic1", "topic1", "topic3"},
            groupId = "group-id"
    )
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            String topic = record.topic();
            log.info(">>>kafka>>> topic: {}, msg: {}", topic, message);
        }
    }
}

2.2.控制启动/停止消费

通过KafkaListenerEndpointRegistry拿到listenerContainer,操作它即可达到控制目的。
创建一个Kafak控制类,实现控制代码。

KafkaCtrlHandler.java

@Slf4j
@Component
public class KafkaCtrlHandler {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    /**
     * 开始消费
     */
    public void start() {
        MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");
        assert listenerContainer != null;
        if (!listenerContainer.isRunning()) {
            listenerContainer.start();
        }
        listenerContainer.resume();
        log.info("kafka consumer开始消费");
    }

    /**
     * 停止消费
     */
    public void stop() {
        MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");
        assert listenerContainer != null;
        listenerContainer.pause();
        log.info("kafka consumer停止消费");
    }
}

这样即可通过KafkaCtrlHandler 实例来控制消费者开始或者暂停监听。

2.3.控制启动消费时只消费最新数据

让KafkaConsumer类实现org.springframework.kafka.listener包下的ConsumerSeekAware接口,并实现onPartitionsAssigned方法。
监听创建时,设置各个分区的偏移量。
具体原理待研究,有懂的大佬请留言指教。

新的KafkaConsumer.java文章来源地址https://www.toymoban.com/news/detail-414049.html

@Slf4j
@Component
public class KafkaConsumer implements ConsumerSeekAware{
    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
                                     @NonNull ConsumerSeekAware.ConsumerSeekCallback callback) {
        assignments.keySet().forEach(topicPartition-> callback.seekToEnd(topicPartition.topic(),
        			 topicPartition.partition()));
    }
    
    @KafkaListener(
            id = "consumer-id",
            topics = {"topic1", "topic1", "topic3"},
            groupId = "group-id"
    )
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            String topic = record.topic();
            log.info(">>>kafka>>> topic: {}, msg: {}", topic, message);
        }
    }
}

注意,修改上面的kafka控制类KafkaCtrlHandler.java,停止消费时让监听器停止(stop)而非暂停(pause)。这样监听才会重新创建并设置各分区的偏移量。

新KafkaCtrlHandler.java

@Slf4j
@Component
public class KafkaCtrlHandler {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    /**
     * 开始消费
     */
    public void start() {
        MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");
        assert listenerContainer != null;
        if (!listenerContainer.isRunning()) {
            listenerContainer.start();
        }
        listenerContainer.resume();
        log.info("kafka consumer开始消费");
    }

    /**
     * 停止消费
     */
    public void stop() {
        MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");
        assert listenerContainer != null;
        // !!!!这里变了!!!!
        listenerContainer.stop();
        log.info("kafka consumer停止消费");
    }
}

2.4.设置springboot 启动时消费者监听不自动启动

创建配置类

KafkaInitialConfiguration.java

@Slf4j
@Configuration
public class KafkaInitialConfiguration {
    // 监听器工厂
    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> customContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String,String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        //设置是否自动启动
        factory.setAutoStartup(false);
        return factory;
    }
}

配置监听工厂

新的KafkaConsumer.java

@Slf4j
@Component
public class KafkaConsumer implements ConsumerSeekAware{
    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
                                     @NonNull ConsumerSeekAware.ConsumerSeekCallback callback) {
        assignments.keySet().forEach(topicPartition-> callback.seekToEnd(topicPartition.topic(),
        			 topicPartition.partition()));
    }
    
    @KafkaListener(
            id = "consumer-id",
            topics = {"topic1", "topic1", "topic3"},
            groupId = "group-id",
            // !!!这里变了!!!!!
            containerFactory = "customContainerFactory"
    )
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            String topic = record.topic();
            log.info(">>>kafka>>> topic: {}, msg: {}", topic, message);
        }
    }
}

到了这里,关于springboot kafka消费者启动/停止监听控制,启动时只消费此时之后的数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Springboot最简单的实战介绍 整合kafka-生产者与消费者(消息推送与订阅获取)

    #spring.kafka.bootstrap-servers=123.xxx.x.xxx:19092,123.xxx.x.xxx:19093,123.xxx.x.xxx:19094 spring.kafka.bootstrap-servers=192.168.x.xxx:9092 #=============== producer生产者 ======================= spring.kafka.producer.retries=0 spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 spring.kafka.producer.key-serializer=org.ap

    2024年04月09日
    浏览(48)
  • springboot集成kafka消费手动启动停止

    在月结,或者某些时候,我们需要停掉kafka所有的消费端,让其暂时停止消费,而后等月结完成,再从新对消费监听恢复,进行消费,此动作不需要重启服务,最后源码下载 1.通过定时任务自动触发,通过@Scheduled,在某个时间点暂停kafka某个监听的消费,也可以在某个时间点

    2024年02月06日
    浏览(45)
  • 在Windows上搭建Kafka环境的步骤,包括安装Java、下载Kafka、配置Zookeeper和Kafka、启动Zookeeper和Kafka、创建主题和生产者/消费者等

    1. 安装Java Kafka需要Java环境支持。可以从Oracle官网下载JDK,或者使用OpenJDK。 2. 下载Kafka 可以从Kafka官网下载Kafka二进制压缩包。解压后可以看到bin、config、libs等目录。 3. 配置Zookeeper Kafka依赖Zookeeper实现分布式协作。可以使用Kafka自带的Zookeeper,也可以独立安装Zookeeper。 如果使

    2024年02月11日
    浏览(44)
  • 13、Kafka ------ kafka 消费者API用法(消费者消费消息代码演示)

    消费者API的核心类是 KafkaConsumer,它提供了如下常用方法: 下面这些方法都体现了Kafka是一个数据流平台,消费者通过这些方法可以从分区的任意位置、重新开始读取数据。 根据KafkaConsumer不难看出,使用消费者API拉取消息很简单,基本只要几步: 1、创建KafkaConsumer对象,创建

    2024年04月11日
    浏览(48)
  • 分布式 - 消息队列Kafka:Kafka消费者和消费者组

    1. Kafka 消费者是什么? 消费者负责订阅Kafka中的主题,并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者

    2024年02月13日
    浏览(45)
  • kafka配置多个消费者groupid kafka多个消费者消费同一个partition(java)

    kafka是由Apache软件基金会开发的一个开源流处理平台。kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 kafka中partition类似数据库中的分表数据,可以起到水平扩展数据的目的,比如有a,b,c,d,e,f 6个数据,某个topic有两个partition,一

    2024年01月22日
    浏览(86)
  • Kafka3.0.0版本——消费者(消费者组详细消费流程图解及消费者重要参数)

    创建一个消费者网络连接客户端,主要用于与kafka集群进行交互,如下图所示: 调用sendFetches发送消费请求,如下图所示: (1)、Fetch.min.bytes每批次最小抓取大小,默认1字节 (2)、fetch.max.wait.ms一批数据最小值未达到的超时时间,默认500ms (3)、Fetch.max.bytes每批次最大抓取大小,默

    2024年02月09日
    浏览(47)
  • 10、Kafka ------ 消费者组 和 消费者实例,分区 和 消费者实例 之间的分配策略

    形象来说:你可以把主题内的多个分区当成多个子任务、多个子任务组成项目,每个消费者实例就相当于一个员工,假如你们 team 包含2个员工。 同理: 同一主题下,每个分区最多只会分给同一个组内的一个消费者实例 消费者以组的名义来订阅主题,前面的 kafka-console-consu

    2024年01月19日
    浏览(44)
  • Kafka消费者不消费数据

    背景: 工作往往是千篇一律,真正能学到点知识都是在上线后。使用Skywalking+Kafka+ES进行应用监控。 现象: 公司使用Skywalking在开发测试环境中Kafka顺利消费数据,到了UAT环境一开始还正常,后面接入了更多的应用后出现了问题:OAP服务正常但是ES里不再有数据。 排查: 通过

    2023年04月14日
    浏览(45)
  • Kafka-消费者组消费流程

    消费者向kafka集群发送消费请求,消费者客户端默认每次从kafka集群拉取50M数据,放到缓冲队列中,消费者从缓冲队列中每次拉取500条数据进行消费。   

    2024年02月12日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包