看完上一章之后,有没有很好奇,生产者发送完消息之后,server是如何存储,这一章节就来学习
入口
SendMessageProcessor.processRequest
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,
SendMessageRequestHeader requestHeader) {
final RemotingCommand response = preSend(ctx, request, requestHeader);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
if (response.getCode() != -1) {
return CompletableFuture.completedFuture(response);
}
final byte[] body = request.getBody();
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) {
queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
}
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
return CompletableFuture.completedFuture(response);
}
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
CompletableFuture<PutMessageResult> putMessageResult = null;
Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return CompletableFuture.completedFuture(response);
}
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
// 使用defaultMessageStore.aysncPutMessage存储
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}
实际真正的负责存储就是DefaultMessageStore, 不过在讲述DefaultMessageStore的时候,我们是自底往上学,因为DefaultMessageStore比较复杂,从顶往下学容易学乱。先从地基开始,然后再看高楼大厦
MappedFile
public class MappedFile extends ReferenceResource {
// 内存页大小:4k
public static final int OS_PAGE_SIZE = 1024 * 4;
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// 当前进程下 所有的 mappedFile占用的总虚拟内存大小
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
// 当前进程下 所有的 mappedFile个数
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
// 当前mappedFile数据写入点
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
protected final AtomicInteger committedPosition = new AtomicInteger(0);
// 当前mappedFIle数据落盘位点(flushedPosition 之前的数据 都是安全数据,flushedPosition~wrotePosition之间的数据 属于脏页)
private final AtomicInteger flushedPosition = new AtomicInteger(0);
// 文件大小
protected int fileSize;
// 文件通道
protected FileChannel fileChannel;
/**
* Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
*/
protected ByteBuffer writeBuffer = null;
protected TransientStorePool transientStorePool = null;
// 文件名称(commitLog ConsumeQueue:文件名就是 第一条消息的 物理偏移量 索引文件: 年月日小时分钟秒.. )
private String fileName;
// 文件名转long
private long fileFromOffset;
// 文件对象
private File file;
// 内存映射缓冲区,访问虚拟内存
private MappedByteBuffer mappedByteBuffer;
// 该文件下 保存的第一条 msg 的存储时间
private volatile long storeTimestamp = 0;
// 当前文件如果是 目录内 有效文件的 首文件的话,该值为true
private boolean firstCreateInQueue = false;
-
构造方法
-
appendMessage方法
-
appendMessage(byte[] data)
-
flush
MappedFileQueue
public class MappedFileQueue {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
private static final int DELETE_FILES_BATCH_MAX = 10;
// mfq 管理的目录(CommitLog: ../store/commitlog 或者 consumeQueue: ../store/xxx_topic/0)
private final String storePath;
// 目录下每个文件大小(commitLog文件 默认 1g consumeQueue 文件 默认 600w字节)
private final int mappedFileSize;
// list,目录下的每个 mappedFile 都加入该list
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
// 创建mappedFile的服务,内部有自己的线程,咱们通过向他提交 request ,内部线程处理完后 会返回给我们结果 结果 就是 mappedFile对象。
private final AllocateMappedFileService allocateMappedFileService;
// 目录的刷盘位点(它的值: curMappedFile.fileName + curMappedFile.wrotePosition)
private long flushedWhere = 0;
private long committedWhere = 0;
// 当前目录下最后一条msg存储时间
private volatile long storeTimestamp = 0;
-
load方法
-
getLastMappedFile
/**
* 参数1:startOffset ,文件起始偏移量
* 参数2:needCreate ,当 list 为空时,是否创建mappedFile
*/
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
// 该值控制是否创建 mappedFile,当需要创建mappedFile时,它充当文件名的结尾
// 两种情况会创建:
// 1. list内没有mappedFile
// 2. list最后一个 mappedFile (当前顺序写的mappedFile) 它写满了..
long createOffset = -1;
MappedFile mappedFileLast = getLastMappedFile();
if (mappedFileLast == null) {// 情况1 list内没有mappedFile
// createOffset 取值 必须是 mappedFileSize 的倍数 或者 0
createOffset = startOffset - (startOffset % this.mappedFileSize);
}
if (mappedFileLast != null && mappedFileLast.isFull()) { // 情况2 list最后一个 mappedFile (当前顺序写的mappedFile) 它写满了..
// 上一个文件名 转long + mappedFileSize
createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
}
if (createOffset != -1 && needCreate) {// 这里面是创建 新的 mappedFile 的逻辑。
// 获取待创建文件的 绝对路径(下次即将要创建的文件名)
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
// 获取 下下次 要创建的文件的 绝对路径
String nextNextFilePath = this.storePath + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;
if (this.allocateMappedFileService != null) {
// 创建mappedFile的服务,内部有自己的线程,咱们通过向他提交 request ,
// 内部线程处理完后 会返回给我们结果 结果 就是 mappedFile对象。
// 当mappedFileSize >= 1g 的话,这里创建的mappedFile 会执行它的 预热方法。
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
if (mappedFile != null) {
if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true);
}
this.mappedFiles.add(mappedFile);
}
return mappedFile;
}
return mappedFileLast;
}
- flush
/**
* @param flushLeastPages (0 表示强制刷新, > 0 脏页数据必须达到 flushLeastPages 才刷新)
* @return boolean true 表示本次刷盘无数据落盘 false 表示本次刷盘有数据落盘
*/
public boolean flush(final int flushLeastPages) {
boolean result = true;
// 获取当前正在刷盘的文件 (正在顺序写的mappedFile)
MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
if (mappedFile != null) {
// 获取mappedFile 最后一条消息的存储时间
long tmpTimeStamp = mappedFile.getStoreTimestamp();
// 调用mf 刷盘 ,返回mf的最新的落盘位点
int offset = mappedFile.flush(flushLeastPages);
// mf起始偏移量 + mf最新的落盘位点
long where = mappedFile.getFileFromOffset() + offset;
// true 表示本次刷盘无数据落盘 false 表示本次刷盘有数据落盘
result = where == this.flushedWhere;
// 将最新的目录刷盘位点 赋值给 flushedWhere
this.flushedWhere = where;
if (0 == flushLeastPages) {
this.storeTimestamp = tmpTimeStamp;
}
}
return result;
}
CommitLog
public class CommitLog {
// 正常消息魔法值 存储到 commitLog文件时 消息的 第一个字段是 msgSize 第二个字段 就是 魔法值
// Message's MAGIC CODE daa320a7
public final static int MESSAGE_MAGIC_CODE = -626843481;
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// 文件尾消息魔法值
// End of file empty MAGIC CODE cbd43194
protected final static int BLANK_MAGIC_CODE = -875286124;
// 用于管理 ../store/commitlog 目录下的文件 读写刷盘..
protected final MappedFileQueue mappedFileQueue;
// 存储主模块
protected final DefaultMessageStore defaultMessageStore;
// 刷盘服务,默认情况是异步刷盘:FlushRealTimeService
private final FlushCommitLogService flushCommitLogService;
//If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
private final FlushCommitLogService commitLogService;
// 控制消息哪些字段追加到 mappedFile
private final AppendMessageCallback appendMessageCallback;
private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal;
// 队列偏移量字典表
// todo 这个偏移量是consumerqueue的偏移量,每次cq加一条,这里面offset + 1
// 有什么作用那?在写commitLog的时候,需要知道当前cq多少了,
protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
protected volatile long confirmOffset = -1L;
// 写数据时 加锁开始时间
// todo 默认是0,如果不是0说明有人获取锁
private volatile long beginTimeInLock = 0;
// 写锁 两个实现类:自旋锁 重入锁
protected final PutMessageLock putMessageLock;
-
构造方法
-
load方法
-
recoverNormally方法
/**
* 上次关机 属于 正常关机时执行的 恢复方法
* @param maxPhyOffsetOfConsumeQueue (存储主模块启动阶段 先恢复的是 所有的 ConsumeQueue数据,再恢复的是 CommitLog 数据
* maxPhyOffsetOfConsumeQueue 表示 恢复阶段 ConsumeQueue 中已知的 最大 消息Offset)
*/
/**
* When the normal exit, data recovery, all memory data have been flush
*/
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
// 是否检查CRC 默认:true
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {// 进入恢复逻辑
// 从倒数老三 开始向后 恢复
// Began to recover from the last third file
int index = mappedFiles.size() - 3;
if (index < 0)
index = 0;
// 待恢复mappedFile
MappedFile mappedFile = mappedFiles.get(index);
// 获取待恢复mappedFile的mappedByteBuffer的切片(包含全部文件的数据)
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
// 获取待恢复mappedFile 的文件名 为 起始处理偏移量(目录位点)
long processOffset = mappedFile.getFileFromOffset();
// 待处理mappedFile的处理位点(文件内的位点,这个是从0开始)
long mappedFileOffset = 0;
while (true) {
// 从切片内解析出一条 msg 封装成 dispatchRequest对象
// 特殊情况:1. magic_code 表示文件尾 2.request.size = -1 文件内的所有消息 都处理完了,并且 还未到达 文件尾。
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();
// Normal data
if (dispatchRequest.isSuccess() && size > 0) {// 正常消息 走这里
mappedFileOffset += size;
}
// Come the end of the file, switch to the next file Since the
// return 0 representatives met last hole,
// this can not be included in truncate offset
else if (dispatchRequest.isSuccess() && size == 0) {// 文件尾这种情况 走这里
index++;
if (index >= mappedFiles.size()) {
// Current branch can not happen
log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
// 获取下一个 mappedFile
mappedFile = mappedFiles.get(index);
// 获取这个mappedFile的切片
byteBuffer = mappedFile.sliceByteBuffer();
// 获取待恢复mappedFile 的文件名 为 起始处理偏移量(目录位点)
processOffset = mappedFile.getFileFromOffset();
// 归0
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
// Intermediate file read error
else if (!dispatchRequest.isSuccess()) {
log.info("recover physics file end, " + mappedFile.getFileName());
break;
}
}
// 执行到这里,说明 待恢复的数据 已经 检查一遍了
// 再+mappedFileOffset 它之前 processOffset 的值是 上面 最后一个文件的 文件名 的值
// 再加上 mappedFileOffset processOffset 表示的值就是 commitLog 全局位点
processOffset += mappedFileOffset;
// 设置 mfq 的刷盘位点
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
// 调整mfq内当前正在顺序写的 mappedFile 的刷盘点 和 写入点
this.mappedFileQueue.truncateDirtyFiles(processOffset);
// Clear ConsumeQueue redundant data
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
// 删除consumeQueue下的脏文件
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
} else {
// Commitlog case files are deleted
log.warn("The commitlog files are deleted, and delete the consume queue files");
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
this.defaultMessageStore.destroyLogics();
}
}
- asyncPutMessage方法
/**
* @param msg (消息)
*/
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// 设置个存储时间,后面获取到 写锁后 这个事件 会重写。
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// 计算 body crc 值
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// AppendMessageCallBack 的返回值变量
// Back to Results
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
// 获取消息 主题 和 队列ID
String topic = msg.getTopic();
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// SCHEDULE_TOPIC_XXXX
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
// 延迟级别 - 1
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// 将消息的 “topic” “%RETRY%GroupName” queueID 0 记录到消息的属性中
// key:“REAL_TOPIC” “REAL_QUEUE_ID”
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 修改 主题为 SCHEDULE_TOPIC_XXXX
msg.setTopic(topic);
// 队列为 延迟级别 - 1
msg.setQueueId(queueId);
}
}
// 持锁时间
long elapsedTimeInLock = 0;
// 待释放锁定状态的mappedFile(lock状态的mappedFile使用的内存 会 锁死在 物理内存中,不会使用 swap 区,性能很好)
MappedFile unlockMappedFile = null;
// 获取当前 顺序写的 mappedFile
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// 获取锁
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
// 获取锁的时间
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);
if (null == mappedFile || mappedFile.isFull()) {// 条件成立:1. commitlog 目录下是空 2.文件写满了..
// 创建新的 mappedFile
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
}
// 正常执行到这里..
// 参数1:消息
// 参数2:amc ,控制消息哪些字段 追加到 mf 内
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
default:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
}
// 计算 加锁期间耗时
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
// 释放锁
putMessageLock.unlock();
}
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
}
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
// 将mappedByteBuffer 从 Lock 状态 切换为 unlock 状态
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
// 通知刷盘线程
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
// HA 相关的...以后再说
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(flushStatus);
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",
msg.getTopic(), msg.getTags(), msg.getBornHostNameString());
}
}
return putMessageResult;
});
}
ConsumeQueue
DefaultMessageStore
public class DefaultMessageStore implements MessageStore {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private final MessageStoreConfig messageStoreConfig;
// CommitLog
private final CommitLog commitLog;
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
// 获取配置中刷盘“脏页阈值”、“强刷周期”,如果达到强刷周期60s,那就强制刷盘,否则刷脏页
// todo commit提交之后,通过消息分发服务调用 消费队列msg分发器,msg分发器会把数据写入到 上面的consumeQueueTable,同时也会写入到mappfiled中,但是没有刷盘,需要
// 这个服务落盘
private final FlushConsumeQueueService flushConsumeQueueService;
// todo 1、先调用commitLogService,然后再调用cleanConsumeQueueService,会封装成 任务,丢给下面的调度线程池执行
// 清理commitLogService
// todo 1、凌晨4点 2、当前commitlog、cq的目录磁盘使用率达到75, 根据获取过期时间72小时,看commitlog上次修改时间有没有超过72小时,有就删除,删除完minoffset要变动
private final CleanCommitLogService cleanCommitLogService;
// todo 1、配置中获取“清理2次cq时间间隔” 2、从commitlog获取“minoffset” 3、如果 “lastPhysicalMinOffset” < minoffset(说明cq 删除位点很小),那就要把这些cq删除
private final CleanConsumeQueueService cleanConsumeQueueService;
// 索引服务
private final IndexService indexService;
// 创建mapfile对象,细节再看
// todo mfq.getLastmf() 获取正在写mapedFile,如果mf没有,或者写满了,就需要提交一个任务给 allocateMappedFileService 获取mf
private final AllocateMappedFileService allocateMappedFileService;
private final ReputMessageService reputMessageService;
private final HAService haService;
private final ScheduleMessageService scheduleMessageService;
private final StoreStatsService storeStatsService;
private final TransientStorePool transientStorePool;
private final RunningFlags runningFlags = new RunningFlags();
// 时钟
private final SystemClock systemClock = new SystemClock();
// 单线程的调度线程池
private final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread"));
private final BrokerStatsManager brokerStatsManager;
private final MessageArrivingListener messageArrivingListener;
private final BrokerConfig brokerConfig;
private volatile boolean shutdown = true;
private StoreCheckpoint storeCheckpoint;
private AtomicLong printTimes = new AtomicLong(0);
// 转换器
private final LinkedList<CommitLogDispatcher> dispatcherList;
private RandomAccessFile lockFile;
private FileLock lock;
boolean shutDownNormal = false;
最核心的几个方法(从这个方法开始,一行行debug来将之前讲过的东西全部串起来):
- load方法
commit log.load
这里主要做的事就是从commit log的目录中加载数据,存放到mappedFile中,设置到writePosition和flushPosition,这里设置位点不是准确值, 接下来是
consumerQueue.load
这里就是将consumequeue/topic 下面对应的文件都加载到ConcurrentMap<String/* topic /, ConcurrentMap<Integer/ queueId */, ConsumeQueue>> consumeQueueTable这个里面去。
比如我这里存储的consumequeue是这样子的:
最终存储的样子是这样的:
再接下来是 indexService的load方法,将index目录的索引文件加载到indexFile中
接下来是比较重要的recover方法
注意这里maxPhysicOffset = offset + size, 外面会用到
总结一下:
遍历consumequeu下面的topic所有的文件,比如下图这样的,遍历所有的topic下面所有的queue下面所有文件,如果queue下面有多个文件,那就从倒数第三个文件开始遍历,不断的累加maxPhysicOffset
当获取到maxPhyOffsetOfConsumeQueue,要做一个将consumequeue与commitlog对齐的操作
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
// 是否检查CRC 默认:true
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {// 进入恢复逻辑
// 从倒数老三 开始向后 恢复
// Began to recover from the last third file
int index = mappedFiles.size() - 3;
if (index < 0)
index = 0;
// 待恢复mappedFile
MappedFile mappedFile = mappedFiles.get(index);
// 获取待恢复mappedFile的mappedByteBuffer的切片(包含全部文件的数据)
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
// 获取待恢复mappedFile 的文件名 为 起始处理偏移量(目录位点)
long processOffset = mappedFile.getFileFromOffset();
// 待处理mappedFile的处理位点(文件内的位点,这个是从0开始)
long mappedFileOffset = 0;
while (true) {
// 从切片内解析出一条 msg 封装成 dispatchRequest对象
// 特殊情况:1. magic_code 表示文件尾 2.request.size = -1 文件内的所有消息 都处理完了,并且 还未到达 文件尾。
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();
// Normal data
if (dispatchRequest.isSuccess() && size > 0) {// 正常消息 走这里
mappedFileOffset += size;
}
// Come the end of the file, switch to the next file Since the
// return 0 representatives met last hole,
// this can not be included in truncate offset
else if (dispatchRequest.isSuccess() && size == 0) {// 文件尾这种情况 走这里
index++;
if (index >= mappedFiles.size()) {
// Current branch can not happen
log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
// 获取下一个 mappedFile
mappedFile = mappedFiles.get(index);
// 获取这个mappedFile的切片
byteBuffer = mappedFile.sliceByteBuffer();
// 获取待恢复mappedFile 的文件名 为 起始处理偏移量(目录位点)
processOffset = mappedFile.getFileFromOffset();
// 归0
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
// Intermediate file read error
else if (!dispatchRequest.isSuccess()) {
log.info("recover physics file end, " + mappedFile.getFileName());
break;
}
}
// 执行到这里,说明 待恢复的数据 已经 检查一遍了
// 再+mappedFileOffset 它之前 processOffset 的值是 上面 最后一个文件的 文件名 的值
// 再加上 mappedFileOffset processOffset 表示的值就是 commitLog 全局位点
processOffset += mappedFileOffset;
// 设置 mfq 的刷盘位点
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
// 调整mfq内当前正在顺序写的 mappedFile 的刷盘点 和 写入点
this.mappedFileQueue.truncateDirtyFiles(processOffset);
// Clear ConsumeQueue redundant data
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
// 删除consumeQueue下的脏文件
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
} else {
// Commitlog case files are deleted
log.warn("The commitlog files are deleted, and delete the consume queue files");
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
this.defaultMessageStore.destroyLogics();
}
}
这里最重要的就是 从倒数第三个文件开始遍历 commit log文件(每遍历一个commit log 其实就是解析里面msg的过程),直到commit log数据为空了,就得到commit log里面最大的物理偏移量,如果说 consume queue里面的最大物理偏移量大于 commit log的最大物理偏移量,那就删除 consume queue里面的部分数据
我们来看一下load完之后,commitlog和consumequeue加载到内存中是什么样的对象?
总结:
1、从consumequeue加载数据,并恢复位点
2、从commitlog加载数据,并恢复位点,如果consumequeue最大物理偏移量> commitlog最大物理偏移量,需要删除掉(很容易理解,commitlog才是存储消息的核心位置,以它为主)
- start 方法
public void start() throws Exception {
lock = lockFile.getChannel().tryLock(0, 1, false);
if (lock == null || lock.isShared() || !lock.isValid()) {
throw new RuntimeException("Lock failed,MQ already started");
}
lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
lockFile.getChannel().force(true);
{
/**
* 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog;
* 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go;
* 3. Calculate the reput offset according to the consume queue;
* 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed.
*/
long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) {
if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
}
}
}
if (maxPhysicalPosInLogicQueue < 0) {
maxPhysicalPosInLogicQueue = 0;
}
if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
/**
* This happens in following conditions:
* 1. If someone removes all the consumequeue files or the disk get damaged.
* 2. Launch a new broker, and copy the commitlog from other brokers.
*
* All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0.
* If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong.
*/
log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
}
log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
this.reputMessageService.start();
/**
* 1. Finish dispatching the messages fall behind, then to start other services.
* 2. DLedger committedPos may be missing, so here just require dispatchBehindBytes <= 0
*/
while (true) {
if (dispatchBehindBytes() <= 0) {
break;
}
Thread.sleep(1000);
log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes());
}
this.recoverTopicQueueTable();
}
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
this.haService.start();
this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
}
this.flushConsumeQueueService.start();
this.commitLog.start();
this.storeStatsService.start();
this.createTempFile();
this.addScheduleTask();
this.shutdown = false;
}
简单总结一下:
1、防止多broker启动,获取文件锁
2、遍历consumeQueue对象,获取maxPhysicalPosInLogicQueue, 这里要设置 消息分发服务的分发位点 maxPhysicalPosInLogicQueue,因为大部分情况下,consume queue的物理位点是小于等于 commitlog的物理位点的
3、启动分发服务,让consumequeue文件和index文件和 commitlog文件对齐,把commitlog的数据拉到consumequeue和index
4、等到分发结束,再次构建对列偏移字典表,ConcurrentMap<String/* topic /, ConcurrentMap<Integer/ queueId */, ConsumeQueue>> consumeQueueTable;
5、启动消费对列刷盘服务
6、启动commit log,也就是启动commitlog的刷盘服务
7、创建abort文件,正常jvm hook的时候,会删除该文件
8、向sheduleService 提交定时清理文件的定时任务(延迟1s,周期10s)
- asyncPutMessage(MessageExtBrokerInner msg)
commitlog.aysncPutMessage
/**
* @param msg (消息)
*/
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// 设置个存储时间,后面获取到 写锁后 这个事件 会重写。
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// 计算 body crc 值
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// AppendMessageCallBack 的返回值变量
// Back to Results
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
// 获取消息 主题 和 队列ID
String topic = msg.getTopic();
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// SCHEDULE_TOPIC_XXXX
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
// 延迟级别 - 1
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// 将消息的 “topic” “%RETRY%GroupName” queueID 0 记录到消息的属性中
// key:“REAL_TOPIC” “REAL_QUEUE_ID”
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 修改 主题为 SCHEDULE_TOPIC_XXXX
msg.setTopic(topic);
// 队列为 延迟级别 - 1
msg.setQueueId(queueId);
}
}
// 持锁时间
long elapsedTimeInLock = 0;
// 待释放锁定状态的mappedFile(lock状态的mappedFile使用的内存 会 锁死在 物理内存中,不会使用 swap 区,性能很好)
MappedFile unlockMappedFile = null;
// 获取当前 顺序写的 mappedFile
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// 获取锁
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
// 获取锁的时间
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);
if (null == mappedFile || mappedFile.isFull()) {// 条件成立:1. commitlog 目录下是空 2.文件写满了..
// 创建新的 mappedFile
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
}
// 正常执行到这里..
// 参数1:消息
// 参数2:amc ,控制消息哪些字段 追加到 mf 内
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
default:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
}
// 计算 加锁期间耗时
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
// 释放锁
putMessageLock.unlock();
}
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
}
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
// 将mappedByteBuffer 从 Lock 状态 切换为 unlock 状态
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
// 通知刷盘线程
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(flushStatus);
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",
msg.getTopic(), msg.getTags(), msg.getBornHostNameString());
}
}
return putMessageResult;
});
}
在往commitlog写数据的时候,会找到最后一个正在写的mappedFile,然后调用它的mappedFile.appendMessage(msg, this.appendMessageCallback)文章来源:https://www.toymoban.com/news/detail-792297.html
会使用AppendMessageCallback.doAppend来完成数据的写入,对于commit log来说,它的AppendMessageCallback的实现类是DefaultAppendMessageCallback,最终会使用它的doAppend来完成数据的写入
文章来源地址https://www.toymoban.com/news/detail-792297.html
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBrokerInner msgInner) {
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
// 这条消息的 物理offset
// PHY OFFSET
long wroteOffset = fileFromOffset + byteBuffer.position();
int sysflag = msgInner.getSysFlag();
int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
int storeHostLength = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
this.resetByteBuffer(storeHostHolder, storeHostLength);
String msgId;
if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
} else {
msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
}
// Record ConsumeQueue information
keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());
String key = keyBuilder.toString();
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
queueOffset = 0L;
CommitLog.this.topicQueueTable.put(key, queueOffset);
}
// Transaction messages that require special handling
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
// Prepared and Rollback message is not consumed, will not enter the
// consumer queuec
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
queueOffset = 0L;
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
default:
break;
}
/**
* Serialize message
*/
final byte[] propertiesData =
msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
if (propertiesLength > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long. length={}", propertiesData.length);
return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
}
final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;
final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);
// Exceeds the maximum message
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ ", maxMessageSize: " + this.maxMessageSize);
return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}
// Determines whether there is sufficient free space
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
// Here the length of the specially set maxBlank
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
// Initialization of storage space
this.resetByteBuffer(msgStoreItemMemory, msgLen);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(msgLen);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
// 4 QUEUEID
this.msgStoreItemMemory.putInt(msgInner.getQueueId());
// 5 FLAG
this.msgStoreItemMemory.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET
this.msgStoreItemMemory.putLong(queueOffset);
// 7 PHYSICALOFFSET
this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
// 8 SYSFLAG
this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(bornHostHolder, bornHostLength);
this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
// 11 STORETIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(storeHostHolder, storeHostLength);
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
// 13 RECONSUMETIMES
this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
this.msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0)
this.msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC
this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
this.msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0)
this.msgStoreItemMemory.put(propertiesData);
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// Write messages to the queue buffer
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// The next update ConsumeQueue information
CommitLog.this.topicQueueTable.put(key, ++queueOffset);
break;
default:
break;
}
return result;
}
- getMessage()
到后面讲消费者源码的时候再debug
到了这里,关于全网最细RocketMQ源码四:消息存储的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!