21 | Kafka Consumer源码分析:消息消费的实现过程

这篇具有很好参考价值的文章主要介绍了21 | Kafka Consumer源码分析:消息消费的实现过程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

我们在上节中提到过,用于解决消息队列一些常见问题的知识和原理,最终落地到代码上,都包含在收、发消息这两个流程中。对于消息队列的生产和消费这两个核心流程,在大部分消息队列中,它实现的主要流程都是一样的,所以,通过这两节的学习之后,掌握了这两个流程的实现过程。无论你使用的是哪种消息队列,遇到收发消息的问题,都可以用同样的思路去分析和解决问题。

上一节一起通过分析源代码学习了 RocketMQ 消息生产的实现过程,本节我们来看一下 Kafka 消费者的源代码,理清 Kafka 消费的实现过程,并且能从中学习到一些 Kafka 的优秀设计思路和编码技巧。

在开始分析源码之前,我们一起来回顾一下 Kafka 消费模型的几个要点:

Kafka 的每个 Consumer(消费者)实例属于一个 ConsumerGroup(消费组);

在消费时,ConsumerGroup 中的每个 Consumer 独占一个或多个 Partition(分区);

对于每个 ConsumerGroup,在任意时刻,每个 Partition 至多有 1 个 Consumer 在消费;

每个 ConsumerGroup 都有一个 Coordinator(协调者)负责分配 Consumer 和 Partition 的对应关系,当 Partition 或是 Consumer 发生变更时,会触发 rebalance(重新分配)过程,重新分配 Consumer 与 Partition 的对应关系;

Consumer 维护与 Coordinator 之间的心跳,这样 Coordinator 就能感知到 Consumer 的状态,在 Consumer 故障的时候及时触发 rebalance。

掌握并理解 Kafka 的消费模型,对于接下来理解其消费的实现过程是至关重要的,如果对上面的这些要点还有不清楚的地方,建议回顾一下之前的课程或者看一下 Kafka 相关的文档,然后再继续接下来的内容。

我们使用版本 2.2 进行分析,使用 Git 在 GitHub 上直接下载源码到本地:

git clone git@github.com:apache/kafka.git
cd kafka
git checkout 2.2

在《09 | 学习开源代码该如何入手?》这节中讲过,分析国外源码最好的方式就是从文档入手,接下来我们就找一下 Kafka 的文档,看看从哪儿来入手开启我们的分析流程。

Kafka 的 Consumer 入口类KafkaConsumer 的 JavaDoc,给出了关于如何使用 KafkaConsumer 非常详细的说明文档,并且给出了一个使用 Consumer 消费的最简代码示例:

     // 设置必要的配置信息
     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

     // 创建Consumer实例
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

     // 订阅Topic
     consumer.subscribe(Arrays.asList("foo", "bar"));

     // 循环拉消息
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
     }

这段代码主要的主要流程是:

1. 设置必要的配置信息,包括:起始连接的 Broker 地址,Consumer Group 的 ID,自动提交消费位置的配置和序列化配置;

2. 创建 Consumer 实例;

3. 订阅了 2 个 Topic:foo 和 bar;

4. 循环拉取消息并打印在控制台上。

通过上面的代码实例我们可以看到,消费这个大的流程,在 Kafka 中实际上是被分成了“订阅”和“拉取消息”这两个小的流程。另外,在之前的课程中反复提到过,Kafka 在消费过程中,每个 Consumer 实例是绑定到一个分区上的,那 Consumer 是如何确定,绑定到哪一个分区上的呢?这个问题也是可以通过分析消费流程来找到答案的。所以,我们分析整个消费流程主要聚焦在三个问题上:

1. 订阅过程是如何实现的?

2. Consumer 是如何与 Coordinator 协商,确定消费哪些 Partition 的?

3. 拉取消息的过程是如何实现的?

了解前两个问题,有助于充分理解 Kafka 的元数据模型,以及 Kafka 是如何在客户端和服务端之间来交换元数据的。最后一个问题,拉取消息的实现过程,实际上就是消费的主要流程,我们上节讲过,这是消息队列最核心的两个流程之一,也是必须重点掌握的。我们就带着这三个问题,来分析 Kafka 的订阅和拉取消息的过程如何实现。

订阅过程如何实现?

我们先来看看订阅的实现流程。从上面的例子跟踪到订阅的主流程方法:

  public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
      acquireAndEnsureOpen();
      try {
          // 省略部分代码

          // 重置订阅状态
          this.subscriptions.subscribe(new HashSet<>(topics), listener);

          // 更新元数据
          metadata.setTopics(subscriptions.groupSubscription());
      } finally {
          release();
      }
  }

在这个代码中,我们先忽略掉各种参数和状态检查的分支代码,订阅的主流程主要更新了两个属性:一个是订阅状态 subscriptions,另一个是更新元数据中的 topic 信息。订阅状态 subscriptions 主要维护了订阅的 topic 和 patition 的消费位置等状态信息。属性 metadata 中维护了 Kafka 集群元数据的一个子集,包括集群的 Broker 节点、Topic 和 Partition 在节点上分布,以及我们聚焦的第二个问题:Coordinator 给 Consumer 分配的 Partition 信息。

请注意一下,这个 subscribe() 方法的实现有一个非常值得大家学习的地方:就是开始的 acquireAndEnsureOpen() 和 try-finally release(),作用就是保护这个方法只能单线程调用。

Kafka 在文档中明确地注明了 Consumer 不是线程安全的,意味着 Consumer 被并发调用时会出现不可预期的结果。为了避免这种情况发生,Kafka 做了主动的检测并抛出异常,而不是放任系统产生不可预期的情况。

Kafka“主动检测不支持的情况并抛出异常,避免系统产生不可预期的行为”这种模式,对于增强的系统的健壮性是一种非常有效的做法。如果你的系统不支持用户的某种操作,正确的做法是,检测不支持的操作,直接拒绝用户操作,并给出明确的错误提示,而不应该只是在文档中写上“不要这样做”,却放任用户错误的操作,产生一些不可预期的、奇怪的错误结果。

具体 Kafka 是如何实现的并发检测,大家可以看一下方法 acquireAndEnsureOpen() 的实现,很简单也很经典,我们就不再展开讲解了。

继续跟进到更新元数据的方法 metadata.setTopics() 里面,这个方法的实现除了更新元数据类 Metadata 中的 topic 相关的一些属性以外,还调用了 Metadata.requestUpdate() 方法请求更新元数据。

    public synchronized int requestUpdate() {
        this.needUpdate = true;
        return this.updateVersion;
    }

跟进到 requestUpdate() 的方法里面我们会发现,这里面并没有真正发送更新元数据的请求,只是将需要更新元数据的标志位 needUpdate 设置为 true 就结束了。Kafka 必须确保在第一次拉消息之前元数据是可用的,也就是说在第一次拉消息之前必须更新一次元数据,否则 Consumer 就不知道它应该去哪个 Broker 上去拉哪个 Partition 的消息。

分析完订阅相关的代码,我们来总结一下:在订阅的实现过程中,Kafka 更新了订阅状态 subscriptions 和元数据 metadata 中的相关 topic 的一些属性,将元数据状态置为“需要立即更新”,但是并没有真正发送更新元数据的请求,整个过程没有和集群有任何网络数据交换。

那这个元数据会在什么时候真正做一次更新呢?我们可以先带着这个问题接着看代码。

拉取消息的过程如何实现?

接下来,我们分析拉取消息的流程。这个流程的时序图如下(点击图片可放大查看):

21 | Kafka Consumer源码分析:消息消费的实现过程,消息队列,kafka

我们对着时序图来分析它的实现流程。在 KafkaConsumer.poll() 方法 (对应源码 1179 行) 的实现里面,可以看到主要是先后调用了 2 个私有方法:

1. updateAssignmentMetadataIfNeeded(): 更新元数据。

2. pollForFetches():拉取消息。

方法 updateAssignmentMetadataIfNeeded() 中,调用了 coordinator.poll() 方法,poll() 方法里面又调用了 client.ensureFreshMetadata() 方法,在 client.ensureFreshMetadata() 方法中又调用了 client.poll() 方法,实现了与 Cluster 通信,在 Coordinator 上注册 Consumer 并拉取和更新元数据。至此,“元数据会在什么时候真正做一次更新”这个问题也有了答案。

类 ConsumerNetworkClient 封装了 Consumer 和 Cluster 之间所有的网络通信的实现,这个类是一个非常彻底的异步实现。它没有维护任何的线程,所有待发送的 Request 都存放在属性 unsent 中,返回的 Response 存放在属性 pendingCompletion 中。每次调用 poll() 方法的时候,在当前线程中发送所有待发送的 Request,处理所有收到的 Response。

我们在之前的课程中讲到过,这种异步设计的优势就是用很少的线程实现高吞吐量,劣势也非常明显,极大增加了代码的复杂度。对比上节我们分析的 RocketMQ 的代码,Producer 和 Consumer 在主要收发消息流程上功能的复杂度是差不多的,但是可以很明显地感受到 Kafka 的代码实现要比 RocketMQ 的代码实现更加的复杂难于理解。

我们继续分析方法 pollForFetches() 的实现。

    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
        // 省略部分代码
        // 如果缓存里面有未读取的消息,直接返回这些消息
        final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
        if (!records.isEmpty()) {
            return records;
        }
        // 构造拉取消息请求,并发送
        fetcher.sendFetches();
        // 省略部分代码
        // 发送网络请求拉取消息,等待直到有消息返回或者超时
        client.poll(pollTimer, () -> {
            return !fetcher.hasCompletedFetches();
        });
        // 省略部分代码
        // 返回拉到的消息
        return fetcher.fetchedRecords();
    }

这段代码的主要实现逻辑是:

1. 如果缓存里面有未读取的消息,直接返回这些消息;

2. 构造拉取消息请求,并发送;

3. 发送网络请求并拉取消息,等待直到有消息返回或者超时;

4. 返回拉到的消息。

在方法 fetcher.sendFetches() 的实现里面,Kafka 根据元数据的信息,构造到所有需要的 Broker 的拉消息的 Request,然后调用 client.Send() 方法将这些请求异步发送出去。并且,注册了一个回调类来处理返回的 Response,所有返回的 Response 被暂时存放在 Fetcher.completedFetches 中。需要注意的是,这时的 Request 并没有被真正发给各个 Broker,而是被暂存在了 client.unsend 中等待被发送。

然后,在调用 client.poll() 方法时,会真正将之前构造的所有 Request 发送出去,并处理收到的 Response。

最后,fetcher.fetchedRecords() 方法中,将返回的 Response 反序列化后转换为消息列表,返回给调用者。

综合上面的实现分析,我在这里给出整个拉取消息的流程涉及到的相关类的类图,在这个类图中,为了便于理解,并没有把所有类都绘制上去,只是把本节两个流程相关的主要类和这些类里的关键属性画在了图中。可以配合这个类图和上面的时序图进行代码阅读。

类图(点击图片可放大查看):

21 | Kafka Consumer源码分析:消息消费的实现过程,消息队列,kafka

小结

本节我们一起分析了 Kafka Consumer 消费消息的实现过程。大家来分析代码过程中,不仅仅是要掌握 Kafka 整个消费的流程是是如何实现的,更重要的是理解它这种完全异步的设计思想。

发送请求时,构建 Request 对象,暂存入发送队列,但不立即发送,而是等待合适的时机批量发送。并且,用回调或者 RequestFeuture 方式,预先定义好如何处理响应的逻辑。在收到 Broker 返回的响应之后,也不会立即处理,而是暂存在队列中,择机处理。那这个择机策略就比较复杂了,有可能是需要读取响应的时候,也有可能是缓冲区满了或是时间到了,都有可能触发一次真正的网络请求,也就是在 poll() 方法中发送所有待发送 Request 并处理所有 Response。

这种设计的好处是,不需要维护用于异步发送的和处理响应的线程,并且能充分发挥批量处理的优势,这也是 Kafka 的性能非常好的原因之一。这种设计的缺点也非常的明显,就是实现的复杂度太大了,如果没有深厚的代码功力,很难驾驭这么复杂的设计,并且后续维护的成本也很高。

总体来说,不推荐大家把代码设计得这么复杂。代码结构简单、清晰、易维护是是我们在设计过程中需要考虑的一个非常重要的因素。很多时候,为了获得较好的代码结构,在可接受的范围内,去牺牲一些性能,也是划算的。

思考题

我们知道,Kafka Consumer 在消费过程中是需要维护消费位置的,Consumer 每次从当前消费位置拉取一批消息,这些消息都被正常消费后,Consumer 会给 Coordinator 发一个提交位置的请求,然后消费位置会向后移动,完成一批消费过程。那 kafka Consumer 是如何维护和提交这个消费位置的呢?请你带着这个问题再回顾一下 Consumer 的代码,尝试独立分析代码并找到答案。文章来源地址https://www.toymoban.com/news/detail-833570.html

到了这里,关于21 | Kafka Consumer源码分析:消息消费的实现过程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka整理-Consumer(消费者)

    在Apache Kafka中,消费者(Consumer)是负责从Kafka的主题(Topics)读取数据的客户端应用程序。Kafka消费者的主要特点和工作原理如下: 1、订阅主题: 消费者可以订阅一个或多个Kafka主题,并从中读取数据。 2、消费者群组(Consumer Groups): 消费者可以组成消费者群组。在一个

    2024年04月10日
    浏览(50)
  • kafka之消费者(Consumer)

    1、kafka消费者消费方式         kafka 的消费者(Consumer)采用 pull 的方式主动从 broker 中拉取数据,这种不足之处会有:当 broker 中没有消息时,消费者会不断循环取数据,一直返回空数据。 2、消费者组 Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组

    2024年01月20日
    浏览(40)
  • kafka-consumer-消费者代码实例

    目录 1 消费一个主题 2 消费一个分区 3 消费者组案例 消费topic为first的消息。 应用场景:当生产者将所有消息发往特定的某个主题分区。 消费first主题0号分区代码: 测试同一个主题的分区数据,只能由一个消费者组中的一个消费者进行消费。 创建三个消费者对某一分区进行

    2024年02月11日
    浏览(39)
  • Kafka-消费者-Consumer Group Rebalance设计

    在同一个Consumer Group中,同一个Topic的不同分区会分配给不同的消费者进行消费,那么为消费者分配分区的操作是在Kafka服务端完成的吗?分区是如何进行分配呢?下面来分析Rebalance操作的原理。 Kafka最开始的解决方案是通过ZooKeeper的Watcher实现的。 每个Consumer Group在ZooKeeper下都维

    2024年01月19日
    浏览(51)
  • RabbitMq 的消息可靠性问题(二)---MQ的消息丢失和consumer消费问题

    RabbitMq 消息可靠性问题(一) — publisher发送时丢失 前面我们从publisher的方向出发解决了发送时丢失的问题,那么我们在发送消息到exchange, 再由exchange转存到queue的过程中。如果MQ宕机了,那么我们的消息是如何确保可靠性的呢?当消息由队列发到对应的消费者处理时,consumer 接

    2024年02月11日
    浏览(44)
  • kafka-consumer-groups.sh消费者组管理

      先调用 MetadataRequest 拿到所有在线Broker列表 再给每个Broker发送 ListGroupsRequest 请求获取 消费者组数据。 查看指定消费组详情 --group 查看所有消费组详情 --all-groups 查询消费者成员信息 --members 查询消费者状态信息 --state 删除指定消费组 --group 删除所有消费组 --all-groups 想要

    2024年02月03日
    浏览(44)
  • Kafka 架构深度解析:生产者(Producer)和消费者(Consumer)

    Apache Kafka 作为分布式流处理平台,其架构中的生产者和消费者是核心组件,负责实现高效的消息生产和消费。本文将深入剖析 Kafka 架构中生产者和消费者的工作原理、核心概念以及高级功能。 1 发送消息到 Kafka Kafka 生产者负责将消息发布到指定的主题。以下是一个简单的生

    2024年02月03日
    浏览(47)
  • [RocketMQ] Consumer消费者启动主要流程源码 (六)

    客户端常用的消费者类是DefaultMQPushConsumer, DefaultMQPushConsumer的构造器以及start方法的源码。 1.创建DefaultMQPushConsumer实例 最终都是调用下面四个参数的构造函数: 指定了命名空间、生产者组、RPC钩子和消费者之间消息分配的策略算法的构造器, 创建了一个DefaultMQPushConsumerImpl实例

    2024年02月16日
    浏览(47)
  • kafka启用SASL认证后使用kafka-consumer-groups.sh查看消费组报错的问题

    解决SASL认证类型kafka在使用kafka-consumer-groups.sh查看消费组数据时,报以下异常的问题 解决方案: 进入docker容器,非docker部署进入kafka安装地址即可: 进入容器 docker exec -it kafka容器ID bash 进入kafka的配置config文件夹: cd   /home/zk/kafka_2.11-2.1.1/config 执行命令: 输入内容并保存:

    2024年02月07日
    浏览(41)
  • Kafka指定分区消费及consumer-id,client-id相关概念解析

    xxxx系列(1)― xxxx系列(2)― xxxxx系列(3)― 提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档 在最近使用Kafka过程中,发现使用@KafkaListener指定分区消费时(指定了所有分区),如果服务是多节点,会出现重复消费的现象,即两个服务节点中的消费者均会消

    2024年02月13日
    浏览(58)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包