kafka—offset偏移量

这篇具有很好参考价值的文章主要介绍了kafka—offset偏移量。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、offset的基本概述

offset定义:消费者再消费的过程中通过offset来记录消费数据的具体位置

offset存放的位置:从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic(系统主题)中,名为__consumer_offsets,即offset维护在系统主题中

说明:__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact(压缩存储),也就是每个 group.id+topic+分区号就保留最新数据

1.面试题☆☆☆

问:消费者的offset维护在什么位置

答:在0.9版本之前维护在zookeeper当中,0.9版本之后维护在系统主题当中

二、自动提交offset

为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能

自动提交offset的相关参数如下:

  • enable.auto.commit:是否开启自动提交offset功能,默认是true
  • auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s
package com.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

/**
 * @author wangbo
 * @version 1.0
 */

/**
 * 自动提交offset
 */

public class CustomConsumer_03 {
    public static void main(String[] args) {
        //配置
        Properties properties = new Properties();

        //连接集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092");    //多写一个,避免其中一台挂掉,保证数据的可靠性

        //反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //配置消费者组ID 可以任意起
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

        //自动提交,默认为true采用自动提交,为false则为手动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);

        //提交时间间隔,默认为5000毫秒,即5s。我们修改为2秒
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,2000);

        //1.创建一个消费者 "","hello"
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        //2.订阅主题 first3
        ArrayList<String> topics = new ArrayList<String>();
        topics.add("first3");
        kafkaConsumer.subscribe(topics);

        //3.消费数据
        while (true){
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//每1秒拉取一批数据

            //循环打印消费的数据 consumerRecords.for
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

三、手动提交offset

虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因 此Kafka还提供了手动提交offset的API

手动提交offset的方法有两种:分别是commitSync同步提交)和commitAsync异步提交

  • 相同点:都会将本次提交的一批数据最高的偏移量提交
  • 不同点:同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败

比较

同步提交:必须等待offset提交完毕,再去消费下一批数据 ,由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低

异步提交:发送完提交offset请求后,就开始消费下一批数据了,由于同步提交 吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式

同步提交和异步提交的API代码

package com.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

/**
 * @author wangbo
 * @version 1.0
 */

/**
 * offset 同步提交
 */

public class CustomConsumer_04 {
    public static void main(String[] args) {
        //配置
        Properties properties = new Properties();

        //连接集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092");    //多写一个,避免其中一台挂掉,保证数据的可靠性

        //反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //配置消费者组ID 可以任意起
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

        //手动提交 需要将参数改为false
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);


        //1.创建一个消费者 "","hello"
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        //2.订阅主题 first3
        ArrayList<String> topics = new ArrayList<String>();
        topics.add("first3");
        kafkaConsumer.subscribe(topics);

        //3.消费数据
        while (true){
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//每1秒拉取一批数据

            //循环打印消费的数据 consumerRecords.for
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }

            //手动提交offset
            kafkaConsumer.commitSync();     //同步提交
//            kafkaConsumer.commitAsync();    //异步提交

        }
    }
}

四、指定offset位置消费

问题引入:当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?

可以通过设置offset的消费位置,进行开始消费

auto.offset.reset = earliest | latest | none 默认是 latest

  • earliest:自动将偏移量重置为最早的偏移量
  • latest(默认值):自动将偏移量重置为最新偏移量
  • none:如果未找到消费者组的先前偏移量,则向消费者抛出异常

API中通过下面参数进行配置

properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

还有一种是在任意指定 offset 位移开始消费

package com.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
import java.util.Set;

/**
 * @author wangbo
 * @version 1.0
 */

/**
 * 1. 指定offset位置进行消费
 */

public class CustomConsumer_05 {
    public static void main(String[] args) {
        //配置
        Properties properties = new Properties();

        //连接集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092");    //多写一个,避免其中一台挂掉,保证数据的可靠性

        //反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //配置消费者组ID 可以任意起
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

        //1.创建一个消费者 "","hello"
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        //2.订阅主题 first3
        ArrayList<String> topics = new ArrayList<String>();
        topics.add("first3");
        kafkaConsumer.subscribe(topics);

        //指定位置进行消费
        //获取分区信息,返回一个分区集合
        Set<TopicPartition> assignment = kafkaConsumer.assignment();

        //保证分区的分配方案指定完毕
        while (assignment.size() == 0){ //说明没有分区分配方案
            kafkaConsumer.poll(Duration.ofSeconds(1));  //通过拉去数据,来获取分区分配方案

            //获取分区信息,返回一个分区集合,相当于更新一下
            assignment = kafkaConsumer.assignment();
        }

        //遍历分区集合,拿到所有的分区信息,指定消费的offset
        for (TopicPartition topicPartition : assignment) {
            kafkaConsumer.seek(topicPartition,100); //指定offset为100,从100的位置进行消费数据
        }


        //3.消费数据
        while (true){
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//每1秒拉取一批数据

            //循环打印消费的数据 consumerRecords.for
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

五、指定时间消费

需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?

package com.kafka.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

/**
 * @author wangbo
 * @version 1.0
 */

/**
 * 1. 指定时间消费,把时间转换为对应的offset
 */

public class CustomConsumer_06 {
    public static void main(String[] args) {
        //配置
        Properties properties = new Properties();

        //连接集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092");    //多写一个,避免其中一台挂掉,保证数据的可靠性

        //反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //配置消费者组ID 可以任意起
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

        //1.创建一个消费者 "","hello"
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        //2.订阅主题 first3
        ArrayList<String> topics = new ArrayList<String>();
        topics.add("first3");
        kafkaConsumer.subscribe(topics);

        //==============================================================================================================
        //指定位置进行消费
        //获取分区信息,返回一个分区集合
        Set<TopicPartition> assignment = kafkaConsumer.assignment();

        //保证分区的分配方案指定完毕
        while (assignment.size() == 0){ //说明没有分区分配方案
            kafkaConsumer.poll(Duration.ofSeconds(1));  //通过拉去数据,来获取分区分配方案

            //获取分区信息,返回一个分区集合,相当于更新一下
            assignment = kafkaConsumer.assignment();
        }

        //--------------------------------------------------------------------------------------------------------------

        //把时间转换为对应的offset
        //Key:TopicPartition主题分区  value:对应的时间
        HashMap<TopicPartition, Long> topicPartitionLongHashMap = new HashMap<>();

        //封装对应的集合,对集合中添加数据
        for (TopicPartition topicPartition : assignment) {
            topicPartitionLongHashMap.put(topicPartition,System.currentTimeMillis() - 1*24*3600*1000);  //当前时间-1天的时间 = 一天前的时间
        }

        //需要传入一个Map集合
        Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(topicPartitionLongHashMap);

        //--------------------------------------------------------------------------------------------------------------

        //遍历分区集合,拿到所有的分区信息,指定消费的offset
        for (TopicPartition topicPartition : assignment) {
            OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);//获取集合中分区对应的value值,下面通过offset()方法进行转换

            kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset()); //指定offset为100,从100的位置进行消费数据
        }

        //==============================================================================================================

        //3.消费数据
        while (true){
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//每1秒拉取一批数据

            //循环打印消费的数据 consumerRecords.for
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

六、消费者事物

问题引入:

  1. 重复消费:已经消费了数据,但是 offset 没提交
  2. 漏消费:先提交 offset 后消费,有可能会造成数据的漏消费

为了解决以上问题,保证数据的精确一次性消费,需要使用消费者事物的方式进行处理

如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定,跟生产者事物类似 p57文章来源地址https://www.toymoban.com/news/detail-745182.html

七、数据积压(提高吞吐量)

  1. 如果是Kafka消费能力不足,则可以考虑增 加Topic的分区数,并且同时提升消费组的消费者数量,消费者数 = 分区数。(两者缺一不可)
  2. 如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压

到了这里,关于kafka—offset偏移量的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka—offset偏移量

    offset定义 :消费者再消费的过程中通过offset来记录消费数据的具体位置 offset存放的位置 :从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic(系统主题)中,名为__consumer_offsets,即offset维护在系统主题中 说明:__consumer_offsets 主题里面采用 key 和 value 的方式存储数

    2024年02月05日
    浏览(58)
  • 分布式服务框架_Zookeeper--管理分布式环境中的数据

    安装和配置详解 本文介绍的 Zookeeper 是以 3.2.2 这个稳定版本为基础,最新的版本可以通过官网   http://hadoop.apache.org/zookeeper/ 来获取, Zookeeper 的安装非常简单,下面将从单机模式和集群模式两个方面介绍 Zookeeper 的安装和配置。 单机模式

    2024年02月12日
    浏览(31)
  • 分布式系统中的数据复制

    本文翻译自国外论坛 medium,原文地址:https://medium.com/@interviewready/data-replication-in-distributed-system-87f7d265ff28 数据复制是指将数据复制到一个或多个数据容器以确保可用性的过程。复制的数据通常存储在不同的数据库实例中,即使一个实例发生故障,我们也可以从其他实例获取数

    2024年02月16日
    浏览(34)
  • 分布式深度学习中的数据并行和模型并行

    🎀个人主页: https://zhangxiaoshu.blog.csdn.net 📢欢迎大家:关注🔍+点赞👍+评论📝+收藏⭐️,如有错误敬请指正! 💕未来很长,值得我们全力奔赴更美好的生活! 对于深度学习模型的预训练阶段,海量的训练数据、超大规模的模型给深度学习带来了日益严峻的挑战,因此,经

    2024年01月24日
    浏览(32)
  • 多云中的数据安全:如何保护敏感数据在分布式环境中的安全

    随着云计算技术的发展,多云已经成为企业和组织的主流选择。多云可以为企业提供更高的灵活性、可扩展性和竞争力。然而,多云环境也带来了新的挑战,尤其是在数据安全方面。在多云中,敏感数据的分布和管理变得更加复杂,数据安全的保障也更加重要。因此,保护敏

    2024年02月21日
    浏览(30)
  • 数据流处理中的分布式存储:保护数据隐私和安全

    作者:禅与计算机程序设计艺术 随着数据量的爆炸式增长,如何高效地处理和存储数据成为了当前热门的研究方向。数据流处理作为一种处理数据的方法,能够在实时性、流式性和可扩展性等方面提供优势。在数据流处理中,分布式存储是保障数据隐私和安全的重要手段。本

    2024年02月16日
    浏览(26)
  • 大数据中的分布式文件系统MapReduce的选择题

    一. 单选题(共9题,49.5分) (单选题)下列传统并行计算框架,说法错误的是哪一项? A. 刀片服务器、高速网、SAN,价格贵,扩展性差上 B. 共享式(共享内存/共享存储),容错性好 C. 编程难度高 D. 实时、细粒度计算、计算密集型 正确答案: B:共享式(共享内存/共享存储),容错性好; 5.5分

    2024年02月04日
    浏览(29)
  • 分布式天梯图算法在 Redis 图数据库中的应用

    Redis是一个高性能的键值对数据库,支持常用的数据结构和分布式操作,被广泛应用于缓存、消息队列和排行榜等场景。除了基本的数据结构,Redis还支持图数据结构并提供了一些算法支持。 天梯图算法是一种基于贪心的图搜索算法,在寻找最短路径问题中具有很高的效率。

    2024年02月14日
    浏览(25)
  • 分布式消息服务kafka

    什么是消息中间件? 消息中间件是分布式系统中重要的组件,本质就是一个具有接收消息、存储消息、分发消息的队列,应用程序通过读写队列消息来通信。 例如:在淘宝购物时,订单系统处理完订单后,把订单消息发送到消息中间件中,由消息中间件将订单消息分发到下

    2024年02月01日
    浏览(35)
  • 分布式计算中的大数据处理:Hadoop与Spark的性能优化

    大数据处理是现代计算机科学的一个重要领域,它涉及到处理海量数据的技术和方法。随着互联网的发展,数据的规模不断增长,传统的计算方法已经无法满足需求。因此,分布式计算技术逐渐成为了主流。 Hadoop和Spark是目前最为流行的分布式计算框架之一,它们都提供了高

    2024年01月23日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包