消费者提交已消费的偏移量

这篇具有很好参考价值的文章主要介绍了消费者提交已消费的偏移量。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.概述

  消费者而在消费了消息之后会把消费的offset提交到 __consumer_offsets-的内置Topic中;每个消费者组都有维护一个当前消费者组的offset。那么问题来了: 消费组什么时候把offset更新到broker中的分区中呢?

Kafka消费者的配置信息

Name 描述 default
enable.auto.commit 如果为true,消费者的offset将在后台周期性的提交 true
auto.commit.interval.ms 如果enable.auto.commit设置为true,则消费者偏移量自动提交给Kafka的频率(以毫秒为单位) 5000

2.自动提交偏移量

消费者端开启了自动提交之后,每隔auto.commit.interval.ms自动提交一次;

public static void main(String[] args) {
    //创建kafka消费者配置对象以及配置信息
    Properties props = new Properties();
    props.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
    props.put("group.id", "hy-local-consumer");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "5000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    //创建kafka消费者对象
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
    //消费消息
    kafkaConsumer.subscribe(Arrays.asList("hy1-test-topic"));
    while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(5));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("------offset-- = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

/*
output:
------offset-- = 5, key = null, value = NBA
------offset-- = 4, key = null, value = CBA
------offset-- = 5, key = null, value = CUBA
------offset-- = 6, key = null, value = NCAA
------offset-- = 6, key = null, value = ABA
------offset-- = 5, key = null, value = NBL
*/

假如Consumer在获取了消息消费成功但是在提交offset之前服务挂掉了,会出现什么情况?

答:重复消费

3.手动提交偏移量

  虽然自动提交 offset 十分简介便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。因此 Kafka 还提供了手动提交 offset 的 API。

手动提交 offset 的方法有两种: commitSync(同步提交)commitAsync(异步 提交)

  • 相同点: 将本次poll 的一批数据最高的偏移量提交
  • 不同点: commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致, 也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败。

3.1 同步提交偏移量

public static void main(String[] args) {
    //创建kafka消费者配置对象以及配置信息
    Properties props = new Properties();
    props.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
    props.put("group.id", "hy-local-consumer");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "5000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    //创建kafka消费者对象
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
    //订阅消息
    kafkaConsumer.subscribe(Arrays.asList("hy2-test-topic"));
    while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(2));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("------offset-- = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value())
        }
        //同步提交,当前线程会阻塞直到 offset 提交成功
        kafkaConsumer.commitSync();
    }
}

3.2 异步提交偏移量

  虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞 吐量会收到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。

public static void main(String[] args) {
    //创建kafka消费者配置对象以及配置信息
    Properties props = new Properties();
    props.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
    props.put("group.id", "hy-local-consumer");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "5000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    //创建kafka消费者对象
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
    //订阅消息
    kafkaConsumer.subscribe(Arrays.asList("hy2-test-topic"));
    while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(2));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("------offset-- = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
        //异步提交
        kafkaConsumer.commitAsync(new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                if (exception != null) {
                    System.err.println("异常.....");
                }
            }
        });
    }
}

  无论是同步提交还是异步提交 offset,都有可能会造成数据的漏消费或者重复消费。先提交 offset 后消费,有可能造成数据的漏消费;而先消费后提交 offset,有可能会造成数据的重复消费文章来源地址https://www.toymoban.com/news/detail-729801.html

到了这里,关于消费者提交已消费的偏移量的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式消息队列Kafka(四)- 消费者

    1.Kafka消费方式 2.Kafka消费者工作流程 (1)总体工作流程 (2)消费者组工作流程 3.消费者API (1)单个消费者消费 实现代码 (2)单个消费者指定分区消费 代码实现: (3)消费者组消费 复制上面CustomConsumer三个,同时去订阅统一个主题,消费数据,发现一个分区只能被一个

    2023年04月26日
    浏览(49)
  • 消费者提交已消费的偏移量

      消费者而在消费了消息之后会把消费的offset提交到 __consumer_offsets- 的内置Topic中;每个消费者组都有维护一个当前消费者组的offset。那么问题来了: 消费组什么时候把offset更新到broker中的分区中呢? Kafka消费者的配置信息 Name 描述 default enable.auto.commit 如果为true,消费者的

    2024年02月07日
    浏览(38)
  • 分布式 - 消息队列Kafka:Kafka消费者的分区分配策略

    Kafka 消费者负载均衡策略? Kafka 消费者分区分配策略? 1. 环境准备 创建主题 test 有5个分区,准备 3 个消费者并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。 ① 创建主题 test,该主题有5个分区,2个副本: ② 创建3个消费者CustomConsu

    2024年02月13日
    浏览(47)
  • 分布式 - 消息队列Kafka:Kafka消费者分区再均衡(Rebalance)

    01. Kafka 消费者分区再均衡是什么? 消费者群组里的消费者共享主题分区的所有权。当一个新消费者加入群组时,它将开始读取一部分原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它将离开群组,原本由它读取的分区将由群组里的其他消费者读取。 分区

    2024年02月12日
    浏览(40)
  • kafka:消费者从指定时间的偏移开始消费(二)

    我的前一篇博客《kafka:AdminClient获取指定主题的所有消费者的消费偏移(一)》为了忽略忽略掉上线之前的所有消息,从获取指定主题的所有消费者的消费偏移并计算出最大偏移来解决此问题。 但这个方案需要使用不常用的AdminClient类,而且如果该主题如果是第一次被消费者拉取

    2024年02月15日
    浏览(41)
  • golang kafka Shopify/sarama 消费者重置新增分区偏移量并进行重新消费

    当我们使用kafka的时候存在这样一个场景: 有一个消费组正在正常消费中并且消息偏移量策略为lastoffset(最新偏移量),这个时候在kafka服务器中为当前主题下新增了一个分区,各个生产者纷纷将消息投递到了这个新增分区中。当然我们知道针对于这种场景消费者方可以触发

    2024年02月09日
    浏览(43)
  • Kafka:消费者手动提交

    虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。 两种手动提交方式: commitSync(同步提交): 必须等待offset提交完毕,再去消费下一批数据。 同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素

    2024年02月11日
    浏览(40)
  • kafka 基础概念、命令行操作(查看所有topic、创建topic、删除topic、查看某个Topic的详情、修改分区数、发送消息、消费消息、 查看消费者组 、更新消费者的偏移位置)

    kafka官网 Broker   一台kafka服务器就是一个broker,可容纳多个topic。一个集群由多个broker组成; Producer   生产者,即向kafka的broker-list发送消息的客户端; Consumer   消费者,即向kafka的broker-list订阅消息的客户端; Consumer Group   消费者组是 逻辑上的一个订阅者 ,由多个

    2024年02月01日
    浏览(61)
  • Kafka3.0.0版本——消费者(手动提交offset)

    1.1、手动提交offset的两种方式 commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。 commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。 1.2、手动提交offset两种方式的区别 相同点:都会将本次提交的一批数据最高的偏移量提交。 不

    2024年02月09日
    浏览(47)
  • Kafka3.0.0版本——消费者(自动提交 offset)

    官网文档 参数解释 参数 描述 enable.auto.commi 默认值为 true,消费者会自动周期性地向服务器提交偏移量。 auto.commit.interval.ms 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。 图解分析 消费者自动提交 offset代码 消费者自动提交

    2024年02月09日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包