kafka重置偏移量

这篇具有很好参考价值的文章主要介绍了kafka重置偏移量。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景

某些时候,kafka上游生产者生产的消息有错误,或者下游消费者并不需要消费某部分的数据,这时候,通常有两个解决方案,一种是对数据做不解析处理,直接略过。另一种就是暂时关掉kafka的消费者组,等到生产者正常后再进行消费,但由于kafka本身是默认断点续传的,此时就需要我们先重置kafka中当前kafka组的offset。

解决方案

更改消费者组

由于kafka对某topic中offset的管理是以组的形式来进行的,因此,在新建或更改消费者组后,对于offset的管理也会重新开始,策略取决于配置的auto.offset.reset参数

在重启动时指定起始offset

在再次启动时,通过配置指定要消费topic中分区的offset
@KafkaListener(groupId = "topic_group_test",topicPartitions = { @TopicPartition(topic = "topic_test",partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "9830")) })
java springboot版本

通过kafka服务端脚本指定重置

kafka-consumer-groups.sh --bootstrap-server 10.202.13.27:9092 \ --group cjw --reset-offsets --topic cjw-test --to-earliest --execute
具体支持8种操作
--to-earliest
--to-latest
--to-current
--to-offset
--shift-by N: 把位移调整到当前位移+N 处,N可以为负数
--to-datetime : 把位移调整到大于给定时间的最早位移处,datetime格式yyyy-MM-ddTHH:mm:ss.xxx
--by-duration:把位移调整到距离当前时间指定间隔的位移处,格式为PnDTnHnMnS
--from-file:从CSV文件中读取调整策略

通过API代码来指定

consumer.seekToEnd( consumer.partitionsFor(topic).stream().map(partitionInfo-> new TopicPartition(topic,partitionInfo.partition())) .collect(Collectors.toList()) );
void seek(TopicPartition partition, long offset);
Void seek(TopicPartition partition,OffsetAndMetadata offsetAndMetadata);
Void seekToBeginning(Collection partitions)
Void seekToEnd(Collection partitions)

注意

以上所有操作都需要在消费者组处于未激活的情况下进行
使用代码方式时,需要指定所有分区的消费策略文章来源地址https://www.toymoban.com/news/detail-842474.html

到了这里,关于kafka重置偏移量的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka—offset偏移量

    offset定义 :消费者再消费的过程中通过offset来记录消费数据的具体位置 offset存放的位置 :从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic(系统主题)中,名为__consumer_offsets,即offset维护在系统主题中 说明:__consumer_offsets 主题里面采用 key 和 value 的方式存储数

    2024年02月05日
    浏览(77)
  • Kafka消费者提交偏移量

    在Kafka中,偏移量(offset)是一个与分区相关的概念,用于跟踪一个消费者在分区中已经处理的消息位置。每个分区都有自己的偏移量,用于记录已经传递给消费者的消息的位置。 每个分区都有一个偏移量: Kafka中的每个分区都会维护一个偏移量,表示消费者在该分区中的消

    2024年01月24日
    浏览(35)
  • kafka 消息日志原理 & 指定偏移量消费 & 指定时间戳消费

    Apache Kafka日志存储在物理磁盘上各种数据的集合,日志按照topic分区进行文件组织,每一个分区日志由一个或者多个文件组成。生产者发送的消息被顺序追加到日志文件的末尾。 如上图所述,Kafka主题被划分为3个分区。在Kafka中,分区是一个逻辑工作单元,其中记录被顺序附

    2024年02月15日
    浏览(46)
  • kafka:消费者从指定时间的偏移开始消费(二)

    我的前一篇博客《kafka:AdminClient获取指定主题的所有消费者的消费偏移(一)》为了忽略忽略掉上线之前的所有消息,从获取指定主题的所有消费者的消费偏移并计算出最大偏移来解决此问题。 但这个方案需要使用不常用的AdminClient类,而且如果该主题如果是第一次被消费者拉取

    2024年02月15日
    浏览(41)
  • flink如何初始化kafka数据源的消费偏移

    我们知道在日常非flink场景中消费kafka主题时,我们只要指定了消费者组,下次程序重新消费时是可以从上次消费停止时的消费偏移开始继续消费的,这得益于kafka的_offset_主题保存的关于消费者组和topic偏移位置的具体偏移信息,那么flink应用中重启flink应用时,flink是从topic的什

    2024年02月16日
    浏览(48)
  • 为什么kafka 需要 subscribe 的 group.id?我们是否需要使用 commitSync 手动提交偏移量?

    消费概念: Kafka 使用消费者组的概念来实现主题的并行消费 - 每条消息都将在每个消费者组中传递一次,无论该组中实际有多少个消费者。所以 group 参数是强制性的,如果没有组,Kafka 将不知道如何对待订阅同一主题的其他消费者。 偏移量 : 每当我们启动一个消费者时,

    2024年02月12日
    浏览(57)
  • kafka实战-消费者offset重置问题

    背景:当app启动时,会调用 “启动上报接口” 上报启动数据,该数据包含且不限于手机型号、应用版本、app类型、启动时间等,一站式接入平台系统会记录该数据。 生产者:“启动上报接口”会根据启动数据发送一条kafka消息,topic“xxx” 消费者:“启动处理模块”会监控

    2023年04月11日
    浏览(89)
  • 详解kafka中的消息日志文件:Topic消息分类、partition分区、segment分段、offset偏移量索引文件

    Kafka是一种高吞吐量的基于zookeeper协调的以集群的方式运行的分布式发布订阅消息系统,支持分区(partition)、多副本(replica),具有非常好的负载均衡能力和处理性能、容错能力。Kafka采用发布/订阅模型,消息生产者将消息发送到Kafka的消息中心(broker)中,然后消费者从

    2024年02月03日
    浏览(89)
  • android Activity设置背景为半透明的时候会显示上一个activity的内容

    在弹出 PopupWindow 时将当前Activity设置成了半透明:

    2024年02月15日
    浏览(54)
  • kafka 基础概念、命令行操作(查看所有topic、创建topic、删除topic、查看某个Topic的详情、修改分区数、发送消息、消费消息、 查看消费者组 、更新消费者的偏移位置)

    kafka官网 Broker   一台kafka服务器就是一个broker,可容纳多个topic。一个集群由多个broker组成; Producer   生产者,即向kafka的broker-list发送消息的客户端; Consumer   消费者,即向kafka的broker-list订阅消息的客户端; Consumer Group   消费者组是 逻辑上的一个订阅者 ,由多个

    2024年02月01日
    浏览(61)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包