使用kafka-consumer-group.sh查看消息消费情况,CONSUMER-ID,HOST,CLIENT-ID不显示问题分析

这篇具有很好参考价值的文章主要介绍了使用kafka-consumer-group.sh查看消息消费情况,CONSUMER-ID,HOST,CLIENT-ID不显示问题分析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、问题描述

  1. 在使用使用kafka-consumer-group.sh查看消息消费情况,消息都已经消费完了,但是CONSUMER-ID,HOST,CLIENT-ID字段的信息不显示,而且,消费实例也在运行中,却出现了Consumer group 'manage.group1' has no active members.,如下图所示:consumer-id,kafka消息队列,kafka,java,分布式
  2. 消费者的代码如下:
    public class OffsetConsumer {
    
        public static void main(String[] args) {
    
            //设置配置信息
            Map<String, Object> config = new HashMap<>();
            config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
            config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            config.put(ConsumerConfig.GROUP_ID_CONFIG, "manage.group1");
            // 设置kafka中没有初始偏移量,或初始偏移量在kafka中不存在的处理方式
            config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    
            KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(config);
            // 手动设置消费者消费的主题分区为2
            consumer.assign(Arrays.asList(new TopicPartition("tp_demo_01", 0), 
                                          new TopicPartition("tp_demo_01", 1), 
                                          new TopicPartition("tp_demo_01", 2)));
    
            while (true) {
                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(3));
                records.forEach(new Consumer<ConsumerRecord<Integer, String>>() {
                    @Override
                    public void accept(ConsumerRecord<Integer, String> record) {
                        System.out.println(record.key() + "\t" + record.value() + "\t" + record.partition());
                    }
                });
            }
        }
    }

 二、问题分析

  • 之所以出现上面的的问题,是因为使用了消费组的手动分区,也就是consumer.assign()方式,如果使用了手动分区,则分区的自动管理方式不会再起作用,而且如果消费组成员变更或主题的元数据等信息改变,将不会触发再平衡机制。

三、解决办法

  • 使用kafka的消息订阅方式,即consumer.subscribe()方法,分区的分配等方式,让kafka集群自己去管理,不再人为干预。
  • 修改源码如下:
    public class OffsetConsumer {
    
        public static void main(String[] args) {
    
            //设置配置信息
            Map<String, Object> config = new HashMap<>();
            config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
            config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            config.put(ConsumerConfig.GROUP_ID_CONFIG, "manage.group1");
            // 设置kafka中没有初始偏移量,或初始偏移量在kafka中不存在的处理方式
            config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    
            KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(config);
    
            //订阅主题
            consumer.subscribe(Arrays.asList("tp_demo_01"));
            while (true) {
                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(3));
                records.forEach(new Consumer<ConsumerRecord<Integer, String>>() {
                    @Override
                    public void accept(ConsumerRecord<Integer, String> record) {
                        System.out.println(record.key() + "\t" + record.value() + "\t" + record.partition());
                    }
                });
            }
        }
    }

 四、结果显示

  • consumer-id,kafka消息队列,kafka,java,分布式

 文章来源地址https://www.toymoban.com/news/detail-566776.html

到了这里,关于使用kafka-consumer-group.sh查看消息消费情况,CONSUMER-ID,HOST,CLIENT-ID不显示问题分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka-消费者-Consumer Group Rebalance设计

    在同一个Consumer Group中,同一个Topic的不同分区会分配给不同的消费者进行消费,那么为消费者分配分区的操作是在Kafka服务端完成的吗?分区是如何进行分配呢?下面来分析Rebalance操作的原理。 Kafka最开始的解决方案是通过ZooKeeper的Watcher实现的。 每个Consumer Group在ZooKeeper下都维

    2024年01月19日
    浏览(51)
  • 【项目实战】Kafka 重平衡 Consumer Group Rebalance 机制

    👉 博主介绍 : 博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家,WEB架构师,阿里云专家博主,华为云云享专家,51CTO TOP红人 Java知识图谱点击链接: 体系化学习Java(Java面试专题) 💕💕 感兴趣的同学可以收藏关注下 , 不然下次找不到哟

    2024年02月16日
    浏览(45)
  • kafka报错:No group.id found in consumer config, container properties

    Caused by: java.lang.IllegalStateException: No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used. 报错提示没有配置groupid,那么配置groupid即可 配置group-id,重启服务器即可

    2024年02月11日
    浏览(41)
  • 21 | Kafka Consumer源码分析:消息消费的实现过程

    我们在上节中提到过,用于解决消息队列一些常见问题的知识和原理,最终落地到代码上,都包含在收、发消息这两个流程中。对于消息队列的生产和消费这两个核心流程,在大部分消息队列中,它实现的主要流程都是一样的,所以,通过这两节的学习之后,掌握了这两个流

    2024年02月21日
    浏览(43)
  • kafka消费者报错Offset commit ......it is likely that the consumer was kicked out of the group的解决

    2022年10月份接到一个小功能,对接kafka将数据写到数据库,开始的需求就是无脑批量insert,随着时间的推移,业务需求有变更,kafka的生产消息频次越来越高,到今年7月份为止就每秒会有几十条甚至上百条,然后消费消息的代码就报错: Caused by: org.apache.kafka.clients.consumer.Com

    2024年02月07日
    浏览(54)
  • Kafka中的group_id:实现消息分组消费的关键

    Kafka是一种高性能、可扩展的分布式消息系统,被广泛应用于大规模数据流处理的场景。在Kafka中,group_id是一个关键概念,用于实现消息的分组消费。本文将详细介绍group_id的作用和使用方法,并提供相应的源代码示例。 Kafka消息分组消费的概念是指多个消费者协同消费同一

    2024年02月03日
    浏览(41)
  • SpringBoot中使用Kafka报错:Failed to construct kafka consumer

    在SpringBoot项目中使用了Kafka,在启动的过程中报错 原因在报错中很清晰了,消费者反序列化使用的类错误 把spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerSerializer 改为 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer 就好了

    2024年02月11日
    浏览(44)
  • 查看kafka消息消费堆积情况

    查看主题命令 展示topic列表 描述topic 查看topic某分区偏移量最大(小)值 增加topic分区数 删除topic:慎用,只会删除zookeeper中的元数据,消息文件须手动删除 方法一: 方法二: 待验证 查看topic消费进度,必须参数为–group, 不指定–topic,默认为所有topic, 列出所有主题中的

    2024年03月13日
    浏览(80)
  • Kafka查看消息是否堆积(服务器)

    查询消费组 先进入kafka中的bin目录 ./kafka-consumer-groups.sh --bootstrap-server ip:9092 --list  查询消费组的消费情况 ./kafka-consumer-groups.sh --bootstrap-server ip1:9092 --describe --group 组名称   解析 TOPIC :topic消息队列id CURRENT-OFFSET :当前消费的偏移量 LAG:消息堆积数量 查看分区中消息偏移量

    2024年02月06日
    浏览(46)
  • kafka查看topic和消息内容命令

    ①创建一个测试用的topic ② 用Kafka的console-producer在topic test 生产消息 ③ 用Kafka的console-consumer 消费topic test的消息 ④查询topic,进入kafka目录: ⑤查询topic内容: ⑥查看topic 为 test的 详细信息 ⑦往topic 为 test的内部生产消息 ⑧从topic 为test的内部消费消息 ⑨删除kafka的测试top

    2024年02月11日
    浏览(56)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包