延时消息队列

这篇具有很好参考价值的文章主要介绍了延时消息队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

前言

一、延时队列实用场景

二、DelayQueue 

DelayQueue的实现

使用延迟队列 

 DelayQueue实现延时任务的优缺点

三、RocketMQ

原理

四、Kafka

原理

实现 

DelayMessage定义

消息发送代码 

消费者代码 

参考


前言

延时队列的内部是有序的,最重要的特性就体现在它的延时属性上,延时队列就是用来存放需要在指定时间点被处理的元素的队列

队列是存储消息的载体,延时队列存储的对象是延时消息。所谓的延时消息,是指消息被发送以后,并不想让消费者立刻获取,而是等待特定的时间后,消费者才能获取这个消息进行消费。


一、延时队列实用场景

  • 淘宝七天自动确认收货。在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将货款打给商家,这个过程持续七天,就是使用了消息中间件的延迟推送功能;

  • 订单在三十分钟之内未支付则自动取消;

  • 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒;

  • 用户注册成功后,如果三天内没有登陆则进行短信提醒

  • 用户发起退款,如果三天内没有得到处理则通知相关运营人员;

  • 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。

二、DelayQueue 

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
}

DelayQueue是一个无界的BlockingQueue,是线程安全的(无界指的是队列的元素数量不存在上限,队列的容量会随着元素数量的增加而扩容,阻塞队列指的是当队列内元素数量为0的时候,试图从队列内获取元素的线程将被阻塞或者抛出异常)

以上是阻塞队列的特点,而延迟队列还拥有自己如下的特点:

DelayQueue中存入的必须是实现了Delayed接口的对象(Delayed定义了一个getDelay的方法,用来判断排序后的元素是否可以从Queue中取出,并且Delayed接口还继承了Comparable用于排序),插入Queue中的数据根据compareTo方法进行排序(DelayQueue的底层存储是一个PriorityQueue,PriorityQueue是一个可排序的Queue,其中的元素必须实现Comparable接口的compareTo方法),并通过getDelay方法返回的时间确定元素是否可以出队,只有小于等于0的元素(即延迟到期的元素)才能够被取出

延迟队列不接收null元素

DelayQueue的实现

public class UserDelayTask implements Delayed {

    @Getter
    private UserRegisterMessage message;

    private long delayTime;

    public UserDelayTask(UserRegisterMessage message, long delayTime) {
        this.message = message;
        // 延迟时间加当前时间
        this.delayTime = System.currentTimeMillis() + delayTime;
    }

    // 获取任务剩余时间
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return Long.compare(delayTime, ((UserDelayTask) o).delayTime);
    }
}

定义延迟队列并交付容器管理 

    /**
     * 延迟队列
     */
    @Bean("userDelayQueue")
    public DelayQueue<UserDelayTask> orderDelayQueue() {
        return new DelayQueue<UserDelayTask>();
    }

使用延迟队列 

@Resource
private DelayQueue<UserDelayTask> orderDelayQueue;
 
UserDelayTask task = new UserDelayTask(message, 1000 * 60);
        orderDelayQueue.add(task);

开启线程处理延迟任务

 @Override
    public void afterPropertiesSet() throws Exception {
        new Thread(() -> {
            while (true) {
                try {
                    UserDelayTask task = orderDelayQueue.take();
                    // 当队列为null的时候,poll()方法会直接返回null, 不会抛出异常,但是take()方法会一直等待,
                    // 因此会抛出一个InterruptedException类型的异常。(当阻塞方法收到中断请求的时候就会抛出InterruptedException异常)
                    UserRegisterMessage message = task.getMessage();
                    execute(message);

                    // 执行业务
                } catch (Exception ex) {
                    log.error("afterPropertiesSet", ex);
                }
            }
        }).start();
    }

 DelayQueue实现延时任务的优缺点

使用DelayQueue实现延时任务非常简单,而且简便,全部都是标准的JDK代码实现,不用引入第三方依赖(不依赖redis实现、消息队列实现等),非常的轻量级。

它的缺点就是所有的操作都是基于应用内存的,一旦出现应用单点故障,可能会造成延时任务数据的丢失。如果订单并发量非常大,因为DelayQueue是无界的,订单量越大,队列内的对象就越多,可能造成OOM的风险。所以使用DelayQueue实现延时任务,只适用于任务量较小的情况。
 

三、RocketMQ

RocketMQ 和本身就有延迟队列的功能,但是开源版本只能支持固定延迟时间的消息,不支持任意时间精度的消息(这个好像只有阿里云版本的可以)。

他的默认时间间隔分为 18 个级别,基本上也能满足大部分场景的需要了。

默认延迟级别:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h。

使用起来也非常的简单,直接通过setDelayTimeLevel设置延迟级别即可。

setDelayTimeLevel(level)

原理

实现原理说起来比较简单,Broker 会根据不同的延迟级别创建出多个不同级别的队列,当我们发送延迟消息的时候,根据不同的延迟级别发送到不同的队列中,同时在 Broker 内部通过一个定时器去轮询这些队列(RocketMQ 会为每个延迟级别分别创建一个定时任务),如果消息达到发送时间,那么就直接把消息发送到指 topic 队列中。

RocketMQ 这种实现方式是放在服务端去做的,同时有个好处就是相同延迟时间的消息是可以保证有序性的。

谈到这里就顺便提一下关于消息消费重试的原理,这个本质上来说其实是一样的,对于消费失败需要重试的消息实际上都会被丢到延迟队列的 topic 里,到期后再转发到真正的 topic 中。

延时消息队列,消息队列,分布式,mysql,大数据

四、Kafka

对于 Kafka 来说,原生并不支持延迟队列的功能,需要我们手动去实现,这里我根据 RocketMQ 的设计提供一个实现思路。

这个设计,我们也不支持任意时间精度的延迟消息,只支持固定级别的延迟,因为对于大部分延迟消息的场景来说足够使用了。

只创建一个 topic,但是针对该 topic 创建 18 个 partition,每个 partition 对应不同的延迟级别,这样做和 RocketMQ 一样有个好处就是能达到相同延迟时间的消息达到有序性。

原理

  • 首先创建一个单独针对延迟队列的 topic,同时创建 18 个 partition 针对不同的延迟级别
  • 发送消息的时候根据消息延迟等级发送到延迟 topic 对应的 partition,同时把原 topic 保存到 延迟消息 中。
  • 内嵌的consumer单独设置一个ConsumerGroup去消费延迟 topic 消息,消费到消息之后如果没有达到延迟时间那么就进行pause,然后seek到当前ConsumerRecordoffset位置,同时使用定时器去轮询延迟的TopicPartition,达到延迟时间之后进行resume。

       KafkaConsumer 提供了暂停和恢复的API函数,调用消费者的暂停方法后就无法再拉取到新的消息,同时长时间不消费kafka也不会认为这个消费者已经挂掉了。

  • 如果达到了延迟时间,那么就获取到延迟消息中的真实 topic ,直接转发

这里为什么要进行pauseresume呢?因为如果不这样的话,如果超时未消费达到max.poll.interval.ms 最大时间(默认300s),那么将会触发 Rebalance。

延时消息队列,消息队列,分布式,mysql,大数据

实现 

DelayMessage定义
/**
 * 延迟消息
 *
 * @author yangyanping
 * @date 2023-08-31
 */
@Getter
@Setter
@ToString
public class DelayMessage<T> implements DTO {
    /**
     * 消息级别,共18个,对应18个partition
     */
    private Integer level;

    /**
     * 业务类型,真实投递到的topic
     */
    private String topic;

    /**
     * 目标消息key
     */
    private String key;

    /**
     * 事件
     */
    private DomainEvent<T> event;
}
消息发送代码 
public void publishAsync(DelayMessage delayMessage) {
        String topic = "delay_topic";

        try {
            Integer level = delayMessage.getLevel();
            Integer delayPartition = level - 1;
            String data = JSON.toJSONString(delayMessage);
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, delayPartition, "", data);

            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(producerRecord);
            future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onSuccess(SendResult<String, String> result) {
                    //发送成功后回调
                    log.info("{}-异步发送成功, result={}。", topic, result.getRecordMetadata().toString());
                }

                @Override
                public void onFailure(Throwable throwable) {
                    //发送失败回调
                    log.error("{}-异步发送失败。", topic, throwable);
                }
            });
        } catch (Exception ex) {
            log.error("{}-异步发送异常。", topic, ex);
        }
    }
消费者代码 
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.convert.Convert;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.xinwu.shushan.exception.BizException;
import com.xinwu.shushan.launch.infra.publisher.KafkaPublisher;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * 参考RocketMQ支持延迟消息设计,不支持任意时间精度的延迟消息,只支持特定级别的延迟消息,
 * 将消息延迟等级分为1s、5s、10s 、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h,共18个级别,
 * 只创建一个有18个分区的延时topic,每个分区对应不同延时等级。
 * <p>
 * https://blog.csdn.net/weixin_40270946/article/details/121293032
 * <p>
 * https://zhuanlan.zhihu.com/p/365802989
 *
 * @author yangyanping
 * @date 2023-08-30
 */
@Slf4j
@Component
public class DelayConsumer implements ConsumerSeekAware {

    /**
     * 锁
     */
    private final Object lock = new Object();

    /**
     * 间隔
     */
    private final int interval = 5000;

    private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

    /**
     * 消费者
     */
    private volatile Consumer kafkaConsumer;

    @Resource
    private KafkaPublisher kafkaPublisher;

    @PostConstruct
    public void init() {
        //当系统需要循环间隔一定时间执行某项任务的时候可以使用scheduleWithFixedDelay方法来实现
        executorService.scheduleWithFixedDelay(() -> {
            synchronized (lock) {
                resume();
                lock.notifyAll();
                log.info("DelayConsumer-notifyAll");
            }
        }, 0, interval, TimeUnit.MILLISECONDS);
    }

    /**
     * 批量消费消息
     */
    @KafkaListener(topics = "#{'${shushan.launch.event.delayTopic.topic}'}", groupId = "#{'${spring.kafka.consumer.group-id}'}", concurrency = "1")
    public void onMessage(List<ConsumerRecord<String, String>> records, Consumer consumer) {
        synchronized (lock) {
            try {
                /**
                 * 7、ConcurrentModificationException
                 * java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
                 * 报错内容:线程不安全
                 *
                 * 原因分析:Kafka consumer是非线程安全的
                 */
                this.kafkaConsumer = consumer;

                if (CollectionUtil.isEmpty(records)) {
                    log.info("DelayConsumer-records is empty !");
                    consumer.commitSync();
                    return;
                }

                boolean delay = false;

                for (ConsumerRecord<String, String> record : records) {
                    long timestamp = record.timestamp();
                    String value = record.value();
                    JSONObject jsonObject = JSON.parseObject(value);
                    Integer level = Convert.toInt(jsonObject.get("level"));
                    String targetTopic = Convert.toStr(jsonObject.get("topic"));
                    String event = Convert.toStr(jsonObject.get("event"));
                    String msgKey = Convert.toStr(jsonObject.get("key"));

                    TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
                    long delayTime = getDelayTime(timestamp, level);

                    if (delayTime <= System.currentTimeMillis()) {
                        log.info("DelayConsumer-delayTime={} <= currentTime={},msgKey={}", delayTime, System.currentTimeMillis(), msgKey);
                        // 处理消息
                        processMessage(record, consumer, topicPartition, targetTopic, msgKey, event);
                    } else {
                        log.info("DelayConsumer-delayTime={} > currentTime={},msgKey={},partition={}", delayTime, System.currentTimeMillis(), msgKey, topicPartition.partition());
                        // 暂停消费
                        consumer.pause(Collections.singletonList(topicPartition));
                        consumer.seek(topicPartition, record.offset());
                        delay = true;
                        break;
                    }
                }

                if (delay) {
                    lock.wait();
                }
            } catch (Exception var10) {
                log.error("DelayConsumer.onMessage error", var10);
                throw new BizException("事件消息消费失败", var10);
            }
        }
    }

    /**
     * 消息级别,共6个
     * level-1 :15s
     * level-2 : 30s
     * level-3 : 1m
     * level-4 : 5m
     * level-5 : 10m
     * level-6 : 30m
     */
    private Long getDelayTime(long timestamp, Integer level) {
        switch (level) {
            case 1:
                return timestamp + 25 * 1000;
            case 2:
                return timestamp + 30 * 1000;
            case 3:
                return timestamp + 1 * 60 * 1000;
            case 4:
                return timestamp + 5 * 60 * 1000;
            case 5:
                return timestamp + 10 * 60 * 1000;
            case 6:
                return timestamp + 30 * 60 * 1000;
        }

        return timestamp;
    }


    /**
     * 处理消息 并提交消息
     */
    private void processMessage(ConsumerRecord<String, String> record, Consumer consumer, TopicPartition topicPartition, String targetTopic, String msgKey, String event) {

        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1);
        HashMap<TopicPartition, OffsetAndMetadata> metadataHashMap = new HashMap<>();
        metadataHashMap.put(topicPartition, offsetAndMetadata);

        //发布目标
        kafkaPublisher.publishSync(targetTopic, msgKey, event);

        log.info("DelayConsumer-records#offset={},targetTopic={},event={}", record.offset() + 1, targetTopic, event);
        consumer.commitSync(metadataHashMap);
    }

    /**
     * 重启消费
     */
    private void resume() {
        try {
            if (kafkaConsumer == null) {
                return;
            }

            Set<TopicPartition> topicPartitions = kafkaConsumer.paused();

            if (CollectionUtil.isEmpty(topicPartitions)) {
                return;
            }

            kafkaConsumer.resume(topicPartitions);
        } catch (Exception ex) {
            log.error("DelayConsumer-resume", ex);
        }
    }
}

参考

RabbitMQ、RocketMQ、Kafka延迟队列实现-腾讯云开发者社区-腾讯云

延迟消息队列设计-腾讯云开发者社区-腾讯云

用Kafka实现延迟消息_kafka延迟消费_alvin.yao的博客-CSDN博客

怎么设计一个合适的延时队列?

基于kafka实现延迟队列 - 知乎文章来源地址https://www.toymoban.com/news/detail-697328.html

到了这里,关于延时消息队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式消息队列RocketMQ概念详解

    目录 1.MQ概述 1.1 RocketMQ简介 1.2 MQ用途 1.3 常见MQ产品 2.RocketMQ 基本概念 2.1 消息 2.2 主题 2.3 标签 2.4 队列  2.5 Producer 2.6 Consumer 2.7 NameServer 2.8 Broker 2.9 RocketMQ 工作流程   RocketMQ 是阿里开源的分布式消息中间件,跟其它中间件相比,RocketMQ 的特点是纯JAVA实现,是一套提供了消息

    2024年02月03日
    浏览(65)
  • 架构核心技术之分布式消息队列

    Java全能学习+面试指南:https://javaxiaobear.cn 今天我们来学习分布式消息队列,分布式消息队列的知识结构如下图。 主要介绍以下内容: 同步架构和异步架构的区别。异步架构的主要组成部分:消息生产者、消息消费者、分布式消息队列。异步架构的两种主要模型:点对点模型

    2024年02月07日
    浏览(46)
  • Spring Boot如何实现分布式消息队列

    在分布式系统中,消息队列是非常重要的一部分,可以帮助开发人员实现异步处理、解耦系统、提高系统可靠性等。本文将介绍如何使用 Spring Boot 实现分布式消息队列。 消息队列是一种存储消息的容器,可以缓存消息并在需要的时候按照一定的规则将消息发送给消费者。常

    2024年02月14日
    浏览(43)
  • 分布式消息队列Kafka(四)- 消费者

    1.Kafka消费方式 2.Kafka消费者工作流程 (1)总体工作流程 (2)消费者组工作流程 3.消费者API (1)单个消费者消费 实现代码 (2)单个消费者指定分区消费 代码实现: (3)消费者组消费 复制上面CustomConsumer三个,同时去订阅统一个主题,消费数据,发现一个分区只能被一个

    2023年04月26日
    浏览(49)
  • 分布式应用之zookeeper集群+消息队列Kafka

           ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。为分布式框架提供协调服务的

    2024年02月06日
    浏览(66)
  • zookeeper+kafka分布式消息队列集群的部署

    目录 一、zookeeper 1.Zookeeper 定义 2.Zookeeper 工作机制 3.Zookeeper 特点 4.Zookeeper 数据结构 5.Zookeeper 应用场景 (1)统一命名服务 (2)统一配置管理 (3)统一集群管理 (4)服务器动态上下线 6.Zookeeper 选举机制 (1)第一次启动选举机制 (2)非第一次启动选举机制 7.部署zookeepe

    2024年02月14日
    浏览(52)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(44)
  • 分布式 - 消息队列Kafka:Kafka 消费者消息消费与参数配置

    01. 创建消费者 在读取消息之前,需要先创建一个KafkaConsumer对象。创建KafkaConsumer对象与创建KafkaProducer对象非常相似——把想要传给消费者的属性放在Properties对象里。 为简单起见,这里只提供4个必要的属性:bootstrap.servers、key.deserializer 和 value.deserializer。 ① bootstrap.servers 指

    2024年02月12日
    浏览(45)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的分区策略

    01. Kafka 分区的作用 分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的

    2024年02月13日
    浏览(54)
  • Python 全栈系列239 使用消息队列完成分布式任务

    在Python - 深度学习系列32 - glm2接口部署实践提到,通过部署本地化大模型来完成特定的任务。 由于大模型的部署依赖显卡,且常规量级的任务需要大量的worker支持,从成本考虑,租用算力机是比较经济的。由于任务是属于超高计算传输比的类型,且算力机随时可能出现不稳定

    2024年04月13日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包