kafka复习:(11)auto.offset.reset的默认值

这篇具有很好参考价值的文章主要介绍了kafka复习:(11)auto.offset.reset的默认值。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

在ConsumerConfig这个类中定义了这个属性的默认值,如下图
kafka复习:(11)auto.offset.reset的默认值,kafka,kafka,分布式
也就是默认值为latest,它的含义是:如果没有客户端提交过offset的话,当新的客户端消费时,把最新的offset设置为当前消费的offset.

默认是自动提交位移的,每5秒进行一次提交。可以通过参数配置手动提交。

手动提交offset的示例文章来源地址https://www.toymoban.com/news/detail-670042.html


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.time.temporal.TemporalUnit;
import java.util.*;
import java.util.concurrent.TimeUnit;
/*
设置手动提交offset
 */

public class KafkaTest08 {

    private static Properties getProperties(){
        Properties properties=new Properties();

        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xx.xx.xx.xx:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"testGroup8");
        //设置手动提交位移
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        return properties;
    }
    public static void main(String[] args) {

        KafkaConsumer<String,String> myConsumer=new KafkaConsumer<String, String>(getProperties());
        myConsumer.subscribe(Arrays.asList("student"));

        int i=0;


        while(true){
            ConsumerRecords<String,String> consumerRecords=myConsumer.poll(Duration.ofMillis(5000));
            for(ConsumerRecord record: consumerRecords){
                System.out.println(record.value());
                System.out.println("record offset is: "+record.offset());

            }

            myConsumer.commitSync();
            
            if(i==0){
                //myConsumer.commitSync();
                i ++;
            }
            else {
                i ++;
            }

        }



    }
}

到了这里,关于kafka复习:(11)auto.offset.reset的默认值的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • reset Offset 与connection reset by peer

    某次生产线上,从KafkaManager监控页面,发现还剩几十万未消费数据量,过了几分钟之后,监控页面发现未消费数据量达到了几千万。 定位生产日志,发现消费端 出现日志 reset offset , 结合上下文日志,发现在切换broker leader之后,提交的偏移量在新leader上面找不到,之后根据消

    2024年02月16日
    浏览(44)
  • 分布式计算----期末复习题(仅供参考)

    一.单选题,每个2分 1.Hadoop 之父 是下面的哪一位?(B) A. James Gosling        B.Doug Cutting    C.Matei Zaharia   D.Linus Benedict Torvalds 2.Hadoop中,用于 处理或者分析海量数据 的组件是哪一个?(  B   ) A.HDFS     B.MapReduce     C.Yarn   D.以上选项都不是 3.HDFS中 存储和管理元数据

    2024年02月10日
    浏览(49)
  • hadoop分布式系统复习题 选择题

    1、以下哪一项不属于 Hadoop 可以运行的模式? 互联模式 2、下面哪个程序负责 HDFS 数据存储? Datanode 3、HDFS 中的 block 默认保存__3 _份。 4、配置Hadoop时,JAVA_HOME包含在哪一个配置文件中 hadoop-env.sh 。 5、 Hadoop fs中的-get和-put命令操作对象是 文件和目录。 6、(多选)以下关于

    2024年02月17日
    浏览(42)
  • 分布式系统与云计算期末复习(选择题)

    1 、下列哪项描述不是分布式系统的特性( C ) A、透明性 B、开放性 C、易用性 D、可扩展性 2. 下列描述正确的是 ( A ) A、基于中间件的系统要比网络操作系统的透明性高 B、网络操作系统要比分布式操作系统的透明性高 C、基于中间件的系统要比分布式操作系统的透明性高

    2024年02月09日
    浏览(42)
  • 【分布式与云计算期末复习】比斯兔考试版

    仅自己的期末复习笔记,有些零散,但大部分包括了往年例题的考点,来自于书本和其他博客。觉得我写的乱的也可以根据这些考点自己去百度和csdn其他大佬的博客自己填充~ 目录 故障 故障检测 故障屏蔽 故障解决方法  心跳检测 Lease租约机制 数据分布方式 副本 数据副本

    2024年02月05日
    浏览(44)
  • 【大数据工具】Kafka伪分布式、分布式安装和Kafka-manager工具安装与使用

    Kafka 安装包下载地址:https://archive.apache.org/dist/kafka/ 1. Kafka 伪分布式安装 1. 上传并解压 Kafka 安装包 使用 FileZilla 或其他文件传输工具上传 Kafka 安装包: kafka_2.11-0.10.0.0.tgz 解压安装包 2. 编辑配置文件 3. 拷贝并修改配置文件 分别修改 server2.properties、server3.properties 4. 创建日志

    2024年02月14日
    浏览(45)
  • 分布式消息服务kafka

    什么是消息中间件? 消息中间件是分布式系统中重要的组件,本质就是一个具有接收消息、存储消息、分发消息的队列,应用程序通过读写队列消息来通信。 例如:在淘宝购物时,订单系统处理完订单后,把订单消息发送到消息中间件中,由消息中间件将订单消息分发到下

    2024年02月01日
    浏览(43)
  • 【分布式技术】消息队列Kafka

    目录 一、Kafka概述 二、消息队列Kafka的好处 三、消息队列Kafka的两种模式 四、Kafka 1、Kafka 定义 2、Kafka 简介 3、Kafka 的特性 五、Kafka的系统架构 六、实操部署Kafka集群  步骤一:在每一个zookeeper节点上完成kafka部署 ​编辑 步骤二:传给其他节点 步骤三:启动3个节点 kafka管理

    2024年01月23日
    浏览(51)
  • 【分布式应用】kafka集群、Filebeat+Kafka+ELK搭建

    主要原因是由于在高并发环境下,同步请求来不及处理,请求往往会发生阻塞。比如大量的请求并发访问数据库,导致行锁表锁,最后请求线程会堆积过多,从而触发 too many connection 错误,引发雪崩效应。 我们使用消息队列,通过异步处理请求,从而缓解系统的压力。消息队

    2024年02月16日
    浏览(45)
  • 分布式 - 消息队列Kafka:Kafka 消费者的消费位移

    01. Kafka 分区位移 对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。偏移量从0开始,每个新消息的偏移量比前一个消息的偏移量大1。 每条消息在分区中的位置信息由一个叫位移(Offset)的数据来表征。分区位移总是从 0 开始,假设一

    2024年02月12日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包