JAVA面试题分享一百六十三:Kafka如何实现延时推送?

这篇具有很好参考价值的文章主要介绍了JAVA面试题分享一百六十三:Kafka如何实现延时推送?。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、延时队列定义

二、技术实现方案

1. Redis

2. Kafka

3. RabbitMQ

4. RocketMQ

三、Kafka延时队列背景

四、Kafka延时队列实现思路

六、Kafka延时队列架构图

七、kafka延时任务代码实现

1. KafkaDelayQueue:Kafka延迟队列

2. KafkaDelayQueueFactory:Kafka延迟队列工厂

3. KafkaPollListener:Kafka延迟队列事件监听

4. KafkaDelayConfig:Kafka延时配置

八. 如何使用kafka延时队列

九、测试

十、总结


一、延时队列定义

延时队列:是一种消息队列,可以用于在指定时间或经过一定时间后执行某种操作。

二、技术实现方案

1. Redis

1.1 优点:
①Redis的延迟队列是基于Redis的sorted set实现的,性能较高。
②Redis的延迟队列可以通过TTL设置过期时间,灵活性较高。
③简单易用,适用于小型系统。
④性能较高,支持高并发。

1.2 缺点:
①可靠性相对较低,可能会丢失消息,就算redis最高级别的持久化也是有可能丢一条的,每次请求都做aof,但是aof是异步的,所以不保证这一条操作能被持久化。
②而且Redis持久化的特性也导致其在数据量较大时,存储和查询效率逐渐降低,此时会需要对其进行分片和负载均衡。
③Redis的延迟队列需要手动实现消息重试机制,更严谨的消息队列需要数据库兜底。

1.3 应用场景:
①适用于较小规模的系统,实时性要求较高的场景。
②适用于轻量级的任务调度和消息通知场景,适合短期延迟任务,不适合长期任务,例如订单超时未支付等。

2. Kafka

2.1 优点:
①Kafka的优点在于其高并发、高吞吐量和可扩展性强,同时支持分片。
②可靠性高,支持分布式和消息持久化。
③消费者可以随时回溯消费。
④支持多个消费者并行消费、消费者组等机制。

2.2 缺点:
①没有原生的延迟队列功能,需要使用topic和消费者组来实现,实现延迟队列需要额外的开发工作。
②消费者需要主动拉取数据,可能会导致延迟,精度不是特别高。
在此案例中代码已经实现了,直接拿来使用就可以了。

2.3 应用场景:
适用于大规模的数据处理,实时性要求较高的,高吞吐量的消息处理场景。

3. RabbitMQ

3.1 优点:
①RabbitMQ的延迟队列是通过RabbitMQ的插件实现的,易于部署和使用。
②RabbitMQ的延迟队列支持消息重试和消息顺序处理,可靠性较高。
③支持消息持久化和分布式。
④支持优先级队列和死信队列。
⑤提供了丰富的插件和工具。

3.2 缺点:
①RabbitMQ的延迟队列性能较低,不适用于高吞吐量的场景。
②性能较低,不适合高并发场景。
③实现延迟队列需要额外的配置,但是配置就很简单了。

3.3应用场景:
适用于中小型的任务调度和消息通知,对可靠性要求高的场景。

4. RocketMQ

4.1 优点:
①RocketMQ的延迟队列是RocketMQ原生支持的,易于使用和部署。
②RocketMQ的延迟队列支持消息重试和消息顺序处理,可靠性较高。
③高性能和高吞吐量,支持分布式和消息持久化。
④RocketMQ使用简单,性能好,并且支持延迟队列功能。

4.2 缺点:
①RocketMQ的延迟队列不支持动态添加或删除队列。
②RocketMQ的延迟队列需要保证消息的顺序,可能会导致消息延迟。
③在节点崩溃后,RocketMQ有可能发生消息丢失。

4.3 应用场景:
①适用于大规模的数据处理,对性能和吞吐量要求较高的场景。
②适合于任务量较大、需要延迟消息和定时消息的场景。例如电商平台、社交软件等。
③适用于分布式任务调度和高可靠性消息通知场景。

三、Kafka延时队列背景

  1. 基于以上四种实现延时队列的分析来,选择对应的技术方案的基础上呢,不同公司的mq的基础设施不同,如果只有Kafka,也没必要引入RabbitMQ和RocketMq来实现,引入新的组件也会顺便带来新的问题。

  2. 网上搜Kafka实现延时队列有很多文章,很多文章说使用Kafka内部的时间轮,支持延时操作,但这是Kafka自己内部使用的,时间轮只是一个工具类,用户无法将其作为延迟队列来使用。

  3. Kafka延时队列的最佳实践,使用Kafka消费者的暂停和恢复机制来实现

四、Kafka延时队列实现思路

  1. 解决一个问题前首先要明确问题,如何让Kafka有延时队列的功能呢?
  2. 就是在Kafka消费者消费的时候延时消费,不久搞定了嘛
  3. 那如何延时消费呢,网上有些文章使用Thread.sleep进行延时消费这是不靠谱的(亲身实践),sleep的时间超过了Kafka配置的max.poll.records时间,消费者无法及时提交offset,kafka就会认为这个消费者已经挂了,会进行rebalance也就是重新分配分区给消费者,以保证每个分区只被一个消费者消费
  4. 也有同学说了,为了不发生rebalance,那可以增加max.poll.records时间啊,但是这样的话,如果要sleep几天的时间,难道max.poll.records要写几天的时间嘛,有违Kafka的设计原理了,那怎么办呢?
  5. 这时候Kafka的pause暂停消费和resume恢复消费就登场了,pause暂停某个分区之后消费者不会再poll拉取该分区的消息,直到resume恢复该分区之后才会重新poll消息。
  6. 我已经做好了Kafka延时队列的封装,以后只需要一行代码就可以实现延时队列了,代码核心使用Kafka消费者的pause函数(暂停)和resume函数(恢复)+线程池+定时任务+事件监听机制+工厂模式

六、Kafka延时队列架构图

JAVA面试题分享一百六十三:Kafka如何实现延时推送?,JAVA 面试题分享,MQ(Message Queue)消息队列 内容分享,Kafka 面试题分享,java,kafka,开发语言

七、kafka延时任务代码实现

以下代码只列出了核心实现

JAVA面试题分享一百六十三:Kafka如何实现延时推送?,JAVA 面试题分享,MQ(Message Queue)消息队列 内容分享,Kafka 面试题分享,java,kafka,开发语言

1. KafkaDelayQueue:Kafka延迟队列

定义一个Kafka延期队列,包含的内容:KafkaDelayQueue,其中有延迟队列配置,主题,消费组,延迟时间,目标主题,KafkaSyncConsumer,ApplicationContext,poll线程池,delay线程池等等

package com.wdyin.kafka.delay;

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * kafka延时队列
 *
 **/
@Slf4j
@Getter
@Setter
class KafkaDelayQueue<K, V> {

    private String topic;
    private String group;
    private Integer delayTime;
    private String targetTopic;
    private KafkaDelayConfig kafkaDelayConfig;
    private KafkaSyncConsumer<K, V> kafkaSyncConsumer;
    private ApplicationContext applicationContext;
    private ThreadPoolTaskScheduler threadPoolPollTaskScheduler;
    private ThreadPoolTaskScheduler threadPoolDelayTaskScheduler;
    ......
}


2. KafkaDelayQueueFactory:Kafka延迟队列工厂

Kafka延期队列的工厂,用于及其管理延迟队列文章来源地址https://www.toymoban.com/news/detail-760309.html

package com.wdyin.kafka.delay;

import lombok.Data;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.ApplicationContext;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import java.util.Properties;

/**
 * 延时队列工厂
 **/
@Data
public class KafkaDelayQueueFactory {

    private KafkaDelayConfig kafkaDelayConfig;
    private Properties properties;
    private ApplicationContext applicationContext;
    private Integer concurrency;

    public KafkaDelayQueueFactory(Properties properties, KafkaDelayConfig kafkaDelayConfig) {
        Assert.notNull(properties, "properties cannot null");
        Assert.notNull(kafkaDelayConfig.getDelayThreadPool(), "delayThreadPool cannot null");
        Assert.notNull(kafkaDelayConfig.getPollThreadPool(), "pollThreadPool cannot null");
        Assert.notNull(kafkaDelayConfig.getPollInterval(), "pollInterval cannot null");
        Assert.notNull(kafkaDelayConfig.getPollTimeout(), "timeout cannot null");
        this.properties = properties;
        this.kafkaDelayConfig = kafkaDelayConfig;
    }

    public void listener(String topic, String group, Integer delayTime, String targetTopic) {
        if (StringUtils.isEmpty(topic)) {
            throw new RuntimeException("topic cannot empty");
        }
        if (StringUtils.isEmpty(group)) {
            throw new RuntimeException("group cannot empty");
        }
        if (StringUtils.isEmpty(delayTime)) {
            throw new RuntimeException("delayTime cannot empty");
        }
        if (StringUtils.isEmpty(targetTopic)) {
            throw new RuntimeException("targetTopic cannot empty");
        }
        KafkaSyncConsumer<String, String> kafkaSyncConsumer = createKafkaSyncConsumer(group);
        KafkaDelayQueue<String, String> kafkaDelayQueue = createKafkaDelayQueue(topic, group, delayTime, targetTopic, kafkaSyncConsumer);
        kafkaDelayQueue.send();
    }

    private KafkaDelayQueue<String, String> createKafkaDelayQueue(String topic, String group, Integer delayTime, String targetTopic, KafkaSyncConsumer<String, String> kafkaSyncConsumer) {
        KafkaDelayQueue<String, String> kafkaDelayQueue = new KafkaDelayQueue<>(kafkaSyncConsumer, kafkaDelayConfig);
        Assert.notNull(applicationContext, "kafkaDelayQueue need applicationContext");
        kafkaDelayQueue.setApplicationContext(applicationContext);
        kafkaDelayQueue.setDelayTime(delayTime);
        kafkaDelayQueue.setTopic(topic);
        kafkaDelayQueue.setGroup(group);
        kafkaDelayQueue.setTargetTopic(targetTopic);
        return kafkaDelayQueue;
    }

    private KafkaSyncConsumer<String, String> createKafkaSyncConsumer(String group) {
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        return new KafkaSyncConsumer<>(properties);
    }

}

3. KafkaPollListener:Kafka延迟队列事件监听

package com.wdyin.kafka.delay;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.ApplicationListener;
import org.springframework.kafka.core.KafkaTemplate;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.*;

/**
 * 延时队列监听
 * @Desc :
 */
@Slf4j
public class KafkaPollListener<K, V> implements ApplicationListener<KafkaPollEvent<K, V>> {

    private KafkaTemplate kafkaTemplate;

    public KafkaPollListener(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public void onApplicationEvent(KafkaPollEvent<K, V> event) {
        ConsumerRecords<K, V> records = (ConsumerRecords<K, V>) event.getSource();
        Integer delayTime = event.getDelayTime();
        KafkaDelayQueue<K, V> kafkaDelayQueue = event.getKafkaDelayQueue();
        KafkaSyncConsumer<K, V> kafkaSyncConsumer = kafkaDelayQueue.getKafkaSyncConsumer();
        Set<TopicPartition> partitions = records.partitions();
        Map<TopicPartition, OffsetAndMetadata> commitMap = new HashMap<>();
        partitions.forEach((partition) -> {
            List<ConsumerRecord<K, V>> consumerRecords = records.records(partition);
            for (ConsumerRecord<K, V> record : consumerRecords) {
                long startTime = (record.timestamp() / 1000) * 1000;
                long endTime = startTime + delayTime;
                long now = System.currentTimeMillis();
                if (endTime > now) {
                    kafkaSyncConsumer.pauseAndSeek(partition, record.offset());
                    kafkaDelayQueue.getThreadPoolPollTaskScheduler().schedule(kafkaDelayQueue.delayTask(partition), new Date(endTime));
                    break;
                }
                log.info("{}: partition:{}, offset:{}, key:{}, value:{}, messageDate:{}, nowDate:{}, messageDate:{}, nowDate:{}",
                        Thread.currentThread().getName() + "#" + Thread.currentThread().getId(), record.topic() + "-" + record.partition(), record.offset(), record.key(), record.value(), LocalDateTime.ofInstant(Instant.ofEpochMilli(startTime), ZoneId.systemDefault()), LocalDateTime.now(), startTime, Instant.now().getEpochSecond());
                kafkaTemplate.send(kafkaDelayQueue.getTargetTopic(), record.value());
                commitMap.put(partition, new OffsetAndMetadata(record.offset() + 1));
            }
        });
        if (!commitMap.isEmpty()) {
            kafkaSyncConsumer.commit(commitMap);
        }
    }
}

4. KafkaDelayConfig:Kafka延时配置

package com.wdyin.kafka.delay;

import lombok.Data;

/**
 * 延时队列配置
 **/
@Data
public class KafkaDelayConfig {

    private Integer pollInterval;
    private Integer pollTimeout;
    private Integer pollThreadPool;
    private Integer delayThreadPool;

    public KafkaDelayConfig() {
    }
    ......
}


八. 如何使用kafka延时队列

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/**
 **/
@Component
public class KafkaDelayApplication {

    @Resource
    private KafkaDelayQueueFactory kafkaDelayQueueFactory;

	/**
     * 延迟任务都可以配置在这里
     * Kafka将消息从【延时主题】经过【延时时间】后发送到【目标主题】
     */
    @PostConstruct
    public void init() {
        //延迟30秒
        kafkaDelayQueueFactory.listener("delay-30-second-topic", "delay-30-second-group", 1 * 30 * 1000, "delay-60-second-target-topic");
        //延迟60秒
        kafkaDelayQueueFactory.listener("delay-60-second-topic", "delay-60-second-group", 1 * 60 * 1000, "delay-60-second-target-topic");
        //延迟30分钟
        kafkaDelayQueueFactory.listener("delay-30-minute-topic", "delay-30-minute-group", 30 * 60 * 1000, "delay-30-minute-target-topic");
    }
}

九、测试

  1. 先往延时主题【delay-60-second-topic】发送一千条消息,一共10个分区,每个分区100条消息,消息时间是2023-04-21 16:37:26分,延迟消息消费时间就应该是2023-04-21 16:38:26
    JAVA面试题分享一百六十三:Kafka如何实现延时推送?,JAVA 面试题分享,MQ(Message Queue)消息队列 内容分享,Kafka 面试题分享,java,kafka,开发语言
  2. 延时队列进行消费:通过日志查看,消息日期和延迟队列消费消息时间正好相差一分钟
    JAVA面试题分享一百六十三:Kafka如何实现延时推送?,JAVA 面试题分享,MQ(Message Queue)消息队列 内容分享,Kafka 面试题分享,java,kafka,开发语言

十、总结

  1. 本案例已成功实现Kafka的延时队列,并进行实测,代码引入可用非常方便。
  2. Kafka实现的延时队列支持秒级别的延时任务,不支持毫秒级别,但是毫秒级别的延时任务也没有意义
  3. 注意一个主题对应的延时时间是一致的,不能在同一个主题里放不同时间的延时任务。
  4. 此方案的缺点就是,如果数据量极大,生产者生产消息速度很快,一定要保证Kafka的消费能力,否则可能会导致延迟,精度不是特别高,不过如果延迟秒级的任务,差个几毫秒肯定可以接受的,一般场景肯定满足。

到了这里,关于JAVA面试题分享一百六十三:Kafka如何实现延时推送?的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 一百六十八、Kettle——用海豚调度器定时调度从Kafka到HDFS的kettle任务脚本(持续更新追踪、持续完善)

    在实际项目中,从Kafka到HDFS的数据是每天自动生成一个文件,按日期区分。而且Kafka在不断生产数据,因此看看kettle是不是需要时刻运行?能不能按照每日自动生成数据文件? 为了测试实际项目中的海豚定时调度从Kafka到HDFS的kettle任务情况,特地提前跑一下海豚定时调度这个

    2024年02月09日
    浏览(31)
  • 每天一个数据分析题(一百六十)

    以下关于代码片段(使用sklearn)的使用和PCA(主成分分析)的描述中,哪项是正确的? A. preprocessing.scale(data)用于对数据进行归一化处理,确保PCA分析前各特征处于同一量级。 B. PCA(n_components=9)将数据降维了9个主成分。 C. pca.explained_variance_输出的是降维后各主成分的方差。

    2024年02月20日
    浏览(38)
  • 每天一个数据分析题(一百六十四)

    关于OLAP系统,下列选项不正确的是() A. 是基于数据仓库的信息进行分析处理过程 B. 用户数量相对较少,其用户主要是业务决策人员与管理人员 C. 对响应时间要求非常高。 D. 基础数据来源于生产系统的操作数据,也就是说,OLAP系统的数据来源与OLTP系统。 题目来源于CDA模

    2024年02月22日
    浏览(40)
  • PCL点云处理之多种体素滤波方法大汇总(一百六十四)

    对PCL中的基于八叉树体素滤波方法,以及在此基础上,自己进一步实现的新滤波方法,进行一个汇总,列出各自的效果和,具体的实现代码 PCL中自带的滤波方法,也是最常用的滤波方法,应该是体素中的点云重心取代原始点,但使用时要注意体素不可过小,

    2024年02月05日
    浏览(34)
  • 一百六十九、Hadoop——Hadoop退出NameNode安全模式与查看磁盘空间详情(踩坑,附截图)

    在海豚跑定时跑kettle的从Kafka到HDFS的任务时,由于Linux服务器的某个文件磁盘空间满了,导致Hadoop的NodeName进入安全模式,此时光执行 hdfs dfsadmin -safemode leave命令语句没有效果( 虽然显示Safe mode is OFF,但没效果,一旦执行还是报错 ) Caused by: org.apache.hadoop.ipc.RemoteException(org

    2024年02月10日
    浏览(31)
  • 一百六十、Kettle——Linux上安装的Kettle9.2.0连接Hive3.1.2

    Kettle9.2.0在Linux上安装好后,需要与Hive3.1.2数据库建立连接 之前已经在本地上用kettle9.2.0连上Hive3.1.2 kettle9.2.0安装包网盘链接 链接:https://pan.baidu.com/s/15Zq9wNDwyMnc3qFVxYOMXw?pwd=zwae  提取码:zwae 1、Hive312的lib里面MySQL驱动包的版本是mysql-connector-java-5.1.37.jar 2、Kettle9.2里MySQL驱动包的

    2024年02月12日
    浏览(55)
  • 一百六十五、Kettle——用海豚调度器调度Linux资源库中的kettle任务脚本(亲测、附流程截图)

    在Linux上脚本运行kettle的转换任务、无论是Linux本地还是Linux资源库都成功后,接下来就是用海豚调度Linux上kettle任务 尤其是团队开发中,基本都要使用共享资源库,所以我直接使用海豚调度Linux资源库的kettle任务脚本 1、先开启zookeeper服务 2、再开启海豚调度器服务 3、开启服

    2024年02月11日
    浏览(40)
  • 架构设计内容分享(一百三十三):ES+Redis+MySQL高可用,如何试实现?

    目录 背景: ES 高可用方案: ES 双中心主备集群架构 ES 流量隔离三集群架构 ES 集群深度优化提升 会员 Redis 缓存方案: ES 近一秒延时导致的 Redis 缓存数据不一致问题的解决方案 Redis 双中心多集群架构 高可用会员主库方案: MySQL 双中心 Partition 集群方案 会员主库平滑迁移方

    2024年02月22日
    浏览(38)
  • 第二百六十九回

    我们在上一章回中介绍了Card Widget相关的内容,本章回中将介绍国际化设置.闲话休提,让我们一起Talk Flutter吧。 我们在这里说的国际化设置是指在App设置相关操作,这样可以让不同国家的用户使用App时呈现不同的语言。总之,就是通过相关的操作,让App支持多个国家的语言。

    2024年01月18日
    浏览(33)
  • 第二百六十四回

    我们在上一章回中介绍了SliverPadding组件相关的内容,本章回中将介绍Sliver综合示例.闲话休提,让我们一起Talk Flutter吧。 我们在前面的章回中介绍了各种Sliver相关的组件:SliverList,SliverGrid,SliverAppBar和SliverPadding,本章回将综合使用它们。下面是示例程序的 运行效果图。不过

    2024年01月18日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包