【分布式技术专题】RocketMQ延迟消息实现原理和源码分析

这篇具有很好参考价值的文章主要介绍了【分布式技术专题】RocketMQ延迟消息实现原理和源码分析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

痛点背景

业务场景

假设有这么一个需求,用户下单后如果30分钟未支付,则该订单需要被关闭。你会怎么做?

之前方案

最简单的做法,可以服务端启动个定时器,隔个几秒扫描数据库中待支付的订单,如果(当前时间-订单创建时间)>30分钟,则关闭订单。

方案评估
  • 优点:是实现简单,缺点呢?
  • *缺点:定时扫描意味着隔个几秒就得查一次数据库,频率高的情况下,如果数据库中订单总量特别大,这种高频扫描会对数据库带来一定压力,待付款订单特别多时(做个爆品秒杀活动,或者啥促销活动),若一次性查到内存中,容易引起宕机,需要分页查询,多少也会有一定数据库层面压力。
延时队列出现
  • 能够在指定时间间隔后触发某个业务操作
  • 能够应对业务数据量特别大的特殊场景

RocketMQ延时消息能够完美的解决上述需求,正常的消息在投递后会立马被消费者所消费,而延时消息在投递时,需要设置指定的延时级别(不同延迟级别对应不同延迟时间),即等到特定的时间间隔后消息才会被消费者消费,这样就将数据库层面的压力转移到了MQ中,也不需要手写定时器,降低了业务复杂度,同时MQ自带削峰功能,能够很好的应对业务高峰。

功能特点

  • RocketMQ支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息;
  • 预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;
  • 在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间;
  • *broker在接收到延迟消息的时候会把对应延迟级别的消息先存储到对应的延迟队列中,等延迟消息时间到达时,会把消息重新存储到对应的topic的queue里面。

Broker处理延迟消息

延时队列生产者端:

延时消息的关键点在于Producer生产者需要给消息设置特定延时级别,消费端代码与正常消费者没有差别。

public class Producer {
	private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        producer.setNamesrvAddr("111.231.110.149:9876");

        producer.start();

        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("TopicTest" ,
                    "TagA" ,
                    ("test message" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                );

                msg.setDelayTimeLevel(3);
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }
}
复制代码
初始化

DefaultMessageStore在启动时,会调用ScheduleMessageService#load()方法来加载消息消费进度和初始化延迟级别对应map,然后调用ScheduleMessageService#start()方法来启动类

load方法

public boolean load() {
        boolean result = super.load();
        result = result && this.parseDelayLevel();
        return result;
}
复制代码

ScheduleMessageService继承自ConfigManager类,super.load()方法对应

public boolean load() {
        String fileName = null;
        try {
            fileName = this.configFilePath();
            String jsonString = MixAll.file2String(fileName);

            if (null == jsonString || jsonString.length() == 0) {
                return this.loadBak();
            } else {
                this.decode(jsonString);
                log.info("load " + fileName + " OK");
                return true;
            }
        } catch (Exception e) {
            log.error("load " + fileName + " failed, and try to load backup file", e);
            return this.loadBak();
        }
}
复制代码
延时队列源码分析:

先从延时消息延迟级别设置与broker端消息持久化入手。

具体实现

RocketMQ发送延时消息时先把消息按照延迟时间段发送到指定的队列中(rocketmq把每种延迟时间段的消息都存放到同一个队列中)然后通过一个定时器进行轮训这些队列,查看消息是否到期,如果到期就把这个消息发送到指定topic的队列中,这样的好处是同一队列中的消息延时时间是一致的,还有一个好处是这个队列中的消息时按照消息到期时间进行递增排序的,说的简单直白就是队列中消息越靠前的到期时间越早。 【分布式技术专题】RocketMQ延迟消息实现原理和源码分析,分布式&amp;微服务技术体系,分布式,rocketmq

启动延迟消息定时任务

如果想要深入了解的可以看一下ScheduleMessageService这个类 【分布式技术专题】RocketMQ延迟消息实现原理和源码分析,分布式&amp;微服务技术体系,分布式,rocketmq

内部变量含义

延时消息定时投递相关具体实现代码在ScheduleMessageService中,先看下变量定义 【分布式技术专题】RocketMQ延迟消息实现原理和源码分析,分布式&amp;微服务技术体系,分布式,rocketmq

  • delayLevelTable定义了延迟级别和延迟时间的对应关系
  • *offsetTable存放延延迟级别对应的队列消费的offset
ScheduleMessageService.start()
复制代码

【分布式技术专题】RocketMQ延迟消息实现原理和源码分析,分布式&amp;微服务技术体系,分布式,rocketmq

延迟消息投递

【分布式技术专题】RocketMQ延迟消息实现原理和源码分析,分布式&amp;微服务技术体系,分布式,rocketmq

其中根据,delayLevel获取消费队列id的方法如下,即queueId = delayLevel-1

public static int delayLevel2QueueId(final int delayLevel) {
        return delayLevel - 1;
}
复制代码

【分布式技术专题】RocketMQ延迟消息实现原理和源码分析,分布式&amp;微服务技术体系,分布式,rocketmq

核心逻辑就是取出tagCode(延时消息持久化时,tagsCode存储的是消息投递时间),解析成消息投递时间,与当前时间戳做差,判断是否应该进行消息投递,具体进行消息投递的方法,在if (countdown

分享资源

【分布式技术专题】RocketMQ延迟消息实现原理和源码分析,分布式&amp;微服务技术体系,分布式,rocketmq
获取以上资源请访问开源项目 点击跳转文章来源地址https://www.toymoban.com/news/detail-636897.html

到了这里,关于【分布式技术专题】RocketMQ延迟消息实现原理和源码分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 云事业群CTO线技术晋升考核机试题-分布式专题-A 分布式锁

    作者:田超凡 1 什么是锁?什么是分布式锁?二者有什么区别? 答: 锁是线程级别的概念,锁的目的是保证线程间业务代码执行的幂等性,同一时刻只有一个线程能够获取到锁并执行业务代码,其他没获取到锁的线程会一直阻塞等待获取到锁的线程释放锁之后才会被唤醒并

    2024年02月12日
    浏览(33)
  • 【分布式技术专题】「分布式技术架构」 探索Tomcat技术架构设计模式的奥秘(Server和Service组件原理分析)

    Tomcat的总体结构从外到内进行分布,最大范围的服务容器是Server组件,Service服务组件(可以有多个同时存在),Connector(连接器)、Container(容器服务),其他组件:Jasper(Jasper解析)、Naming(命名服务)、Session(会话管理)、Logging(日志管理)、JMX(Java 管理器扩展服务

    2024年01月24日
    浏览(29)
  • 云事业群CTO线技术晋升考核机试题-分布式专题-D 分布式数据同步

      作者:田超凡 1 缓存一致性产生背景 答:当需要频繁访问数据库的时候,虽然数据库底层基于B+索引检索数据,但是仍然会十分消耗磁盘IO资源,导致数据库访问压力增加。 此时可以基于缓存设计来减轻数据库访问压力。 2 多级缓存架构设计方案 答:多级缓存架构设计采用

    2024年02月16日
    浏览(31)
  • 云事业群CTO线技术晋升考核机试题-分布式专题-C 分布式任务调度

    作者:田超凡 1 传统的定时任务存在那些缺点 答:传统定时任务的缺点: 定时任务逻辑和业务逻辑在一个服务中,是强耦合的,没有实现完全解耦,当定时任务逻辑出现问题,也会影响业务逻辑的执行。 定时任务非常消耗cpu资源,可能会影响业务线程的执行。 服务器集群环

    2024年02月15日
    浏览(29)
  • 【分布式技术专题】「分布式ID系列」百度开源的分布式高性能的唯一ID生成器UidGenerator

    UidGenerator是什么 UidGenerator是百度开源的一款分布式高性能的唯一ID生成器,更详细的情况可以查看官网集成文档 uid-generator是基于Twitter开源的snowflake算法实现的一款唯一主键生成器(数据库表的主键要求全局唯一是相当重要的)。要求java8及以上版本。 snowflake算法 Snowflake算法描

    2024年02月04日
    浏览(48)
  • 云事业群CTO线技术晋升考核机试题-分布式专题-F 分布式服务链路动态追踪

    作者:田超凡 1 分布式服务链路动态追踪产生的背景 答:在分布式微服务系统中,随着业务的发展,系统的规模也越来越大,服务和服务之间的调用关系也越来越复杂。比如一次HTTP请求可能会在多个服务和服务之间进行多次组合调用,在这个过程中,当一个服务出现故障,

    2024年02月16日
    浏览(33)
  • 云事业群CTO线技术晋升考核机试题-分布式专题-G 分布式幂等架构设计

    作者:田超凡 1 幂等的基本概念 答:幂等指的是同一块业务逻辑重复多次执行时,只能令其生效一次,防止重复执行。 2 幂等的发生场景 答: RPC 调用接口的幂等性问题 MQ 消费者防止重复消费的幂等性问题 定时任务防止重复执行的幂等性问题 3 RPC调用接口的幂等性问题产生

    2024年02月16日
    浏览(21)
  • 【分布式技术】消息队列Kafka

    目录 一、Kafka概述 二、消息队列Kafka的好处 三、消息队列Kafka的两种模式 四、Kafka 1、Kafka 定义 2、Kafka 简介 3、Kafka 的特性 五、Kafka的系统架构 六、实操部署Kafka集群  步骤一:在每一个zookeeper节点上完成kafka部署 ​编辑 步骤二:传给其他节点 步骤三:启动3个节点 kafka管理

    2024年01月23日
    浏览(38)
  • 架构核心技术之分布式消息队列

    Java全能学习+面试指南:https://javaxiaobear.cn 今天我们来学习分布式消息队列,分布式消息队列的知识结构如下图。 主要介绍以下内容: 同步架构和异步架构的区别。异步架构的主要组成部分:消息生产者、消息消费者、分布式消息队列。异步架构的两种主要模型:点对点模型

    2024年02月07日
    浏览(31)
  • 【分布式技术专题】「单点登录技术架构」一文带领你好好认识以下Saml协议的运作机制和流程模式

    传统上,企业应用程序在公司网络中部署和运行。为了获取有关用户的信息,如用户配置文件和组信息,这些应用程序中的许多都是为与公司目录(如Microsoft Active Directory)集成而构建的。更重要的是,通常使用目录存储和验证用户的凭据。例如,如果您使用在本地运行的Share

    2024年02月05日
    浏览(27)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包