Kafka根据时间戳消费数据并返回最近的十条

这篇具有很好参考价值的文章主要介绍了Kafka根据时间戳消费数据并返回最近的十条。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

需求:

根据时间戳消费kafka topic数据并返回最近十条

思路:

通过时间戳获取各个分区的起始offset,每个分区都取10条,最后将所有分区的十条数据进行升序排序后,返回前十条

踩坑:

第一次写的时候,调用consumer.assign时,传参是所有的分区,consumer.seek也是设置的所有分区的起始offset,导致poll的时候,一直在拉一个分区的数据

解决:

在同事的帮助下,每assign seek poll完一个分区需要的数据再继续下一个文章来源地址https://www.toymoban.com/news/detail-508140.html

代码:

        public List<String> getTopicRecordWithTimestamp(String topic, Long startTimestamp, Integer size) {
        long t0 = System.currentTimeMillis();
        if (size == 0) {
            size = 20;
        }
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, url);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "preview_topic_with_timestamp");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, STR_STRING_DESERIALIZER);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, STR_STRING_DESERIALIZER);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, size);
        List<String> kafkaRecords = Lists.newArrayList();
        long t1 = System.currentTimeMillis();
        try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            long t2 = System.currentTimeMillis();
            // 设置各个分区起始消费offset
            List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
            for(PartitionInfo partitionInfo : partitionInfoList) {
                TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                // assign partition
                consumer.assign(Collections.singleton(topicPartition));
                long offsetForTimestamp;
                Map<TopicPartition, Long> timestampToSearch = Collections.singletonMap(topicPartition, startTimestamp);
                Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = consumer.offsetsForTimes(timestampToSearch);
                OffsetAndTimestamp result = offsetAndTimestamp.get(topicPartition);
                if (result == null) {
                    offsetForTimestamp = consumer.endOffsets(Collections.singleton(topicPartition)).get(topicPartition);
                } else {
                    offsetForTimestamp =  result.offset();
                }
                consumer.seek(topicPartition, offsetForTimestamp);
                // 若2s内无20条数据即可以返回
                long start = System.currentTimeMillis();
                long waitTime = 0L;
                long allRecordsCount = 0L;
                while (waitTime < Constant.NUM_3000 && allRecordsCount < size) {
                    waitTime = System.currentTimeMillis() - start;
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        int recordPartition = record.partition();
                        if (allRecordsCount < size) {
                            allRecordsCount ++;
                            kafkaRecords.add(record.value());
                        }else {
                            break;
                        }
                    }
                }
            }
            long t3 = System.currentTimeMillis();

            //按时间戳升序返回
            Collections.sort(kafkaRecords, new Comparator<KafkaRecordInfo>() {
                @Override
                public int compare(KafkaRecordInfo o1, KafkaRecordInfo o2) {
                    return (int)(o1.getTimestamp() - o2.getTimestamp());
                }
            });
            if (kafkaRecords.size() > size) {
                kafkaRecords = kafkaRecords.subList(0, size);
            }
            long t4 = System.currentTimeMillis();
            log.info("total = {}, prepare = {}, create = {}, seek and poll = {}, sort = {}", t4 - t0, t1 - t0, t2 - t1,
                    t3 - t2, t4 - t3);
        } catch (Exception e) {
            log.warn("read message from kafka topic [{}] failed", topic, e);
        }
        return kafkaRecords;
    }

到了这里,关于Kafka根据时间戳消费数据并返回最近的十条的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka 消息日志原理 & 指定偏移量消费 & 指定时间戳消费

    Apache Kafka日志存储在物理磁盘上各种数据的集合,日志按照topic分区进行文件组织,每一个分区日志由一个或者多个文件组成。生产者发送的消息被顺序追加到日志文件的末尾。 如上图所述,Kafka主题被划分为3个分区。在Kafka中,分区是一个逻辑工作单元,其中记录被顺序附

    2024年02月15日
    浏览(46)
  • kafka:消费者从指定时间的偏移开始消费(二)

    我的前一篇博客《kafka:AdminClient获取指定主题的所有消费者的消费偏移(一)》为了忽略忽略掉上线之前的所有消息,从获取指定主题的所有消费者的消费偏移并计算出最大偏移来解决此问题。 但这个方案需要使用不常用的AdminClient类,而且如果该主题如果是第一次被消费者拉取

    2024年02月15日
    浏览(41)
  • 【Flink-Kafka-To-Mongo】使用 Flink 实现 Kafka 数据写入 Mongo(根据对应操作类型进行增、删、改操作,写入时对时间类型字段进行单独处理)

    需求描述: 1、数据从 Kafka 写入 Mongo。 2、相关配置存放于 Mysql 中,通过 Mysql 进行动态读取。 3、此案例中的 Kafka 是进行了 Kerberos 安全认证的,如果不需要自行修改。 4、Kafka 数据为 Json 格式,获取到的数据根据操作类型字段进行增删改操作。 5、读取时使用自定义 Source,写

    2024年02月22日
    浏览(52)
  • Kafka消费者常用超时时间配置

    https://blog.csdn.net/BHSZZY/article/details/126757295 //心跳超时时间(session超时时间)增加成25秒(之前项目设置了15秒) spring.kafka.properties.session.timeout.ms = 25000 //每次拉取的消息减少为20(之前是默认值500) spring.kafka.consumer.max-poll-records=20 //消息消费超时时间增加为10分钟 spring.kafka.p

    2024年02月03日
    浏览(93)
  • 使用时间戳来消费消息(kafka)

    每条消息都有一个与之相关的 时间戳(timestamp ),可以使用这个时间戳来筛选或消费特定时间范围内的消息。 timestamp() 方法获取消息的时间戳,并检查它是否在指定的时间范围内。 请注意,时间戳是以毫秒为单位的UNIX时间戳。需要根据需要调整 start_timestamp 和 end_timestamp

    2024年01月24日
    浏览(45)
  • kafka消费数据,有时消费不到原因?

    Kafka消费数据时有时消费不到的原因可能包括以下几点: 1:配置问题:首先需要检查Kafka的配置是否正确,比如是否设置了group.id ,对应的topic是否正确等。如果消费者尝试消费不存在的主题,则会发生错误。 2:消费者群组配置错误:如果消费者所属的消费群组配置错误,也

    2024年04月23日
    浏览(27)
  • Kafka消费者不消费数据

    背景: 工作往往是千篇一律,真正能学到点知识都是在上线后。使用Skywalking+Kafka+ES进行应用监控。 现象: 公司使用Skywalking在开发测试环境中Kafka顺利消费数据,到了UAT环境一开始还正常,后面接入了更多的应用后出现了问题:OAP服务正常但是ES里不再有数据。 排查: 通过

    2023年04月14日
    浏览(46)
  • Kafka消费者无法消费数据,解决

    作为一个在项目中边学边用的实习生,真的被昨天还好好的今天就不能消费数据的kafka折磨到了,下面提供一点建议,希望能对大家有所帮助。 //操作前集群都关了 1.首先去kafka-home的config目录下找到server.properties文件, 加入advertised.listeners=PLAINTEXT://ip:9092    如果有配置liste

    2024年02月17日
    浏览(52)
  • java根据前端所要格式返回树形3级层级数据

    一、业务分析,根据前端需求返回如下数据格式   二、后端设计数据类型VO 三、代码实现 1.编写Controller 2.编写Service 3、结果展示            

    2024年02月19日
    浏览(43)
  • Kafka消费不到数据

    1.首先停止kafka: kafka.sh stop 2.查看 /config/server. properties看logdir的目录,换个目录或者删除目录中的数据,本机是 rm -rf datas/*   ( 注 :kafka报错日志可以在 logs里 server.log查看) 两种方法: 方法1:登录bin/zkCli.sh 然后删除kafka中/brokers/topics/__consumer_offsets 节点 方法2:停止zookeepe

    2024年02月16日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包