一、问题描述
- 在使用使用kafka-consumer-group.sh查看消息消费情况,消息都已经消费完了,但是CONSUMER-ID,HOST,CLIENT-ID字段的信息不显示,而且,消费实例也在运行中,却出现了Consumer group 'manage.group1' has no active members.,如下图所示:
- 消费者的代码如下:
public class OffsetConsumer { public static void main(String[] args) { //设置配置信息 Map<String, Object> config = new HashMap<>(); config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092"); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.GROUP_ID_CONFIG, "manage.group1"); // 设置kafka中没有初始偏移量,或初始偏移量在kafka中不存在的处理方式 config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(config); // 手动设置消费者消费的主题分区为2 consumer.assign(Arrays.asList(new TopicPartition("tp_demo_01", 0), new TopicPartition("tp_demo_01", 1), new TopicPartition("tp_demo_01", 2))); while (true) { ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(3)); records.forEach(new Consumer<ConsumerRecord<Integer, String>>() { @Override public void accept(ConsumerRecord<Integer, String> record) { System.out.println(record.key() + "\t" + record.value() + "\t" + record.partition()); } }); } } }
二、问题分析
- 之所以出现上面的的问题,是因为使用了消费组的手动分区,也就是consumer.assign()方式,如果使用了手动分区,则分区的自动管理方式不会再起作用,而且如果消费组成员变更或主题的元数据等信息改变,将不会触发再平衡机制。
三、解决办法
- 使用kafka的消息订阅方式,即consumer.subscribe()方法,分区的分配等方式,让kafka集群自己去管理,不再人为干预。
- 修改源码如下:
public class OffsetConsumer { public static void main(String[] args) { //设置配置信息 Map<String, Object> config = new HashMap<>(); config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092"); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.GROUP_ID_CONFIG, "manage.group1"); // 设置kafka中没有初始偏移量,或初始偏移量在kafka中不存在的处理方式 config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(config); //订阅主题 consumer.subscribe(Arrays.asList("tp_demo_01")); while (true) { ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(3)); records.forEach(new Consumer<ConsumerRecord<Integer, String>>() { @Override public void accept(ConsumerRecord<Integer, String> record) { System.out.println(record.key() + "\t" + record.value() + "\t" + record.partition()); } }); } } }
四、结果显示
文章来源地址https://www.toymoban.com/news/detail-566776.html
文章来源:https://www.toymoban.com/news/detail-566776.html
到了这里,关于使用kafka-consumer-group.sh查看消息消费情况,CONSUMER-ID,HOST,CLIENT-ID不显示问题分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!