Apache Kafka - 灵活控制Kafka消费_动态开启/关闭监听实现

这篇具有很好参考价值的文章主要介绍了Apache Kafka - 灵活控制Kafka消费_动态开启/关闭监听实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


Apache Kafka - 灵活控制Kafka消费_动态开启/关闭监听实现

概述

在实际应用中,往往需要根据业务需求动态开启/关闭Kafka消费者监听。例如,在某些时间段内,可能需要暂停对某个Topic的消费,或者在某些条件下才开启对某个Topic的消费。

在Spring Boot中,要实现动态的控制或关闭消费以及动态开启或关闭监听,可以使用Spring Kafka提供的一些功能。


思路

首先,需要配置Kafka消费者的相关属性。在Spring Boot中,可以通过在application.properties或application.yml文件中添加相应的配置来实现。

以下是一个示例配置:

spring.kafka.consumer.bootstrap-servers=<Kafka服务器地址>
spring.kafka.consumer.group-id=<消费者组ID>

接下来,可以创建一个Kafka消费者,使用@KafkaListener注解来指定要监听的Kafka主题,并编写相应的消息处理方法。例如:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    @KafkaListener(topics = "<Kafka主题>")
    public void receive(String message) {
        // 处理接收到的消息
    }
}

现在,你可以使用以下两种方法来控制或关闭消费以及动态开启或关闭监听:

方法1:使用@KafkaListener注解的autoStartup属性

@KafkaListener注解具有一个名为autoStartup的属性,可以用于控制是否自动启动消费者。默认情况下,它的值为true,表示自动启动。如果将其设置为false,则消费者将不会自动启动。

@KafkaListener(topics = "<Kafka主题>", autoStartup = "false")
public void receive(String message) {
    // 处理接收到的消息
}

要在运行时动态启动消费者,你可以通过KafkaListenerEndpointRegistry bean来手动启动:

@Autowired
private KafkaListenerEndpointRegistry endpointRegistry;

// 启动消费者
endpointRegistry.getListenerContainer("<KafkaListener的bean名称>").start();

同样,你也可以使用stop()方法来停止消费者:

// 停止消费者
endpointRegistry.getListenerContainer("<KafkaListener的bean名称>").stop();

方法2:使用KafkaListenerEndpointRegistry bean的pause()resume()方法

KafkaListenerEndpointRegistry bean提供了pause()resume()方法,用于暂停和恢复消费者的监听。

@Autowired
private KafkaListenerEndpointRegistry endpointRegistry;

// 暂停消费者监听
endpointRegistry.getListenerContainer("<KafkaListener的bean名称>").pause();

// 恢复消费者监听
endpointRegistry.getListenerContainer("<KafkaListener的bean名称>").resume();

使用这些方法,可以在运行时动态地控制或关闭消费,以及动态地开启或关闭监听。


Code


import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

/**
 * @author artisan
 */
@Slf4j
@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServer;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;

    @Value("${spring.kafka.consumer.group-id}")
    private String group_id;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;

    @Value("${spring.kafka.consumer.max-poll-interval-ms}")
    private String maxPollIntervalMs;

    @Value("${spring.kafka.listener.concurrency}")
    private Integer concurrency;

    private final String consumerInterceptor = "net.zf.module.system.kafka.interceptor.FailureRateInterceptor";


    /**
     * 消费者配置信息
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>(32);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServer);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,enableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,valueDeserializer);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,maxPollIntervalMs);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,group_id);
        props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,consumerInterceptor );
        return props;
    }


    /**
     * 消费者批量工厂
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.setBatchListener(true);
        factory.setConcurrency(concurrency);
        return factory;
    }




    /**
     * 异常处理器
     *
     * @return
     */
    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler() {
        return (message, exception, consumer) -> {
//            log.error("消息{} , 异常原因{}", message, exception.getMessage());
            log.error("consumerAwareListenerErrorHandler called");

            return null;
        };
    }

}


使用

   @KafkaListener(topicPattern = KafkaTopicConstant.ATTACK_MESSAGE + ".*",
            containerFactory = "batchFactory",
            errorHandler = "consumerAwareListenerErrorHandler",
            id = "attackConsumer")
    public void processMessage(List<String> records, Acknowledgment ack)  {
        log.info("AttackKafkaConsumer 当前线程 {} , 本次拉取的数据总量:{} ", Thread.currentThread().getId(), records.size());
        try {
            List<AttackMessage> attackMessages = new ArrayList();
            records.stream().forEach(record -> {
                messageExecutorFactory.process(KafkaTopicConstant.ATTACK_MESSAGE).execute(record, attackMessages);
            });
            if (!attackMessages.isEmpty()) {
                attackMessageESService.addDocuments(attackMessages, false);
            }
        } finally {
            ack.acknowledge();
        }
    }

在这段代码中,@KafkaListener注解表示这是一个Kafka消费者,

  • topicPattern参数指定了该消费者要监听的主题的模式,即以 KafkaTopicConstant.ATTACK_MESSAGE开头的所有主题。
  • containerFactory参数指定了用于创建Kafka监听器容器的工厂类别名。
  • errorHandler参数指定了用于处理监听器抛出异常的错误处理器。id参数指定了该消费者的ID。

在该消费者的方法中,当有消息到达时,records参数将包含一组消息记录,ack参数用于手动确认已经消费了这些消息。

在方法中,首先记录了当前线程ID和拉取的数据总量。将消息记录逐一处理,并将处理结果存储在一个名为attackMessages的列表中。如果列表不为空,则将其添加到ES搜索引擎中。

最后,手动确认已经消费了这些消息。


【控制】


import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

 
@Slf4j
@RestController
public class KafkaConsumerController {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    /**
     * 开启监听
     */
    @GetMapping("/start")
    public void start() {
        // 判断监听容器是否启动,未启动则将其启动
        if (!registry.getListenerContainer("attackConsumer").isRunning()) {
            log.info("start  ");

            registry.getListenerContainer("attackConsumer").start();
        }
        // 将其恢复
        registry.getListenerContainer("attackConsumer").resume();

        log.info("resume over ");
    }

    /**
     * 关闭监听
     */
    @GetMapping("/pause")
    public void pause() {
        // 暂停监听
        registry.getListenerContainer("attackConsumer").pause();

        log.info("pause");
    }
}
    

扩展

KafkaListenerEndpointRegistry

KafkaListenerEndpointRegistry是 Spring Kafka 提供的一个组件,用于管理 Kafka 消费者监听器的注册和启动。它是一个接口,提供了管理 Kafka 监听器容器的方法,如注册和启动监听器容器,以及暂停和恢复监听器容器等。

在 Spring Boot 应用程序中使用 @KafkaListener 注解时,Spring Kafka 会自动创建一个 KafkaListenerEndpointRegistry 实例,并使用它来管理所有的 Kafka 监听器容器。 它是 Spring Kafka 中的一个核心组件,用于实现 Kafka 消费者的监听和控制。

Apache Kafka - 灵活控制Kafka消费_动态开启/关闭监听实现文章来源地址https://www.toymoban.com/news/detail-505986.html

到了这里,关于Apache Kafka - 灵活控制Kafka消费_动态开启/关闭监听实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • spring boot学习第八篇:kafka监听消费

    为了实现监听器功能 pom.xml文件内容如下:  application.yml文件内容如下: logback.xml文件内容如下: BackendApplication.java文件内容如下: 然后添加了kafkaConsumerListenerExample.java文件 发到服务器上,启动hmblogs报错,截图如下: Caused by: java.lang.TypeNotPresentException: Type org.springframework.k

    2024年01月19日
    浏览(60)
  • Apache Kafka - 重识消费者

    Kafka是一个分布式的消息队列系统,它的出现解决了传统消息队列系统的吞吐量瓶颈问题。 Kafka的高吞吐量、低延迟和可扩展性使得它成为了很多公司的首选消息队列系统。 在Kafka中,消息被分成了不同的主题(Topic),每个主题又被分成了不同的分区(Partition)。 生产者(

    2024年02月06日
    浏览(32)
  • Apache zookeeper kafka 开启SASL安全认证_kafka开启认证(1)

    先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前! 因此收集整理了一份《2024年最新网络安全全套学习资料》

    2024年04月23日
    浏览(35)
  • 07、Kafka ------ 消息生产者(演示 发送消息) 和 消息消费者(演示 监听消息)

    简单来说,就是一个数据项。 ▲ 消息就是 Kafka 所记录的数据节点,消息在 Kafka 中又被称为记录(record)或事件(event)。 从存储上来看,消息就是存储在分区文件(有点类似于List)中的一个数据项,消息具有 key、value、时间戳 和 可选的元数据头。 ▲ 下面是一个示例事件

    2024年01月20日
    浏览(46)
  • 掌握实时数据流:使用Apache Flink消费Kafka数据

            导读:使用Flink实时消费Kafka数据的案例是探索实时数据处理领域的绝佳方式。不仅非常实用,而且对于理解现代数据架构和流处理技术具有重要意义。         Apache Flink  是一个在 有界 数据流和 无界 数据流上进行有状态计算分布式处理引擎和框架。Flink 设计旨

    2024年02月03日
    浏览(81)
  • SpringBoot开启动态定时任务并手动、自动关闭

    场景需求:在执行某个方法的两小时之后进行某个操作 涉及:定时任务、哈希表 需要注意: 业务逻辑层是单一实例的,所以在定时任务类内操作业务逻辑层的某个属性和在业务逻辑层内操作的都是同一个。 使用Map存放数据不要用IdentityHashMap,因为IdentityHashMap比较key值用的是

    2024年01月24日
    浏览(49)
  • Apache zookeeper kafka 开启SASL安全认证

    背景:我之前安装的kafka没有开启安全鉴权,在没有任何凭证的情况下都可以访问kafka。搜了一圈资料,发现有关于sasl、acl相关的,准备试试。 Kafka是一个高吞吐量、分布式的发布-订阅消息系统。Kafka核心模块使用Scala语言开发,支持多语言(如Java、Python、Go等)客户端,它可

    2024年03月14日
    浏览(45)
  • Apache zookeeper kafka 开启SASL安全认证 —— 筑梦之路

      Kafka是一个高吞吐量、分布式的发布-订阅消息系统。Kafka核心模块使用Scala语言开发,支持多语言(如Java、Python、Go等)客户端,它可以水平扩展和具有高吞吐量特性而被广泛使用,并与多类开源分布式处理系统进行集成使用。   Kafka作为一款开源的、轻量级的、分布式、可

    2024年02月11日
    浏览(32)
  • transaction 事务 开启 关闭 不使用 手动控制 spring springboot mybatis

    spring springboot mybatis 事务配置 Transactional的Propagation 开启事务 关闭事务_globalcoding 单元测试时,发现默认是使用事务。想要关闭事务,使用: 做单元测试的时候,发现默认是使用事务的。代码和日志如下: 日志: 通过日志发现,默认用了事务transaction,这会有一个现象,就是

    2024年02月03日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包