Kafka中如何动态开启、关闭消费者
使用背景:在开发业务中需要根据具体逻辑选择开启还是关闭消费者
实现逻辑:
1、创建consumer配置类,自定义工厂、自定义消费者配置(省略)
还需要注入consumerService bean(改类里主要是控制动态启停的具体实现方法)
2、接口实现文章来源:https://www.toymoban.com/news/detail-794216.html
public class ConsumerKafkaService {
private final Kafka KafkaListenerEndpointRegistry registry;
//暂停消费topic
public void pauseTopic(String topic){
MessageListenerContainer container =getContainer();
if(Objects.nonNull(container) && container.isRunning() ){
//取对应topic
Collection<TopicPartition> partitions = getTopicPartitions(container);
partitions.stream.filter(p -> p.topic().contains(topic)).forEach(partition -> {
if(!container.isPartitionPaused(partition)){
container.pausePartition(partition);
log.info("partition:{} 停止消费",partition);
}
});
}
}
//获取对应topic和分区集合
private static Collection<TopicPartition> getTopicPartitions(MessageListenerContainer container){
return Optional.ofNullable(container.getAssignedPartitions()).orElseGet(Collections::emptyList);
}
//开启消费
public void resumeTopic(String topic){
MessageListenerContainer container =getContainer();
if(Objects.nonNull(container) && container.isRunning() ){
//取对应topic
Collection<TopicPartition> partitions = getTopicPartitions(container);
partitions.stream.filter(p -> p.topic().contains(topic)).forEach(partition -> {
if(!container.isPartitionPaused(partition)){
container.resumePartition(partition);
log.info("partition:{} 开启消费",partition);
}
});
}
}
//根据指定id获取容器
private MessageListenerContainer getContainer(){
return registry.getListenerContainer("XXX");
}
//首次执行,初始化
public void initPause(){
MessageListenerContainer container =getContainer();
if(Objects.nonNull(container) && container.isRunning() ){
//取对应topic
Collection<TopicPartition> partitions = getTopicPartitions(container);
partitions.forEach(partition -> {
if(PlatformCache.availableTopics.contains(partition.topic())){
if(container.isPartitionPaused(partition)){
container.resumePartition(partition);
log.info("partition:{} 开启消费",partition);
}
}else{
container.pausePartition(partition);
log.info("partition:{} 暂停消费",partition);
}
});
}
}
}
消费监听方法上,@KafkaListener(topicPattern=“${topicPattern}”,id=“XXX”,idIsGroup = false)即可监听开启消费的topic数据文章来源地址https://www.toymoban.com/news/detail-794216.html
到了这里,关于动态启停kafka消费者的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!