延迟消息是在业务场景中比较常用的功能,可以作为延迟队列。比如订单n分钟未支付自动取消,活动倒计时,定时发消息都可以使用到延迟消息。
一、延迟消息介绍
生产者将消息发送到Broker,消费端不会立即消费,需要到达指定延迟时间才能被消费端消费。RocketMQ的延迟消息,默认支持设置18个延迟等级,每一个等级对应一个延迟时间。在Broker中会创建一个默认的SCHEDULE_TOPIC_XXXX的topic,topic下有18个队列,对应18个延迟等级。消息发送过来,会先把消息存储在topic名字为SCHEDULE_TOPIC_XXXX的队列内,等着延迟时间到了,在转发到目标队列,之后消费者在消费。
由于broker是集群模式部署,每个节点都有18个队列。默认的延迟级别不满足业务需求也可以通过Message.setDelayTimeLevel( )方法进行设置,如下代码所示。
//默认
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
//修改成20h
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 20h";
//新增20h,等级就变成了19
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 20h";
二、延迟消息举例
生产者发送消息时,设置Message的DelayTimeLevel字段,设置消息的延迟等级。比如设置DelayTimeLevel=4 延迟等级,则对应的延迟为30s。消费者30s之后可以消费到该消息。
Message msg = new Message(topic, tag, body);
//设置延迟level为4 对应30s。传输对应等级,不传输具体时间限制。
//level设置为0表示不延迟
msg.setDelayTimeLevel(level);
三、延时消息实现原理
RocketMQ延迟消息在Broker内部流程图:
步骤说明:
1.Broker接收到写入的消息,先将目标Topic和队列信息存储到消息的属性(Message对象内的Map properties)中,之后修改Message内的目标的Topic为SCHEDULE_TOPIC_XXXX,并根据生产者设置的延迟级别(delayTimeLevel)存入特定的queue。QueueId=delayTimeLevel-1。把消息内容写入到CommitLog中。
对应源码位置:org.apache.rocketmq.store.CommitLog#putMessage
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
......
// Delay Delivery 是否有延迟级别
if (msg.getDelayTimeLevel() > 0) {
//判断生产者传输的延迟等级是否超过了设置的最大值,超过了需要修改为设置的最大取值
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
//修改topic为延迟队列topic SCHEDULE_TOPIC_XXXX
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
//根据延迟等级获取队列id msg.getDelayTimeLevel() - 1。确定放到SCHEDULE_TOPIC_XXXX中的那个队列
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId 记录原始的topic 和 queueId。
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
//更新消息发送的topic和queueId
msg.setTopic(topic);
msg.setQueueId(queueId);
}
.....
}
2.异步的把CommitLog中的信息存储到Topic为SCHEDULE_TOPIC_XXXX的CosumeQueue中。CosumeQueue存储结构为CommitLog offset记录在commitLog中的位置、size记录存储消息的大小和MessageTag HashCode记录消息的Tag的哈希值,对于延迟消息记录的是投递时间的时间戳。投递时间 = 消息存储时间(storeTimestamp) + 延迟级别对应的时间。
对应源码位置:org.apache.rocketmq.store.CommitLog#checkMessageAndReturnSize
public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) {
// Timing message processing 定时消息处理逻辑
{
String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
//判断topic是否是延迟队列
if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) {
int delayLevel = Integer.parseInt(t);
//判断生产者传输的延迟等级是否超过了设置的最大值,超过了需要修改为设置的最大取值
if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
}
//延迟界别大于0需要 计算 投递时间戳 当作消息Tag的哈希值存储到CosumeQueue中。
if (delayLevel > 0) {
tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
storeTimestamp);
}
}
}
}
3.broker内部的ScheduleMessageService类,作为延迟服务消费SCHEDULE_TOPIC_XXXX的Topic消息,启动时会创建一个定时器timer。根据延迟级别的个数,启动数量相等的TimerTask,每一个TimerTask负责一个延迟级别的消费和投递。每隔1秒钟执行一次DeliverDelayedMessageTimerTask任务,将到期的消息从延迟队列中写入正常Topic中。每个TimerTask检查消息是否到期,只会检查第一条,如果没到期,其他的不会继续检查。如果到期了,投递成功继续检查下一条消息是否过期。
对应源码位置:org.apache.rocketmq.store.schedule.ScheduleMessageService#start
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32);
public void start() {
//CAS操作 保证了同一时间只会有一个DeliverDelayedMessageTimerTask执行。executeOnTimeup
if (started.compareAndSet(false, true)) {
//创建定时器timer
this.timer = new Timer("ScheduleMessageTimerThread", true);
//遍历延迟队列针对每个级别创建一个TimerTask
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
//每隔1秒钟执行一次延迟队列任务,将到期的消息从延迟队列中写入正常Topic中
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
if (started.get()) ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
}
4.消息到期后需要投递到目标的topic中。把第一步记录的目标Topic和队列信息获取到,重新设置Topic存储到CommitLog中,由于之前tagsCode存储的是消息投递时间,需要重新计算tag的哈希值之后在存储。
对应源码位置org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#messageTimeup
private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
//重新计算tagcode
long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
msgInner.setTagsCode(tagsCodeValue);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(msgExt.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
msgInner.setWaitStoreMsgOK(false);
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
int queueId = Integer.parseInt(queueIdStr);
msgInner.setQueueId(queueId);
return msgInner;
}
5.将消息直接投递到目标Topic的CosumeQueue中。文章来源:https://www.toymoban.com/news/detail-485056.html
6.消费者消费目标topic中的数据。文章来源地址https://www.toymoban.com/news/detail-485056.html
到了这里,关于RocketMQ系列5——4.X版本延迟消息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!