为什么kafka 需要 subscribe 的 group.id?我们是否需要使用 commitSync 手动提交偏移量?

这篇具有很好参考价值的文章主要介绍了为什么kafka 需要 subscribe 的 group.id?我们是否需要使用 commitSync 手动提交偏移量?。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、为什么需要带有 subscribe 的 group.id

  • 消费概念:
    Kafka 使用消费者组的概念来实现主题的并行消费 - 每条消息都将在每个消费者组中传递一次,无论该组中实际有多少个消费者。所以 group 参数是强制性的,如果没有组,Kafka 将不知道如何对待订阅同一主题的其他消费者。
  • 偏移量
    每当我们启动一个消费者时,它都会加入一个消费者组,然后根据该消费者组中的其他消费者数量,为其分配要读取的分区。对于这些分区,它会检查列表读取偏移量是否已知,如果找到,它将从这一点开始读取消息。如果没有找到偏移量,则参数 auto.offset.reset 控制是从分区中最早的消息还是从最新的消息开始读取。

二、我们需要使用commitSync手动提交偏移量吗?

  • 是否需要手动提交偏移?
    是否需要提交偏移量取决于作为参数 enable.auto.commit 选择的值。默认情况下,此设置为 true,这意味着消费者将定期自动提交其偏移量(由auto.commit.interval.ms 决定提交的频率)。如果将其设置为 false,那么将需要自己提交偏移量。这种默认行为可能也是导致很多发现 kafka 总是从最新的开始消费的原因,由于偏移量是自动提交的,因此它将使用该偏移量。

  • 有没有办法从头开始重播消息?
    如果想每次都从头开始读取,可以调用seekToBeginning,如果不带参数调用,它将重置为所有订阅分区中的第一条消息,或者仅重置您传入的那些分区。

  • seekToBeginning
    查找每个给定分区的第一个偏移量。poll(long) 该函数延迟计算,仅在调用或时才查找所有分区中的第一个偏移量position(TopicPartition)。如果未提供分区,则查找所有当前分配的分区的第一个偏移量。

    public class MyListener implements ConsumerSeekAware {
    
    ...
    
      @Override
      public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
          callback.seekToBeginning(assignments.keySet());
      }
    
    }
    
  • 有没有办法从最后开始重播消息?
    有的,可以使用 seekToEnd() 查找所有分配的分区到最后。或者使用 seekToTimestamp(long time)- 查找所有分配的分区到该时间戳表示的偏移量。

    public class MyListener extends AbstractConsumerSeekAware {
    
      @KafkaListener(...)
      void listn(...) {
          ...
      }
    }
    
    public class SomeOtherBean {
    
      MyListener listener;
    
      ...
    
      void someMethod() {
          this.listener.seekToTimestamp(System.currentTimeMillis - 60_000);
      }
    
    }
    

三、如果我想手动提交偏移量,该怎么做?

  • 1、禁用自动提交

    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    
  • 提交方法
    对于手动提交,KafkaConsumers提供了两种方法,即 commitSync() 和 commitAsync()。commitSync()是一个阻塞调用,在偏移量成功提交后返回,commitAsync()则立即返回。如果想知道提交是否成功,可以为回调处理程序 ( OffsetCommitCallback) 提供一个方法参数。请注意,在两次提交调用中,消费者都会提交最新poll()调用的偏移量。
    举个例子:假设一个分区主题有一个消费者并且最后一次调用poll()返回偏移量为 4、5、6 的消息。提交时,偏移量 6 将被提交,因为这是消费者客户端跟踪的最新偏移量。
    同时,commitSync() 和 commitAsync() 都允许更多地控制我们想要提交的偏移量:如果你使用允许你指定的相应重载,那么Map<TopicPartition, OffsetAndMetadata>消费者将仅提交指定的偏移量(即,映射可以包含分配的分区的任何子集) ,并且指定的偏移量可以为任意值)。

  • 同步提交:
    阻塞线程,直到提交成功或遇到不可恢复的错误(在这种情况下,它被抛出给调用者)

    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
          System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
          consumer.commitSync();
      }
    }
    

    对于 for 循环中的每次迭代,只有在consumer.commitSync()成功返回或因抛出异常而中断后,代码才会移至下一次迭代。

  • 异步提交:
    是一种非阻塞方法。调用它不会阻塞线程。相反,它将继续处理以下指令,无论最终是成功还是失败。

    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
          System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
          consumer.commitAsync(callback);
      }
    }
    

    对于 for 循环中的每次迭代,无论consumer.commitAsync()最终会发生什么,代码都会移至下一次迭代。并且,提交的结果将由定义的回调函数处理。

  • 权衡:延迟与数据一致性
    1、如果必须确保数据一致性,请选择commitSync(),因为它将确保在执行任何进一步操作之前,你将知道偏移量提交是成功还是失败。但由于它是同步和阻塞的,你将花费更多的时间来等待提交完成,这会导致高延迟。
    2、如果可以接受某些数据不一致并希望具有低延迟,请选择commitAsync(),因为它不会等待完成。相反,它只会发出提交请求并稍后处理来自 Kafka 的响应(成功或失败),同时代码将继续执行。文章来源地址https://www.toymoban.com/news/detail-658353.html

到了这里,关于为什么kafka 需要 subscribe 的 group.id?我们是否需要使用 commitSync 手动提交偏移量?的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 视觉化洞察:为什么我们需要数据可视化?

    为什么我们需要数据可视化?这个问题在信息时代变得愈发重要。数据,如今已成为生活的一部分,我们每天都在产生大量的数据,从社交媒体到购物记录,从健康数据到工作表现,数据无处不在。然而,数据本身通常是冷冰冰的数字,对于大多数人而言,理解和分析这些数

    2024年02月10日
    浏览(52)
  • 什么是Web3.0?为什么我们需要 Web 3.0

    为了更好地理解什么是 Web 3.0,我们需要知道什么是 Web 1.0 和 2.0。 为了不让你厌烦,这里简单的解释一下: WEB 3.0 例子:xiaqo.com Web 1.0  —— 信息仅从网站传递给用户。 Web 2.0  —— 信息是双向的。 用户可以与网站交互互动。 Web 3.0  —— 伟大的超越。 信息变得开放、分散

    2024年02月03日
    浏览(66)
  • 【云原生-白皮书】简章1:为什么我们需要云原生架构?

    声明:本文为《阿里云云原生架构核心技术白皮书》的一些读书笔记与感想。 一文大致了解云原生架构模式特点传送门:五分钟了解云原生的架构模式 声明:本文是阅读阿里云云原生架构核心技术白皮书的一些读书笔记与感想。 云原生架构是一种创新的软件开发方法,专为

    2023年04月26日
    浏览(57)
  • 什么是分布式操作系统?我们为什么需要分布式操作系统?

    分布式操作系统是一种特殊的操作系统,本质上属于多机操作系统,是传统单机操作系统的发展和延伸。它是将一个计算机系统划分为多个独立的计算单元(或者也可称为节点),这些节点被部署到每台计算机上,然后被网络连接起来,并保持着持续的通信状态。在分布式操作

    2024年02月16日
    浏览(55)
  • 【Golang】三分钟让你快速了解Go语言&为什么我们需要Go语言?

    博主简介: 努力学习的大一在校计算机专业学生,热爱学习和创作。目前在学习和分享:数据结构、Go,Java等相关知识。 博主主页: @是瑶瑶子啦 所属专栏: Go语言核心编程 近期目标: 写好专栏的每一篇文章 Go 语言从 2009 年 9 月 21 日开始作为谷歌公司 20% 兼职项目,即相关

    2023年04月21日
    浏览(61)
  • Linux drm内存管理(一) 浅谈TTM与GEM,为什么我们需要TTM和GEM?

    @[TOC](Linux drm内存管理(一) 为什么我们需要TTM和GEM?) 系列文章(更新中): Linux drm内存管理(二) TTM内存管理基础概念   目前Kernel中DRM中GPU的VRAM(GPU片上显存)的管理框架是有GEM和TTM,其中TTM早于GEM出现,GEM的出现是为了解决TTM复杂的使用方法,将大部分的VRAM管理实现逻辑交由

    2023年04月20日
    浏览(50)
  • Python(一):为什么我们要学习Python?

    ❤️ 专栏简介:本专栏记录了我个人从零开始学习Python编程的过程。在这个专栏中,我将分享我在学习Python的过程中的学习笔记、学习路线以及各个知识点。 ☀️ 专栏适用人群 :本专栏适用于希望学习Python编程的初学者和有一定编程基础的人。无论你是学生、职场人士还是

    2024年02月13日
    浏览(71)
  • Elasticsearch:什么是向量和向量存储数据库,我们为什么关心?

    Elasticsearch 从 7.3 版本开始支持向量搜索。从 8.0 开始支持带有 HNSW 的 ANN 向量搜索。目前 Elasticsearch 已经是全球下载量最多的向量数据库。它允许使用密集向量和向量比较来搜索文档。 向量搜索在人工智能和机器学习领域有许多重要的应用。 有效存储和检索向量的数据库对于

    2024年02月08日
    浏览(52)
  • 数据结构与算法这么难,为什么我们还要学习?

    提到数据结构与算法,就一定会伴随着诸多所谓的坚持和抱怨。同时,还有两个词总是出现,一个是内功,是对知识的定位,一个是吃透,是对自己

    2024年01月19日
    浏览(56)
  • 为什么需要数据仓库

    为什么不在OLTP环境下分析?  OLTP环境也会存储历史数据,但这些历史数据并不是业务运行所需的,这些历史数据需要经常归档到数据仓库,并且在OLTP数据库中删除。 相比之下,事务环境适用于连续处理事务,通常应用于订单录入以及财务和零售事务。它们并不依赖历史数据

    2024年01月25日
    浏览(66)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包