Java实现Kafka指定(offset)位置进行消费

这篇具有很好参考价值的文章主要介绍了Java实现Kafka指定(offset)位置进行消费。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

在消息队列Kafka中,可以指定kafka的消费位置从某个游标位置开始消费,

 // 创建消费者
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", servers); // 集群服务器地址,逗号分隔
            props.setProperty("max.poll.records", String.valueOf(DrawConstant.MAX_PULL_COUNT)); // 最大拉取消费消息数量
            groupId = UUID.randomUUID().toString(); // 随机组ID
            props.setProperty("group.id", groupId); // 设置组ID
            if (fromBegining) {
                // 从头开始接收
                props.setProperty("auto.offset.reset", "latest"); // 设置从头开始接收
            }
            props.setProperty("enable.auto.commit", "false"); // 关闭自动提交offset
            props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key的格式是String
            props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value的格式是String
            // 创建kafka实例
            consumer = new KafkaConsumer<>(props);
            // offset 游标位置
            long s = 10;
            // 订阅主题
            consumer.assign(Arrays.asList(new TopicPartition(topic, 0)));
            consumer.seek(new TopicPartition(topic, 0), s);

其中 auto.offset.reset 属性可以设置为  none earliest latest 三种 属性如下

none

如果没有为消费者找到先前的offset的值,即没有自动维护偏移量,也没有手动维护偏移量,则抛出异常

earliest

在各分区下有提交的offset时:从offset处开始消费

在各分区下无提交的offset时:从头开始消费

latest

在各分区下有提交的offset时:从offset处开始消费

在各分区下无提交的offset时:从最新的数据开始消费

当消费过的数据想要进行重复消费时   需要修改 kafka consumer 的 groupId  在同一个groupId下的数据  不会被重复消费,当修改为一个新的groupId时,则认为是一个全新的消费者,可以对一个topic下的数据进行重复消费文章来源地址https://www.toymoban.com/news/detail-505544.html

到了这里,关于Java实现Kafka指定(offset)位置进行消费的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka 消息日志原理 & 指定偏移量消费 & 指定时间戳消费

    Apache Kafka日志存储在物理磁盘上各种数据的集合,日志按照topic分区进行文件组织,每一个分区日志由一个或者多个文件组成。生产者发送的消息被顺序追加到日志文件的末尾。 如上图所述,Kafka主题被划分为3个分区。在Kafka中,分区是一个逻辑工作单元,其中记录被顺序附

    2024年02月15日
    浏览(46)
  • Kafka-Java四:Spring配置Kafka消费者提交Offset的策略

    Kafka消费者提交Offset的策略有 自动提交Offset: 消费者将消息拉取下来以后未被消费者消费前,直接自动提交offset。 自动提交可能丢失数据,比如消息在被消费者消费前已经提交了offset,有可能消息拉取下来以后,消费者挂了 手动提交Offset 消费者在消费消息时/后,再提交o

    2024年02月08日
    浏览(49)
  • Java实现Kafka消费者及消息异步回调方式

    Kafka 在创建消费者进行消费数据时,由于可以理解成为是一个kafka 的单独线程,所以在Kafka消费数据时想要在外部对消费到的数据进行业务处理时是获取不到的,所以就需要实现一个消息回调的接口来进行数据的保存及使用。 消息回调接口实现代码如下 Kafka消费者代码实现如

    2024年02月06日
    浏览(54)
  • kafka 基础概念、命令行操作(查看所有topic、创建topic、删除topic、查看某个Topic的详情、修改分区数、发送消息、消费消息、 查看消费者组 、更新消费者的偏移位置)

    kafka官网 Broker   一台kafka服务器就是一个broker,可容纳多个topic。一个集群由多个broker组成; Producer   生产者,即向kafka的broker-list发送消息的客户端; Consumer   消费者,即向kafka的broker-list订阅消息的客户端; Consumer Group   消费者组是 逻辑上的一个订阅者 ,由多个

    2024年02月01日
    浏览(61)
  • kafka查询offset&生产者offset计算&消费offset计算

    1、简介 ​ kafka的介绍:略…(有兴趣的同学可自行Google,这与本文无关 ^ _ ^) 2、需求背景 ​ 对kafka做监控,需要获取到kafka接收到消息的offset和被消费者消费掉消息的offset,编写接口将数值交给prometheus,直接观察判断kafka的消费性能如何。(如何自定义prometheus的监控指标后续

    2023年04月25日
    浏览(84)
  • 【Kafka】【十九】新消费组的消费offset规则

    新消费组中的消费者在启动以后,默认会从当前分区的最后⼀条消息的offset+1开始消费(消费新消息)。可以通过以下的设置,让新的消费者第⼀次从头开始消费。之后开始消费新消息(最后消费的位置的偏移量+1) latest:默认的,消费新消息 earliest:第⼀次从头开始消费。

    2024年02月08日
    浏览(34)
  • kafka消费者api和分区分配和offset消费

    消费者的消费方式为主动从broker拉取消息,由于消费者的消费速度不同,由broker决定消息发送速度难以适应所有消费者的能力 拉取数据的问题在于,消费者可能会获得空数据 Consumer Group(CG):消费者组 由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同

    2024年02月16日
    浏览(50)
  • kafka实战-消费者offset重置问题

    背景:当app启动时,会调用 “启动上报接口” 上报启动数据,该数据包含且不限于手机型号、应用版本、app类型、启动时间等,一站式接入平台系统会记录该数据。 生产者:“启动上报接口”会根据启动数据发送一条kafka消息,topic“xxx” 消费者:“启动处理模块”会监控

    2023年04月11日
    浏览(89)
  • Kafka学习---4、消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)

    1.1 Kafka消费方式 1、pull(拉)模式:consumer采用从broker中主动拉取数据。 2、push(推)模式:Kafka没有采用这种方式。因为broker决定消息发生速率,很难适应所有消费者的消费速率。例如推送的速度是50M/s,Consumer1、Consumer2就来不及处理消息。 pull模式不足之处是如果Kafka没有数

    2024年02月16日
    浏览(46)
  • 全网最详细地理解Kafka中的Topic和Partition以及关于kafka的消息分发、服务端如何消费指定分区、kafka的分区分配策略(range策略和RoundRobin策略)

    最近在学习kafka相关的知识,特将学习成功记录成文章,以供大家共同学习。 首先要注意的是, Kafka 中的 Topic 和 ActiveMQ 中的 Topic 是不一样的。 在 Kafka 中, Topic 是一个存储消息的逻辑概念,可以认为是一个消息集合。每条消息发送到 Kafka 集群的消息都有一个类别。 物理上

    2024年01月25日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包