关闭kafka自动消费
配置自定义容器工厂
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Component;
@Component
@Configuration
public class kafkaConfig {
@Autowired
private ConsumerFactory<String, String> consumerFactory;
@Bean("pingKafkaFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> delayContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> container = new ConcurrentKafkaListenerContainerFactory<String, String>();
container.setConsumerFactory(consumerFactory);
//禁止自动启动
container.setAutoStartup(false);
return container;
}
}
在消费监听器上使用工厂,并设置id文章来源:https://www.toymoban.com/news/detail-772943.html
@KafkaListener(topics = "#{pingProperties.getTopic().split(',')}",id = "pingConsumer",containerFactory = "pingKafkaFactory")
这样,启动项目后,就不会自动消费了。文章来源地址https://www.toymoban.com/news/detail-772943.html
手动开启和关闭消费
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.stereotype.Service;
/**
* Kafka消费监听服务实现类.
*/
@Service
@Slf4j
public class KafkaConsumerListenerServiceImpl implements KafkaConsumerListenerService {
/**
* registry.
*/
@Autowired
private KafkaListenerEndpointRegistry registry;
/**
* 开启监听.
*
* @param listenerId 监听ID
*/
@Override
public void startListener(String listenerId) {
//判断监听容器是否启动,未启动则将其启动
if (!registry.getListenerContainer(listenerId).isRunning()) {
registry.getListenerContainer(listenerId).start();
}
//项目启动的时候监听容器是未启动状态,而resume是恢复的意思不是启动的意思
//registry.getListenerContainer(listenerId).stop();
log.info(listenerId + "开启监听成功。");
}
/**
* 停止监听.
*
* @param listenerId 监听ID
*/
@Override
public void stopListener(String listenerId) {
registry.getListenerContainer(listenerId).stop();
log.info(listenerId + "停止监听成功。");
}
}
到了这里,关于springboot 开启和关闭kafka消费的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!