Kafka:指定时间消费

这篇具有很好参考价值的文章主要介绍了Kafka:指定时间消费。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

需求:

在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?文章来源地址https://www.toymoban.com/news/detail-503934.html

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

public class CustomConsumerSeekTime {

    public static void main(String[] args) {

        // 0 配置信息
        Properties properties = new Properties();

        // 连接
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.25.129:9092,192.168.25.129:9092");

        // 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");

        // 1 创建消费者
        // KafkaConsumer<K, V> 
        // 由于消息形式是 key value 为 "", "hello"
        // 所以泛型K为key为String类型 泛型V为传递消息的类型,此处发送字符串用String类型
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // 2 订阅主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("test");
        kafkaConsumer.subscribe(topics);

        // ===================================此处==============================================

        // 指定位置进行消费
        Set<TopicPartition> assignment = kafkaConsumer.assignment();

        //  保证分区分配方案已经制定完毕
        // 因为消费者初始化与broker通信进行分区分配需要一定的时间
        while (assignment.size() == 0){
            kafkaConsumer.poll(Duration.ofSeconds(1));

            assignment = kafkaConsumer.assignment();
        }

        // 希望把时间转换为对应的offset
        HashMap<TopicPartition, Long> topicPartitionLongHashMap = new HashMap<>();

        // 封装对应集合
        for (TopicPartition topicPartition : assignment) {
            // 每个分区对应一天前的数据,从此刻一天前
            topicPartitionLongHashMap.put(topicPartition,System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
        }

        Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(topicPartitionLongHashMap);

        // 遍历每个分区,对每个分区设置消费时间。指定消费的offset
        for (TopicPartition topicPartition : assignment) {

            OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);
            
            if (offsetAndTimestamp != null) {
                // 将时间转成offset去消费
                kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
            }

        }

        // ==============================止此======================================

        // 3  消费数据
        while (true){

            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {

                System.out.println(consumerRecord);
            }
        }
    }
}

到了这里,关于Kafka:指定时间消费的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 在Windows上搭建Kafka环境的步骤,包括安装Java、下载Kafka、配置Zookeeper和Kafka、启动Zookeeper和Kafka、创建主题和生产者/消费者等

    1. 安装Java Kafka需要Java环境支持。可以从Oracle官网下载JDK,或者使用OpenJDK。 2. 下载Kafka 可以从Kafka官网下载Kafka二进制压缩包。解压后可以看到bin、config、libs等目录。 3. 配置Zookeeper Kafka依赖Zookeeper实现分布式协作。可以使用Kafka自带的Zookeeper,也可以独立安装Zookeeper。 如果使

    2024年02月11日
    浏览(45)
  • 高效协作处理缓存清理需求:生产者-消费者模式助力多模块缓存管理

    在现代应用系统中,缓存是提高性能和减少数据库负载的重要手段之一。然而,缓存的数据在某些情况下可能会过期或者变得无效,因此需要及时进行清理。在复杂的应用系统中,可能有多个系统、多个模块产生缓存清理需求,而这些系统、模块之间的清理任务需要高效的协

    2024年02月15日
    浏览(45)
  • Kafka生产消费流程

    准备工作 创建maven工程,引入依赖 消费者 1.1 发送并忘记 忽略send方法的返回值,不做任何处理。大多数情况下,消息会正常到达,而且生产者会自动重试,但有时会丢失消息。 消费者 测试结果 1.2同步发送 测试结果 1.3 异步发送 测试结果 Kafka里消费者从属于消费者群组,一

    2024年01月16日
    浏览(36)
  • Kafka生产与消费示例

    Kafka是一款消息中间件,消息中间件本质就是收消息与发消息,所以这节课我们会从一条消息开始生产出发,去了解生产端的运行流程,然后简单的了解一下broker的存储流程,最后这条消息是如何被消费者消费掉的。其中最核心的有以下内容。 1、Kafka客户端是如何去设计一个

    2024年02月09日
    浏览(53)
  • 二、Kafka生产与消费全流程

    Kafka是一款消息中间件,消息中间件本质就是收消息与发消息,所以这节课我们会从一条消息开始生产出发,去了解生产端的运行流程,然后简单的了解一下broker的存储流程,最后这条消息是如何被消费者消费掉的。其中最核心的有以下内容。 1、Kafka客户端是如何去设计一个

    2024年02月09日
    浏览(52)
  • Java整合Kafka实现生产及消费

    搭建Kafka环境,参考Kafka集群环境搭建及使用 Java环境:JDK1.8 Maven版本:apache-maven-3.6.3 开发工具:IntelliJ IDEA 创建maven项目。 pom.xml文件中引入kafka依赖。 创建topic命名为testtopic并指定2个分区。 更多配置信息查看ProducerConfig类 创建分区策略类,实现org.apache.kafka.clients.producer.Par

    2024年02月04日
    浏览(49)
  • kafka生产者消费者练习

    需求:写一个生产者,不断的去生产用户行为数据,写入到kafka的一个topic中 生产的数据格式: 造数据 {“guid”:1,“eventId”:“pageview”,“timestamp”:1637868346789} isNew = 1 {“guid”:1,“eventId”:“addcard”,“timestamp”:1637868347625} isNew = 0 {“guid”:2,“eventId”:“collect”,“timestamp”

    2024年02月08日
    浏览(46)
  • Kafka系列之:Kafka生产者和消费者

    batch.size:只有数据积累到batch.size之后,sender才会发送数据,默认16K。 linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。 0:生产者发送过来的数据,不需要等数据罗盘应答。 1:生产者发送过来的

    2023年04月09日
    浏览(46)
  • Spring Boot+Kafka实战生产级Kafka消费组

    作者:禅与计算机程序设计艺术 Kafka是一个开源分布式消息系统,最初由LinkedIn开发,之后成为Apache项目的一部分。Kafka主要用于大数据实时流处理,具有低延迟、高吞吐量等特点。本文将会从基本概念、术语说明、原理及应用场景三个方面对Kafka进行详细介绍。 Kafka作为一个

    2024年02月10日
    浏览(35)
  • SpringBoot整合Kafka简单配置实现生产消费

    *本文基于SpringBoot整合Kafka,通过简单配置实现生产及消费,包括生产消费的配置说明、消费者偏移设置方式等。更多功能细节可参考 spring kafka 文档:https://docs.spring.io/spring-kafka/docs/current/reference/html 搭建Kafka环境,参考Kafka集群环境搭建及使用 Java环境:JDK1.8 Maven版本:apach

    2024年02月16日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包