kafka的auto.offset.reset详解与测试

这篇具有很好参考价值的文章主要介绍了kafka的auto.offset.reset详解与测试。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

  1. 取值及定义#
    auto.offset.reset有以下三个可选值:

latest (默认)
earliest
none
三者均有共同定义:
对于同一个消费者组,若已有提交的offset,则从提交的offset开始接着消费

意思就是,只要这个消费者组消费过了,不管auto.offset.reset指定成什么值,效果都一样,每次启动都是已有的最新的offset开始接着往后消费

不同的点为:

latest(默认):对于同一个消费者组,若没有提交过offset,则只消费消费者连接topic后,新产生的数据
就是说如果这个topic有历史消息,现在新启动了一个消费者组,且auto.offset.reset=latest,此时已存在的历史消息无法消费到,那保持消费者组运行,如果此时topic有新消息进来了,这时新消息才会被消费到。而一旦有消费,则必然会提交offset
这时候如果该消费者组意外下线了,topic仍然有消息进来,接着该消费者组在后面恢复上线了,它仍然可以从下线时的offset处开始接着消费,此时走的就是共同定义

earliest:对于同一个消费者组,若没有提交过offset,则从头开始消费
就是说如果这个topic有历史消息存在,现在新启动了一个消费者组,且auto.offset.reset=earliest,那将会从头开始消费,这就是与latest不同之处。
一旦该消费者组消费过topic后,此时就有该消费者组的offset了,这种情况下即使指定了auto.offset.reset=earliest,再重新启动该消费者组,效果是与latest一样的,也就是此时走的是共同的定义

none:对于同一个消费者组,若没有提交过offset,会抛异常
一般生产环境基本用不到该参数

  1. 新建全新topic#
    ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic TestOffsetResetTopic --partitions 1 --replication-factor 1 --create

  2. 往新建的topic发送消息#
    便于测试,用Java代码发送5条消息

public class TestProducer {

public static void main(String[] args) throws InterruptedException {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

    String topic = "TestOffsetResetTopic";

    for (int i = 0; i < 5; i++) {
        String value = "message_" + i + "_" + LocalDateTime.now();
        System.out.println("Send value: " + value);
        producer.send(new ProducerRecord<>(topic, value), (metadata, exception) -> {
            if (exception == null) {
                String str = MessageFormat.format("Send success! topic: {0}, partition: {1}, offset: {2}", metadata.topic(), metadata.partition(), metadata.offset());
                System.out.println(str);
            }
        });
        Thread.sleep(500);
    }

    producer.close();
}

}

发送消息成功:

Send value: message_0_2022-09-16T18:26:15.943749600
Send success! topic: TestOffsetResetTopic, partition: 0, offset: 0
Send value: message_1_2022-09-16T18:26:17.066608900
Send success! topic: TestOffsetResetTopic, partition: 0, offset: 1
Send value: message_2_2022-09-16T18:26:17.568667200
Send success! topic: TestOffsetResetTopic, partition: 0, offset: 2
Send value: message_3_2022-09-16T18:26:18.069093600
Send success! topic: TestOffsetResetTopic, partition: 0, offset: 3
Send value: message_4_2022-09-16T18:26:18.583288100
Send success! topic: TestOffsetResetTopic, partition: 0, offset: 4

现在TestOffsetResetTopic这个topic有5条消息,且还没有任何消费者组对其进行消费过,也就是没有任何offset

  1. 测试latest#
    已知topic已经存在5条历史消息,此时启动一个消费者

public class TestConsumerLatest {

public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // 指定消费者组
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
    // 设置 auto.offset.reset
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

    String topic = "TestOffsetResetTopic";
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Collections.singletonList(topic));

    // 消费数据
    while (true) {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
            System.out.println(consumerRecord);
        }
    }

}

}

发现如上面所述,历史已存在的5条消息不会消费到,消费者没有任何动静,现在保持消费者在线

启动TestProducer再发5条消息,会发现这后面新发的,offset从5开始的消息就被消费了

ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1663329725731, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_0_2022-09-16T20:02:05.523581500)
ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 6, CreateTime = 1663329726251, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_1_2022-09-16T20:02:06.251399400)
ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 7, CreateTime = 1663329726764, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_2_2022-09-16T20:02:06.764186200)
ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 8, CreateTime = 1663329727264, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_3_2022-09-16T20:02:07.264268500)
ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 9, CreateTime = 1663329727778, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_4_2022-09-16T20:02:07.778469700)

此时该消费者组对于这个topic的offset已经为9了,现在停掉这个消费者(下线),再启动TestProducer发5条消息,接着再启动TestConsumerLatest,会发现紧接上一次的offset之后开始,即从10继续消费

如果测试发现没动静,请多等一会,估计机器性能太差…

  1. 测试earliest#
    新建一个测试消费者,设置auto.offset.reset为earliest,注意groupid为新的group2,表示对于topic来说是全新的消费者组

public class TestConsumerEarliest {

public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // 指定消费者组
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
    // 设置 auto.offset.reset
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    String topic = "TestOffsetResetTopic";
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Collections.singletonList(topic));

    // 消费数据
    while (true) {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
            System.out.println(consumerRecord);
        }
    }

}

}

一运行发现已有的10条消息(最开始5条加上面一次测试又发了5条,一共10条)是可以被消费到的,且消费完后,对于这个topic就已经有了group2这个组的offset了,无论之后启停,只要groupid不变,都会从最新的offset往后开始消费

  1. 测试none#
    新建一个测试消费者,设置auto.offset.reset为none,注意groupid为新的group3,表示对于topic来说是全新的消费者组

public class TestConsumerNone {

public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // 指定消费者组
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group3");
    // 设置 auto.offset.reset
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

    String topic = "TestOffsetResetTopic";
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Collections.singletonList(topic));

    // 消费数据
    while (true) {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
            System.out.println(consumerRecord);
        }
    }

}

}

一运行,程序报错,因为对于topic来说是全新的消费者组,且又指定了auto.offset.reset为none,直接抛异常,程序退出

Exception in thread “main” org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [TestOffsetResetTopic-0]
at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetInitializingPositions(SubscriptionState.java:706)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2434)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1266)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at kakfa.TestConsumerNone.main(TestConsumerNone.java:31)文章来源地址https://www.toymoban.com/news/detail-778840.html

  1. 总结#
    如果topic已经有历史消息了,又需要消费这些历史消息,则必须要指定一个从未消费过的消费者组,同时指定auto.offset.reset为earliest,才可以消费到历史数据,之后就有提交offset。有了offset,无论是earliest还是latest,效果都是一样的了。
    如果topic没有历史消息,或者不需要处理历史消息,那按照默认latest即可。

到了这里,关于kafka的auto.offset.reset详解与测试的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Java实现Kafka指定(offset)位置进行消费

    在消息队列Kafka中,可以指定kafka的消费位置从某个游标位置开始消费, 其中 auto.offset.reset 属性可以设置为  none earliest latest 三种 属性如下 none 如果没有为消费者找到先前的offset的值,即没有自动维护偏移量,也没有手动维护偏移量,则抛出异常 earliest 在各分区下有提交的offs

    2024年02月11日
    浏览(45)
  • Kafka-Java四:Spring配置Kafka消费者提交Offset的策略

    Kafka消费者提交Offset的策略有 自动提交Offset: 消费者将消息拉取下来以后未被消费者消费前,直接自动提交offset。 自动提交可能丢失数据,比如消息在被消费者消费前已经提交了offset,有可能消息拉取下来以后,消费者挂了 手动提交Offset 消费者在消费消息时/后,再提交o

    2024年02月08日
    浏览(49)
  • 详解kafka中的消息日志文件:Topic消息分类、partition分区、segment分段、offset偏移量索引文件

    Kafka是一种高吞吐量的基于zookeeper协调的以集群的方式运行的分布式发布订阅消息系统,支持分区(partition)、多副本(replica),具有非常好的负载均衡能力和处理性能、容错能力。Kafka采用发布/订阅模型,消息生产者将消息发送到Kafka的消息中心(broker)中,然后消费者从

    2024年02月03日
    浏览(89)
  • kafka查询offset&生产者offset计算&消费offset计算

    1、简介 ​ kafka的介绍:略…(有兴趣的同学可自行Google,这与本文无关 ^ _ ^) 2、需求背景 ​ 对kafka做监控,需要获取到kafka接收到消息的offset和被消费者消费掉消息的offset,编写接口将数值交给prometheus,直接观察判断kafka的消费性能如何。(如何自定义prometheus的监控指标后续

    2023年04月25日
    浏览(84)
  • kafka—offset偏移量

    offset定义 :消费者再消费的过程中通过offset来记录消费数据的具体位置 offset存放的位置 :从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic(系统主题)中,名为__consumer_offsets,即offset维护在系统主题中 说明:__consumer_offsets 主题里面采用 key 和 value 的方式存储数

    2024年02月05日
    浏览(78)
  • Kafka之offset位移

    offset :在 Apache Kafka 中,offset 是一个用来唯一标识消息在分区中位置的数字。每个分区中的消息都会被分配一个唯一的 offset 值,用来表示该消息在该分区中的位置。消费者可以通过记录自己消费的最后一个 offset 值来跟踪自己消费消息的进度,确保不会漏掉消息或者重复消

    2024年04月10日
    浏览(43)
  • Kafka入门,手动提交offset,同步提交,异步提交,指定 Offset 消费(二十三)

    虽然offset十分遍历,但是由于其是基于时间提交的,开发人员难以把握offset提交的实际。因此Kafka还提供了手动提交offset的API 手动提交offset的方法有两种:分别commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交:不同点是

    2024年02月11日
    浏览(49)
  • Kafka中offset的相关操作

    offset用于记录消息消费的进度,主要有以下几种, Current offset,用于记录消费者已经接收到(不一定有完成消费)的消息序号,保证同一个消息不会被重复消费,可以我们通过kafka-consumer-groups.sh查询,这也是我们测试或者实际环境需要调整的offset Committed offset,用于记录消费者已

    2024年02月12日
    浏览(37)
  • Offset Explorer中添加Kafka连接

    Kafka连接 1. Host中填IP,Port中填端口号,点击test测试链接,点击Add添加链接;  2. 若第一种方法添加链接不成功,点击Advanced,在bootstrap server中填入ip:port即可链接成功。  

    2024年02月16日
    浏览(42)
  • kafka实战-消费者offset重置问题

    背景:当app启动时,会调用 “启动上报接口” 上报启动数据,该数据包含且不限于手机型号、应用版本、app类型、启动时间等,一站式接入平台系统会记录该数据。 生产者:“启动上报接口”会根据启动数据发送一条kafka消息,topic“xxx” 消费者:“启动处理模块”会监控

    2023年04月11日
    浏览(89)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包