1.业务需求
在springboot项目中,使用spring-kafka消费kafka数据。希望能够控制消费者(KafkaConsumer)启动或停止消费,并且在启动消费时只消费当前时刻以后生产的数据(最新生产的数据),也就是说,启动消费之前未消费的数据不再消费。
2.实现
2.1.创建消费监听
按照官方文档创建一个监听。
官方文档地址
KafkaConsumer.java
@Slf4j
@Component
public class KafkaConsumer {
@KafkaListener(
id = "consumer-id",
topics = {"topic1", "topic1", "topic3"},
groupId = "group-id"
)
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
String topic = record.topic();
log.info(">>>kafka>>> topic: {}, msg: {}", topic, message);
}
}
}
2.2.控制启动/停止消费
通过KafkaListenerEndpointRegistry拿到listenerContainer,操作它即可达到控制目的。
创建一个Kafak控制类,实现控制代码。
KafkaCtrlHandler.java
@Slf4j
@Component
public class KafkaCtrlHandler {
@Autowired
private KafkaListenerEndpointRegistry registry;
/**
* 开始消费
*/
public void start() {
MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");
assert listenerContainer != null;
if (!listenerContainer.isRunning()) {
listenerContainer.start();
}
listenerContainer.resume();
log.info("kafka consumer开始消费");
}
/**
* 停止消费
*/
public void stop() {
MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");
assert listenerContainer != null;
listenerContainer.pause();
log.info("kafka consumer停止消费");
}
}
这样即可通过KafkaCtrlHandler 实例来控制消费者开始或者暂停监听。
2.3.控制启动消费时只消费最新数据
让KafkaConsumer类实现org.springframework.kafka.listener包下的ConsumerSeekAware接口,并实现onPartitionsAssigned方法。
监听创建时,设置各个分区的偏移量。
具体原理待研究,有懂的大佬请留言指教。
新的KafkaConsumer.java文章来源地址https://www.toymoban.com/news/detail-414049.html
@Slf4j
@Component
public class KafkaConsumer implements ConsumerSeekAware{
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
@NonNull ConsumerSeekAware.ConsumerSeekCallback callback) {
assignments.keySet().forEach(topicPartition-> callback.seekToEnd(topicPartition.topic(),
topicPartition.partition()));
}
@KafkaListener(
id = "consumer-id",
topics = {"topic1", "topic1", "topic3"},
groupId = "group-id"
)
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
String topic = record.topic();
log.info(">>>kafka>>> topic: {}, msg: {}", topic, message);
}
}
}
注意,修改上面的kafka控制类KafkaCtrlHandler.java,停止消费时让监听器停止(stop)而非暂停(pause)。这样监听才会重新创建并设置各分区的偏移量。
新KafkaCtrlHandler.java
@Slf4j
@Component
public class KafkaCtrlHandler {
@Autowired
private KafkaListenerEndpointRegistry registry;
/**
* 开始消费
*/
public void start() {
MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");
assert listenerContainer != null;
if (!listenerContainer.isRunning()) {
listenerContainer.start();
}
listenerContainer.resume();
log.info("kafka consumer开始消费");
}
/**
* 停止消费
*/
public void stop() {
MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");
assert listenerContainer != null;
// !!!!这里变了!!!!
listenerContainer.stop();
log.info("kafka consumer停止消费");
}
}
2.4.设置springboot 启动时消费者监听不自动启动
创建配置类
KafkaInitialConfiguration.java
@Slf4j
@Configuration
public class KafkaInitialConfiguration {
// 监听器工厂
@Autowired
private ConsumerFactory<String, String> consumerFactory;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> customContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String,String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
//设置是否自动启动
factory.setAutoStartup(false);
return factory;
}
}
配置监听工厂文章来源:https://www.toymoban.com/news/detail-414049.html
新的KafkaConsumer.java
@Slf4j
@Component
public class KafkaConsumer implements ConsumerSeekAware{
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
@NonNull ConsumerSeekAware.ConsumerSeekCallback callback) {
assignments.keySet().forEach(topicPartition-> callback.seekToEnd(topicPartition.topic(),
topicPartition.partition()));
}
@KafkaListener(
id = "consumer-id",
topics = {"topic1", "topic1", "topic3"},
groupId = "group-id",
// !!!这里变了!!!!!
containerFactory = "customContainerFactory"
)
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
String topic = record.topic();
log.info(">>>kafka>>> topic: {}, msg: {}", topic, message);
}
}
}
到了这里,关于springboot kafka消费者启动/停止监听控制,启动时只消费此时之后的数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!