kafka查询offset&生产者offset计算&消费offset计算

这篇具有很好参考价值的文章主要介绍了kafka查询offset&生产者offset计算&消费offset计算。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

kafka查询offset&生产者offset计算&消费offset计算

1、简介

​ kafka的介绍:略…(有兴趣的同学可自行Google,这与本文无关 ^ _ ^)

2、需求背景

​ 对kafka做监控,需要获取到kafka接收到消息的offset和被消费者消费掉消息的offset,编写接口将数值交给prometheus,直接观察判断kafka的消费性能如何。(如何自定义prometheus的监控指标后续再更新,本章只讲如何获得kafka的offset)

3、前期准备

​ 我们需要在自己的机器上装一个zookeeper和kafka,并测试kafka可用(具体怎么装,请自行搜索)

4、获取kafka生产者的offset以及消费者的offset

​ kafka是有两个offset的,在kafka结构图中,生产者生产消息到kafka后,kafka会记录一次生产者消息的offset,消费者消费掉消息后,kafka会记录一次消费者消费消息的offset,这两个offset是分开的,过程如下图:(大概意思是这样的)
kafka查询offset&生产者offset计算&消费offset计算

值得注意的是生产者和消费者的差值就是积压的消息量。(生产者的offset一定是大于等于消费者offset的)

​ 我们再来回顾一下kafka的命令行,如图:

kafka查询offset&生产者offset计算&消费offset计算

我们执行kafka提供的命令后可以看到,图中的数据,LOG-END-OFFSET就是当前生产者的offset,CURRENT-OFFSET就是当前已提交的commit offset,也就是消费者的offset,而LAG就是消息积压量。

​ 接下来我们用代码演示一遍(准备小demo,展示演示效果)

项目结构如下:

kafka查询offset&生产者offset计算&消费offset计算

代码献上:(每个步骤基本都有注释,有问题的可以私信我)

maven依赖:

        <!--kafka依赖-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.9.0</version>
        </dependency>

配置文件:

server:
  port: 9494
kafka:
  address: localhost:9092
  group: kclog

代码:


import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;

/**
 * @author Java Devin
 * @date 2023/1/8 14:01
 */
@Configuration
public class KafkaConfig {

    @Value("${kafka.address}")
    private String address;

    @Value("${kafka.group}")
    private String group;

    private Map<String, Object> producerProp = new HashMap<>();
    private Map<String, Object> consumerProp = new HashMap<>();

    @PostConstruct
    private void initProp() {
        // 生产者配置
        producerProp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, address);
        producerProp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 消费者配置
        consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, address);
        consumerProp.put(ConsumerConfig.GROUP_ID_CONFIG,group);
        consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    }

    /**
     * 返回kafka客户端
     * @return
     */
    @Bean
    public AdminClient getAdminClient() {
        return KafkaAdminClient.create(producerProp);
    }

    /**
     * 返回一个消费者
     * @return
     */
    @Bean
    public Consumer<String, String> getConsumer() {
        DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(consumerProp);
        return kafkaConsumerFactory.createConsumer();
    }
}

import com.javadevin.config.KafkaConfig;
import com.javadevin.util.KafkaUtil;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.KafkaFuture;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * 查看 kafka offset 的接口
 * @author Java Devin
 * @date 2023/1/8 14:13
 */
@RestController
public class KafkaOffsetDemo {

    private static final Log log = LogFactory.getLog(KafkaOffsetDemo.class);

    @Autowired
    private KafkaConfig kafkaConfig;

    @Autowired
    private KafkaUtil kafkaUtil;

    @GetMapping("/kafkaOffset")
    public Map<String, Long> testKafkaOffset(){

        // 获取kafka客户端
        AdminClient adminClient = kafkaConfig.getAdminClient();

        // 获取消费者
        Consumer<String, String> consumer = kafkaConfig.getConsumer();

        //获取所有topic名称
        KafkaFuture<Set<String>> topics = adminClient.listTopics().names();
        Set<String> names = null;
        try {
            names = topics.get();
        } catch (Exception e) {
            log.error("kafka topic query error", e);
        }

        //写入kafka的消息总量
        Long endOffset = 0L;
        //从kafka消费的消息总量
        Long commitOffset = 0L;

        // 将每一个topic中的数据累加起来
        for (String name : names) {
            endOffset += kafkaUtil.getTopicEndOffset(consumer, name);
            commitOffset += kafkaUtil.getTopicCommitOffset(consumer, name);
        }

        Map<String, Long> map = new HashMap<>(2);

        map.put("endOffset", endOffset);
        map.put("commitOffset", commitOffset);

        return map;
    }
}

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
 * kafka 工具类
 * @author Java Devin
 * @date 2023/1/8 14:16
 */
@Component
public class KafkaUtil {

    /**
     * 获取单个topic的消费量
     * @param consumer
     * @param topic
     * @return
     */
    public Long getTopicCommitOffset(Consumer<String, String> consumer, String topic){
        Long commitOffset = 0L;

        // 这一步是因为 kafka架构的特殊性 在每一个topic中都存在partition 根据每个项目的不同 会配置或不配置partition
        // 消息会分散的存储在partition中 所以我们计算时应该计算所有partition的offset的和
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
        for (PartitionInfo partitionInfo : partitionInfos) {
            TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
            OffsetAndMetadata committed = consumer.committed(topicPartition);
            if (committed != null){
                commitOffset += committed.offset();
            }
        }
        return commitOffset;
    }


    /**
     * 获取单个topic接收量
     * @param consumer
     * @param topic
     * @return
     */
    public Long getTopicEndOffset(Consumer<String, String> consumer, String topic){
        Long endOffset = 0L;

        // 这一步是因为 kafka架构的特殊性 在每一个topic中都存在partition 根据每个项目的不同 会配置或不配置partition
        // 消息会分散的存储在partition中 所以我们计算时应该计算所有partition的offset的和
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
        List<TopicPartition> partitions = new ArrayList<>();
        for (PartitionInfo partitionInfo : partitionInfos) {
            TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
            partitions.add(topicPartition);
        }

        Collection<Long> values = consumer.endOffsets(partitions).values();
        for (Long value : values) {
            endOffset += value;
        }
        return endOffset;
    }
}

运行结果展示:

kafka查询offset&生产者offset计算&消费offset计算

5、代码测试

我们先来查询一下offset:

kafka查询offset&生产者offset计算&消费offset计算

首先使用命令创建一个生产者:

kafka查询offset&生产者offset计算&消费offset计算

向kafka发送5条消息,然后再查询一下:
kafka查询offset&生产者offset计算&消费offset计算
kafka查询offset&生产者offset计算&消费offset计算

现在可以看到接收到消息已经为6,而消费掉的消息还是1,计算一下得出积压了5条消息未消费

现在来开启消费者,再次查询:

kafka查询offset&生产者offset计算&消费offset计算
kafka查询offset&生产者offset计算&消费offset计算

现在可以根据数据判断出kafka内的消息已经消费完了,再用命令查询一下

kafka查询offset&生产者offset计算&消费offset计算

命令查询后,结果是一样的,值得注意的是,我在电脑上配置的kafka只有一个topic,且只会给一个partition中塞数据,创建多个topic或多个partition然后测试的任务就交给大家伙了,我就不一一演示了。

6、总结

​ 其实kafka的offset计算并不难,难点在于很多api如果不去研究官方文档或者请教朋友,就无从得知,并且在计算时topic和partition概念很难一次性消化,但有时间就可以去研究一下,研究明白了会发现其实挺好玩的。

鸡汤送上:每个生命都有裂缝,如此才会有光线射进来。

最后说明,创作不易,若转载请标明出处或原文链接!!!文章来源地址https://www.toymoban.com/news/detail-425153.html

到了这里,关于kafka查询offset&生产者offset计算&消费offset计算的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 笔记:配置多个kafka生产者和消费者

    如果只有一个kafka,那么使用自带的KafkaAutoConfiguration配置类即可,对应已有属性类KafkaProperties,属性前缀为spring.kafka.xxx; 本文记录配置多个kafka的情况,即在KafkaAutoConfiguration的基础上,自定义额外的kafka生产者和消费者。 适用场景:需要消费来源于不同kafka的消息、需要在不

    2024年02月15日
    浏览(48)
  • Java轻松使用Kafka生产者,消费者

    Java轻松使用Kafka生产者,消费者 一、环境说明 项目中需要下面的依赖: ( 版本自定义 ) 2. yml配置文件设置 1. 简单生产者的书写: 1. 简单消费者的书写:   注:多消费者时,需要对应kafka中配置的分区;多少的Partition就有多少个消费者,以免资源浪费

    2024年02月15日
    浏览(52)
  • kafka生产者和消费者(python版)

    生产者 消费者 消费者中的组名主要用户针对主题的偏移量进行更改,也涉及到主题中分区的问题, kafka工具类 此工具类基本上拿过去就可以用 疑问 当消费者链接kafka时发现topic没有未读的消息怎样退出呢,默认是在一直等待,但是我期望没有要读的消息的时候直接退出即可

    2024年02月16日
    浏览(40)
  • Kafka:主题创建、分区修改查看、生产者、消费者

    1.创建主题 2.查看所有主题 3.查看详细主题 序号从0开始计算 Partition:分区数,该主题有3个分区 Replica:副本数,该主题有3个副本 Leader:副本数中的主的序号,生产消费的对象 1.修改分区数 修改的分区数量不可以小于或者等于当前主题分区的数量,否则会报错 在根目录kaf

    2024年02月11日
    浏览(86)
  • Kafka官方生产者和消费者脚本简单使用

    怎样使用Kafka官方生产者和消费者脚本进行消费生产和消费?这里假设已经下载了kafka官方文件,并已经解压. 这就可以见到测试kafka对应topic了.

    2024年02月04日
    浏览(45)
  • Kafka系列:查看Topic列表、消息消费情况、模拟生产者消费者

    执行topic删除命令时,出现提示 这条命令其实并不执行删除动作,仅仅是在zookeeper上标记该topic要被删除而已,同时也提醒用户一定要提前打开delete.topic.enable开关,否则删除动作是不会执行的。 解决办法: a)在server.properties中设置delete.topic.enable参数为ture b)如下操作: 1.登

    2023年04月26日
    浏览(53)
  • 探究:kafka生产者/消费者与多线程安全

    目录 1. 多线程安全 1.1. 生产者是多线程安全的么? 1.1. 消费者是多线程安全的么? 2. 消费者规避多线程安全方案 2.1. 每个线程维护一个kafkaConsumer 2.2. [单/多]kafkaConsumer实例 + 多worker线程 2.3.方案优缺点对比         Kafka生产者是 线程安全 的,可以在多个线程中共享一个

    2023年04月26日
    浏览(88)
  • Linux安装Kafka,创建topic、生产者、消费者

    1.创建安装目录/usr/local/kafka mkdir /usr/local/kafka 2.进入安装包目录 cd /usr/local/kafka  3.下载安装包 wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz 4.解压安装包 tar -zxvf kafka_2.12-3.3.1.tgz 5.进入cd kafka_2.12-3.3.1目录 cd kafka_2.12-3.3.1/ 6.修改zookeeper配置 cat ./config/zookeeper.properties | grep

    2023年04月17日
    浏览(46)
  • Kafka 架构深度解析:生产者(Producer)和消费者(Consumer)

    Apache Kafka 作为分布式流处理平台,其架构中的生产者和消费者是核心组件,负责实现高效的消息生产和消费。本文将深入剖析 Kafka 架构中生产者和消费者的工作原理、核心概念以及高级功能。 1 发送消息到 Kafka Kafka 生产者负责将消息发布到指定的主题。以下是一个简单的生

    2024年02月03日
    浏览(43)
  • 第3、4章 Kafka 生产者 和 消费者 ——向 Kafka 写入数据 和读取数据

    重要的特性: 消息通过 队列来进行交换 每条消息仅会传递给一个消费者 消息传递有先后顺序,消息被消费后从队列删除(除非使用了消息优先级) 生产者或者消费者可以动态加入 传送模型: 异步即发即弃:生产者发送一条消息,不会等待收到一个响应 异步请求、应答:

    2024年02月20日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包