springboot 开启和关闭kafka消费

这篇具有很好参考价值的文章主要介绍了springboot 开启和关闭kafka消费。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

关闭kafka自动消费

配置自定义容器工厂

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Component;

@Component
@Configuration
public class kafkaConfig {

    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    @Bean("pingKafkaFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> delayContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> container = new ConcurrentKafkaListenerContainerFactory<String, String>();
        container.setConsumerFactory(consumerFactory);
        //禁止自动启动
        container.setAutoStartup(false);
        return container;
    }
}

在消费监听器上使用工厂,并设置id

@KafkaListener(topics = "#{pingProperties.getTopic().split(',')}",id = "pingConsumer",containerFactory = "pingKafkaFactory")

这样,启动项目后,就不会自动消费了。文章来源地址https://www.toymoban.com/news/detail-772943.html

手动开启和关闭消费


import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.stereotype.Service;

/**
 * Kafka消费监听服务实现类.
 */
@Service
@Slf4j
public class KafkaConsumerListenerServiceImpl implements KafkaConsumerListenerService {


    /**
     * registry.
     */
    @Autowired
    private KafkaListenerEndpointRegistry registry;

    /**
     * 开启监听.
     *
     * @param listenerId 监听ID
     */
    @Override
    public void startListener(String listenerId) {
        //判断监听容器是否启动,未启动则将其启动
        if (!registry.getListenerContainer(listenerId).isRunning()) {
            registry.getListenerContainer(listenerId).start();
        }
        //项目启动的时候监听容器是未启动状态,而resume是恢复的意思不是启动的意思
        //registry.getListenerContainer(listenerId).stop();
        log.info(listenerId + "开启监听成功。");
    }

    /**
     * 停止监听.
     *
     * @param listenerId 监听ID
     */
    @Override
    public void stopListener(String listenerId) {
        registry.getListenerContainer(listenerId).stop();
        log.info(listenerId + "停止监听成功。");
    }

}

到了这里,关于springboot 开启和关闭kafka消费的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot开启动态定时任务并手动、自动关闭

    场景需求:在执行某个方法的两小时之后进行某个操作 涉及:定时任务、哈希表 需要注意: 业务逻辑层是单一实例的,所以在定时任务类内操作业务逻辑层的某个属性和在业务逻辑层内操作的都是同一个。 使用Map存放数据不要用IdentityHashMap,因为IdentityHashMap比较key值用的是

    2024年01月24日
    浏览(33)
  • Spring Boot 整合kafka消费模式AckMode以及手动消费

    在pom.xml文件中导入依赖 需要自己配置AckMode时候的配置 kafka支持的消费模式,设置在 AbstractMessageListenerContainer.AckMode 的枚举中,下面就介绍下各个模式的区别 AckMode模式 AckMode模式 作用 MANUAL 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment

    2024年02月08日
    浏览(31)
  • Spring Boot 整合kafka消费模式AckMode以及手动消费 依赖管理

    在pom.xml文件中导入依赖 需要自己配置AckMode时候的配置 kafka支持的消费模式,设置在 AbstractMessageListenerContainer.AckMode 的枚举中,下面就介绍下各个模式的区别 AckMode模式 AckMode模式 作用 MANUAL 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment

    2024年02月16日
    浏览(33)
  • spring boot学习第八篇:kafka监听消费

    为了实现监听器功能 pom.xml文件内容如下:  application.yml文件内容如下: logback.xml文件内容如下: BackendApplication.java文件内容如下: 然后添加了kafkaConsumerListenerExample.java文件 发到服务器上,启动hmblogs报错,截图如下: Caused by: java.lang.TypeNotPresentException: Type org.springframework.k

    2024年01月19日
    浏览(43)
  • Spring Kafka消费模式(single, batch)及确认模式(自动、手动)示例

    Spring Kafka消费消息的模式分为2种模式(对应spring.kafka.listener.type配置): single - 每次消费单条记录 batch - 批量消费消息列表 且每种模式都分为2种提交已消费消息offset的ack模式: 自动确认 手动确认 接下来依次讲解这两种消费模式及其对应的ack模式的示例配置及代码。 本章节

    2023年04月08日
    浏览(28)
  • Spring Boot 整合kafka:生产者ack机制和消费者AckMode消费模式、手动提交ACK

    Kafka 生产者的 ACK 机制指的是生产者在发送消息后,对消息副本的确认机制。ACK 机制可以帮助生产者确保消息被成功写入 Kafka 集群中的多个副本,并在需要时获取确认信息。 Kafka 提供了三种 ACK 机制的配置选项,分别是: acks=0:生产者在成功将消息发送到网络缓冲区后即视

    2024年02月04日
    浏览(37)
  • Kafka:消费者手动提交

    虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。 两种手动提交方式: commitSync(同步提交): 必须等待offset提交完毕,再去消费下一批数据。 同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素

    2024年02月11日
    浏览(28)
  • Kafka重复消费以及消费线程安全关闭的解决方案

    Kafka消费程序每次重启都会出现重复消费的情况,考虑是在kill掉程序的时候,有部分消费完的数据没有提交offsect。 此处表明自动提交,即延迟提交(poll的时候会根据配置的自动提交时间间隔去进行检测并提交)。当kill掉程序的时候,可能消费完的数据还没有到达提交的时间

    2024年02月13日
    浏览(36)
  • Kafka3.0.0版本——消费者(手动提交offset)

    1.1、手动提交offset的两种方式 commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。 commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。 1.2、手动提交offset两种方式的区别 相同点:都会将本次提交的一批数据最高的偏移量提交。 不

    2024年02月09日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包