全网最细RocketMQ源码四:消息存储

这篇具有很好参考价值的文章主要介绍了全网最细RocketMQ源码四:消息存储。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

看完上一章之后,有没有很好奇,生产者发送完消息之后,server是如何存储,这一章节就来学习

入口

SendMessageProcessor.processRequest
全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq
全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq

  private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) {
        final RemotingCommand response = preSend(ctx, request, requestHeader);
        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

        if (response.getCode() != -1) {
            return CompletableFuture.completedFuture(response);
        }

        final byte[] body = request.getBody();

        int queueIdInt = requestHeader.getQueueId();
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

        if (queueIdInt < 0) {
            queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
        }

        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(requestHeader.getTopic());
        msgInner.setQueueId(queueIdInt);

        if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
            return CompletableFuture.completedFuture(response);
        }

        msgInner.setBody(body);
        msgInner.setFlag(requestHeader.getFlag());
        MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
        msgInner.setPropertiesString(requestHeader.getProperties());
        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
        msgInner.setBornHost(ctx.channel().remoteAddress());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
        String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));

        CompletableFuture<PutMessageResult> putMessageResult = null;
        Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
        String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (transFlag != null && Boolean.parseBoolean(transFlag)) {
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                                + "] sending transaction message is forbidden");
                return CompletableFuture.completedFuture(response);
            }
            putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
        } else {
        // 使用defaultMessageStore.aysncPutMessage存储
            putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        }
        return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
    }

实际真正的负责存储就是DefaultMessageStore, 不过在讲述DefaultMessageStore的时候,我们是自底往上学,因为DefaultMessageStore比较复杂,从顶往下学容易学乱。先从地基开始,然后再看高楼大厦

MappedFile

public class MappedFile extends ReferenceResource {
    // 内存页大小:4k
    public static final int OS_PAGE_SIZE = 1024 * 4;
    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    // 当前进程下 所有的 mappedFile占用的总虚拟内存大小
    private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);

    // 当前进程下 所有的 mappedFile个数
    private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
    // 当前mappedFile数据写入点
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    // 当前mappedFIle数据落盘位点(flushedPosition 之前的数据 都是安全数据,flushedPosition~wrotePosition之间的数据 属于脏页)
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
    // 文件大小
    protected int fileSize;
    // 文件通道
    protected FileChannel fileChannel;
    /**
     * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
     */
    protected ByteBuffer writeBuffer = null;
    protected TransientStorePool transientStorePool = null;

    // 文件名称(commitLog ConsumeQueue:文件名就是 第一条消息的 物理偏移量   索引文件: 年月日小时分钟秒.. )
    private String fileName;
    // 文件名转long
    private long fileFromOffset;
    // 文件对象
    private File file;
    // 内存映射缓冲区,访问虚拟内存
    private MappedByteBuffer mappedByteBuffer;
    // 该文件下 保存的第一条 msg 的存储时间
    private volatile long storeTimestamp = 0;
    // 当前文件如果是 目录内 有效文件的 首文件的话,该值为true
    private boolean firstCreateInQueue = false;
  • 构造方法
    全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq
    全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq

  • appendMessage方法
    全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq
    全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq

  • appendMessage(byte[] data)
    全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq

  • flush
    全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq

MappedFileQueue

public class MappedFileQueue {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);

    private static final int DELETE_FILES_BATCH_MAX = 10;

    // mfq 管理的目录(CommitLog: ../store/commitlog  或者  consumeQueue: ../store/xxx_topic/0)
    private final String storePath;

    // 目录下每个文件大小(commitLog文件 默认 1g     consumeQueue 文件 默认 600w字节)
    private final int mappedFileSize;

    // list,目录下的每个 mappedFile 都加入该list
    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();

    // 创建mappedFile的服务,内部有自己的线程,咱们通过向他提交 request ,内部线程处理完后 会返回给我们结果  结果 就是 mappedFile对象。
    private final AllocateMappedFileService allocateMappedFileService;

    // 目录的刷盘位点(它的值: curMappedFile.fileName + curMappedFile.wrotePosition)
    private long flushedWhere = 0;
    private long committedWhere = 0;

    // 当前目录下最后一条msg存储时间
    private volatile long storeTimestamp = 0;
  • load方法
    全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq

  • getLastMappedFile

 /**
     * 参数1:startOffset ,文件起始偏移量
     * 参数2:needCreate ,当 list 为空时,是否创建mappedFile
     */
    public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
        // 该值控制是否创建 mappedFile,当需要创建mappedFile时,它充当文件名的结尾
        // 两种情况会创建:
        // 1. list内没有mappedFile
        // 2. list最后一个 mappedFile (当前顺序写的mappedFile) 它写满了..
        long createOffset = -1;

        MappedFile mappedFileLast = getLastMappedFile();

        if (mappedFileLast == null) {// 情况1  list内没有mappedFile
            // createOffset 取值 必须是 mappedFileSize 的倍数 或者 0
            createOffset = startOffset - (startOffset % this.mappedFileSize);
        }

        if (mappedFileLast != null && mappedFileLast.isFull()) { // 情况2  list最后一个 mappedFile (当前顺序写的mappedFile) 它写满了..
            // 上一个文件名 转long + mappedFileSize
            createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
        }


        if (createOffset != -1 && needCreate) {// 这里面是创建 新的 mappedFile 的逻辑。
            // 获取待创建文件的 绝对路径(下次即将要创建的文件名)
            String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);

            // 获取 下下次 要创建的文件的 绝对路径
            String nextNextFilePath = this.storePath + File.separator
                + UtilAll.offset2FileName(createOffset + this.mappedFileSize);

            MappedFile mappedFile = null;


            if (this.allocateMappedFileService != null) {
                // 创建mappedFile的服务,内部有自己的线程,咱们通过向他提交 request ,
                // 内部线程处理完后 会返回给我们结果  结果 就是 mappedFile对象。

                // 当mappedFileSize >= 1g 的话,这里创建的mappedFile 会执行它的 预热方法。
                mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                    nextNextFilePath, this.mappedFileSize);
            } else {
                try {
                    mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
                } catch (IOException e) {
                    log.error("create mappedFile exception", e);
                }
            }


            if (mappedFile != null) {
                if (this.mappedFiles.isEmpty()) {
                    mappedFile.setFirstCreateInQueue(true);
                }
                this.mappedFiles.add(mappedFile);
            }

            return mappedFile;
        }

        return mappedFileLast;
    }

  • flush
 /**
     * @param flushLeastPages (0 表示强制刷新, > 0 脏页数据必须达到 flushLeastPages 才刷新)
     * @return boolean true 表示本次刷盘无数据落盘   false 表示本次刷盘有数据落盘
     */
    public boolean flush(final int flushLeastPages) {
        boolean result = true;

        // 获取当前正在刷盘的文件 (正在顺序写的mappedFile)
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);

        if (mappedFile != null) {
            // 获取mappedFile 最后一条消息的存储时间
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            // 调用mf 刷盘 ,返回mf的最新的落盘位点
            int offset = mappedFile.flush(flushLeastPages);
            // mf起始偏移量 + mf最新的落盘位点
            long where = mappedFile.getFileFromOffset() + offset;
            // true 表示本次刷盘无数据落盘   false 表示本次刷盘有数据落盘
            result = where == this.flushedWhere;
            // 将最新的目录刷盘位点 赋值给 flushedWhere
            this.flushedWhere = where;

            if (0 == flushLeastPages) {
                this.storeTimestamp = tmpTimeStamp;
            }
        }
        return result;
    }

CommitLog

public class CommitLog {
    // 正常消息魔法值  存储到 commitLog文件时  消息的 第一个字段是 msgSize 第二个字段 就是 魔法值
    // Message's MAGIC CODE daa320a7
    public final static int MESSAGE_MAGIC_CODE = -626843481;

    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    // 文件尾消息魔法值
    // End of file empty MAGIC CODE cbd43194
    protected final static int BLANK_MAGIC_CODE = -875286124;


    // 用于管理 ../store/commitlog 目录下的文件 读写刷盘..
    protected final MappedFileQueue mappedFileQueue;

    // 存储主模块
    protected final DefaultMessageStore defaultMessageStore;

    // 刷盘服务,默认情况是异步刷盘:FlushRealTimeService
    private final FlushCommitLogService flushCommitLogService;

    //If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
    private final FlushCommitLogService commitLogService;

    // 控制消息哪些字段追加到 mappedFile
    private final AppendMessageCallback appendMessageCallback;

    private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal;

    // 队列偏移量字典表
    // todo 这个偏移量是consumerqueue的偏移量,每次cq加一条,这里面offset + 1
    //  有什么作用那?在写commitLog的时候,需要知道当前cq多少了,
    protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);

    protected volatile long confirmOffset = -1L;

    // 写数据时  加锁开始时间
    // todo 默认是0,如果不是0说明有人获取锁
    private volatile long beginTimeInLock = 0;

    // 写锁 两个实现类:自旋锁   重入锁
    protected final PutMessageLock putMessageLock;
  • 构造方法
    全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq

  • load方法
    全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq

  • recoverNormally方法

 /**
     * 上次关机 属于 正常关机时执行的 恢复方法
     * @param maxPhyOffsetOfConsumeQueue (存储主模块启动阶段 先恢复的是 所有的 ConsumeQueue数据,再恢复的是 CommitLog 数据
     *                                   maxPhyOffsetOfConsumeQueue 表示 恢复阶段  ConsumeQueue 中已知的 最大 消息Offset)
     */
    /**
     * When the normal exit, data recovery, all memory data have been flush
     */
    public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
        // 是否检查CRC 默认:true
        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();

        if (!mappedFiles.isEmpty()) {// 进入恢复逻辑

            // 从倒数老三 开始向后 恢复
            // Began to recover from the last third file
            int index = mappedFiles.size() - 3;

            if (index < 0)
                index = 0;


            // 待恢复mappedFile
            MappedFile mappedFile = mappedFiles.get(index);

            // 获取待恢复mappedFile的mappedByteBuffer的切片(包含全部文件的数据)
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();

            // 获取待恢复mappedFile 的文件名 为 起始处理偏移量(目录位点)
            long processOffset = mappedFile.getFileFromOffset();

            // 待处理mappedFile的处理位点(文件内的位点,这个是从0开始)
            long mappedFileOffset = 0;

            while (true) {
                // 从切片内解析出一条 msg 封装成 dispatchRequest对象
                // 特殊情况:1. magic_code 表示文件尾  2.request.size = -1 文件内的所有消息 都处理完了,并且 还未到达 文件尾。
                DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);

                int size = dispatchRequest.getMsgSize();

                // Normal data
                if (dispatchRequest.isSuccess() && size > 0) {// 正常消息 走这里
                    mappedFileOffset += size;
                }
                // Come the end of the file, switch to the next file Since the
                // return 0 representatives met last hole,
                // this can not be included in truncate offset
                else if (dispatchRequest.isSuccess() && size == 0) {// 文件尾这种情况 走这里

                    index++;
                    if (index >= mappedFiles.size()) {
                        // Current branch can not happen
                        log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
                        break;
                    } else {
                        // 获取下一个 mappedFile
                        mappedFile = mappedFiles.get(index);
                        // 获取这个mappedFile的切片
                        byteBuffer = mappedFile.sliceByteBuffer();
                        // 获取待恢复mappedFile 的文件名 为 起始处理偏移量(目录位点)
                        processOffset = mappedFile.getFileFromOffset();
                        // 归0
                        mappedFileOffset = 0;
                        log.info("recover next physics file, " + mappedFile.getFileName());
                    }
                }
                // Intermediate file read error
                else if (!dispatchRequest.isSuccess()) {
                    log.info("recover physics file end, " + mappedFile.getFileName());
                    break;
                }
            }
            // 执行到这里,说明 待恢复的数据 已经 检查一遍了

            // 再+mappedFileOffset 它之前 processOffset 的值是 上面 最后一个文件的 文件名 的值
            // 再加上 mappedFileOffset processOffset 表示的值就是 commitLog 全局位点
            processOffset += mappedFileOffset;

            // 设置 mfq 的刷盘位点
            this.mappedFileQueue.setFlushedWhere(processOffset);
            this.mappedFileQueue.setCommittedWhere(processOffset);

            // 调整mfq内当前正在顺序写的 mappedFile 的刷盘点 和 写入点
            this.mappedFileQueue.truncateDirtyFiles(processOffset);

            // Clear ConsumeQueue redundant data
            if (maxPhyOffsetOfConsumeQueue >= processOffset) {
                log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
                // 删除consumeQueue下的脏文件
                this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
            }
        } else {
            // Commitlog case files are deleted
            log.warn("The commitlog files are deleted, and delete the consume queue files");
            this.mappedFileQueue.setFlushedWhere(0);
            this.mappedFileQueue.setCommittedWhere(0);
            this.defaultMessageStore.destroyLogics();
        }
    }
  • asyncPutMessage方法

    /**
     * @param msg (消息)
     */
    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        // 设置个存储时间,后面获取到 写锁后 这个事件 会重写。
        // Set the storage time
        msg.setStoreTimestamp(System.currentTimeMillis());

        // 计算 body crc 值
        // Set the message body BODY CRC (consider the most appropriate setting
        // on the client)
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));

        // AppendMessageCallBack 的返回值变量
        // Back to Results
        AppendMessageResult result = null;


        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        // 获取消息 主题 和 队列ID
        String topic = msg.getTopic();
        int queueId = msg.getQueueId();


        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Delay Delivery
            if (msg.getDelayTimeLevel() > 0) {

                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }

                // SCHEDULE_TOPIC_XXXX
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                // 延迟级别 - 1
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // 将消息的 “topic” “%RETRY%GroupName”  queueID 0 记录到消息的属性中
                // key:“REAL_TOPIC”  “REAL_QUEUE_ID”

                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                // 修改 主题为 SCHEDULE_TOPIC_XXXX
                msg.setTopic(topic);
                // 队列为  延迟级别 - 1
                msg.setQueueId(queueId);
            }
        }

        // 持锁时间
        long elapsedTimeInLock = 0;

        // 待释放锁定状态的mappedFile(lock状态的mappedFile使用的内存 会 锁死在 物理内存中,不会使用 swap 区,性能很好)
        MappedFile unlockMappedFile = null;

        // 获取当前 顺序写的 mappedFile
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

        // 获取锁
        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            // 获取锁的时间
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            msg.setStoreTimestamp(beginLockTimestamp);



            if (null == mappedFile || mappedFile.isFull()) {// 条件成立:1. commitlog 目录下是空  2.文件写满了..
                // 创建新的 mappedFile
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }

            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
            }

            // 正常执行到这里..

            // 参数1:消息
            // 参数2:amc ,控制消息哪些字段 追加到 mf 内
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);

            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                case END_OF_FILE:
                    unlockMappedFile = mappedFile;

                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);

                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
                default:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
            }

            // 计算 加锁期间耗时
            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            // 释放锁
            putMessageLock.unlock();
        }

        if (elapsedTimeInLock > 500) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
        }


        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            // 将mappedByteBuffer 从 Lock 状态 切换为 unlock 状态
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }


        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());


        // 通知刷盘线程
        CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
        // HA 相关的...以后再说
        CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);

        return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
            if (flushStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(flushStatus);
            }
            if (replicaStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(replicaStatus);
                if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
                    log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",
                            msg.getTopic(), msg.getTags(), msg.getBornHostNameString());
                }
            }
            return putMessageResult;
        });

    }

ConsumeQueue

DefaultMessageStore

public class DefaultMessageStore implements MessageStore {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    private final MessageStoreConfig messageStoreConfig;
    // CommitLog
    private final CommitLog commitLog;

    private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;

    // 获取配置中刷盘“脏页阈值”、“强刷周期”,如果达到强刷周期60s,那就强制刷盘,否则刷脏页
    // todo commit提交之后,通过消息分发服务调用 消费队列msg分发器,msg分发器会把数据写入到 上面的consumeQueueTable,同时也会写入到mappfiled中,但是没有刷盘,需要
    //  这个服务落盘
    private final FlushConsumeQueueService flushConsumeQueueService;

    // todo 1、先调用commitLogService,然后再调用cleanConsumeQueueService,会封装成 任务,丢给下面的调度线程池执行
    // 清理commitLogService
    // todo 1、凌晨4点 2、当前commitlog、cq的目录磁盘使用率达到75, 根据获取过期时间72小时,看commitlog上次修改时间有没有超过72小时,有就删除,删除完minoffset要变动
    private final CleanCommitLogService cleanCommitLogService;

    // todo 1、配置中获取“清理2次cq时间间隔” 2、从commitlog获取“minoffset” 3、如果 “lastPhysicalMinOffset” < minoffset(说明cq 删除位点很小),那就要把这些cq删除
    private final CleanConsumeQueueService cleanConsumeQueueService;

    // 索引服务
    private final IndexService indexService;

    // 创建mapfile对象,细节再看
    // todo mfq.getLastmf() 获取正在写mapedFile,如果mf没有,或者写满了,就需要提交一个任务给 allocateMappedFileService 获取mf
    private final AllocateMappedFileService allocateMappedFileService;

  
    private final ReputMessageService reputMessageService;

    private final HAService haService;

    private final ScheduleMessageService scheduleMessageService;

    private final StoreStatsService storeStatsService;

    private final TransientStorePool transientStorePool;


    private final RunningFlags runningFlags = new RunningFlags();

    // 时钟
    private final SystemClock systemClock = new SystemClock();

    // 单线程的调度线程池
    private final ScheduledExecutorService scheduledExecutorService =
        Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread"));

    private final BrokerStatsManager brokerStatsManager;
    private final MessageArrivingListener messageArrivingListener;
    private final BrokerConfig brokerConfig;

    private volatile boolean shutdown = true;

    private StoreCheckpoint storeCheckpoint;

    private AtomicLong printTimes = new AtomicLong(0);

    // 转换器
    private final LinkedList<CommitLogDispatcher> dispatcherList;

    private RandomAccessFile lockFile;

    private FileLock lock;

    boolean shutDownNormal = false;

最核心的几个方法(从这个方法开始,一行行debug来将之前讲过的东西全部串起来):

  • load方法
    commit log.load全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq
    全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq这里主要做的事就是从commit log的目录中加载数据,存放到mappedFile中,设置到writePosition和flushPosition,这里设置位点不是准确值, 接下来是
    consumerQueue.load
    全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq
    这里就是将consumequeue/topic 下面对应的文件都加载到ConcurrentMap<String/* topic /, ConcurrentMap<Integer/ queueId */, ConsumeQueue>> consumeQueueTable这个里面去。
    比如我这里存储的consumequeue是这样子的:
    全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq
    最终存储的样子是这样的:
    全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq
    再接下来是 indexService的load方法,将index目录的索引文件加载到indexFile中
    全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq
    接下来是比较重要的recover方法

全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq
全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq
注意这里maxPhysicOffset = offset + size, 外面会用到全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq
总结一下:
遍历consumequeu下面的topic所有的文件,比如下图这样的,遍历所有的topic下面所有的queue下面所有文件,如果queue下面有多个文件,那就从倒数第三个文件开始遍历,不断的累加maxPhysicOffset
全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq
当获取到maxPhyOffsetOfConsumeQueue,要做一个将consumequeue与commitlog对齐的操作
全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq

  public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
        // 是否检查CRC 默认:true
        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();

        if (!mappedFiles.isEmpty()) {// 进入恢复逻辑

            // 从倒数老三 开始向后 恢复
            // Began to recover from the last third file
            int index = mappedFiles.size() - 3;

            if (index < 0)
                index = 0;


            // 待恢复mappedFile
            MappedFile mappedFile = mappedFiles.get(index);

            // 获取待恢复mappedFile的mappedByteBuffer的切片(包含全部文件的数据)
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();

            // 获取待恢复mappedFile 的文件名 为 起始处理偏移量(目录位点)
            long processOffset = mappedFile.getFileFromOffset();

            // 待处理mappedFile的处理位点(文件内的位点,这个是从0开始)
            long mappedFileOffset = 0;

            while (true) {
                // 从切片内解析出一条 msg 封装成 dispatchRequest对象
                // 特殊情况:1. magic_code 表示文件尾  2.request.size = -1 文件内的所有消息 都处理完了,并且 还未到达 文件尾。
                DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);

                int size = dispatchRequest.getMsgSize();

                // Normal data
                if (dispatchRequest.isSuccess() && size > 0) {// 正常消息 走这里
                    mappedFileOffset += size;
                }
                // Come the end of the file, switch to the next file Since the
                // return 0 representatives met last hole,
                // this can not be included in truncate offset
                else if (dispatchRequest.isSuccess() && size == 0) {// 文件尾这种情况 走这里

                    index++;
                    if (index >= mappedFiles.size()) {
                        // Current branch can not happen
                        log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
                        break;
                    } else {
                        // 获取下一个 mappedFile
                        mappedFile = mappedFiles.get(index);
                        // 获取这个mappedFile的切片
                        byteBuffer = mappedFile.sliceByteBuffer();
                        // 获取待恢复mappedFile 的文件名 为 起始处理偏移量(目录位点)
                        processOffset = mappedFile.getFileFromOffset();
                        // 归0
                        mappedFileOffset = 0;
                        log.info("recover next physics file, " + mappedFile.getFileName());
                    }
                }
                // Intermediate file read error
                else if (!dispatchRequest.isSuccess()) {
                    log.info("recover physics file end, " + mappedFile.getFileName());
                    break;
                }
            }
            // 执行到这里,说明 待恢复的数据 已经 检查一遍了

            // 再+mappedFileOffset 它之前 processOffset 的值是 上面 最后一个文件的 文件名 的值
            // 再加上 mappedFileOffset processOffset 表示的值就是 commitLog 全局位点
            processOffset += mappedFileOffset;

            // 设置 mfq 的刷盘位点
            this.mappedFileQueue.setFlushedWhere(processOffset);
            this.mappedFileQueue.setCommittedWhere(processOffset);

            // 调整mfq内当前正在顺序写的 mappedFile 的刷盘点 和 写入点
            this.mappedFileQueue.truncateDirtyFiles(processOffset);

            // Clear ConsumeQueue redundant data
            if (maxPhyOffsetOfConsumeQueue >= processOffset) {
                log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
                // 删除consumeQueue下的脏文件
                this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
            }
        } else {
            // Commitlog case files are deleted
            log.warn("The commitlog files are deleted, and delete the consume queue files");
            this.mappedFileQueue.setFlushedWhere(0);
            this.mappedFileQueue.setCommittedWhere(0);
            this.defaultMessageStore.destroyLogics();
        }
    }

这里最重要的就是 从倒数第三个文件开始遍历 commit log文件(每遍历一个commit log 其实就是解析里面msg的过程),直到commit log数据为空了,就得到commit log里面最大的物理偏移量,如果说 consume queue里面的最大物理偏移量大于 commit log的最大物理偏移量,那就删除 consume queue里面的部分数据

我们来看一下load完之后,commitlog和consumequeue加载到内存中是什么样的对象?
全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq
全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq
总结:
1、从consumequeue加载数据,并恢复位点
2、从commitlog加载数据,并恢复位点,如果consumequeue最大物理偏移量> commitlog最大物理偏移量,需要删除掉(很容易理解,commitlog才是存储消息的核心位置,以它为主)

  • start 方法
    public void start() throws Exception {

        lock = lockFile.getChannel().tryLock(0, 1, false);
        if (lock == null || lock.isShared() || !lock.isValid()) {
            throw new RuntimeException("Lock failed,MQ already started");
        }

        lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
        lockFile.getChannel().force(true);
        {
            /**
             * 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog;
             * 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go;
             * 3. Calculate the reput offset according to the consume queue;
             * 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed.
             */
            long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
            for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
                for (ConsumeQueue logic : maps.values()) {
                    if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
                        maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
                    }
                }
            }
            if (maxPhysicalPosInLogicQueue < 0) {
                maxPhysicalPosInLogicQueue = 0;
            }
            if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
                maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
                /**
                 * This happens in following conditions:
                 * 1. If someone removes all the consumequeue files or the disk get damaged.
                 * 2. Launch a new broker, and copy the commitlog from other brokers.
                 *
                 * All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0.
                 * If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong.
                 */
                log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
            }
            log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
                maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
            this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
            this.reputMessageService.start();

            /**
             *  1. Finish dispatching the messages fall behind, then to start other services.
             *  2. DLedger committedPos may be missing, so here just require dispatchBehindBytes <= 0
             */
            while (true) {
                if (dispatchBehindBytes() <= 0) {
                    break;
                }
                Thread.sleep(1000);
                log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes());
            }
            this.recoverTopicQueueTable();
        }

        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            this.haService.start();
            this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
        }

        this.flushConsumeQueueService.start();
        this.commitLog.start();
        this.storeStatsService.start();

        this.createTempFile();
        this.addScheduleTask();
        this.shutdown = false;
    }

简单总结一下:
1、防止多broker启动,获取文件锁
2、遍历consumeQueue对象,获取maxPhysicalPosInLogicQueue, 这里要设置 消息分发服务的分发位点 maxPhysicalPosInLogicQueue,因为大部分情况下,consume queue的物理位点是小于等于 commitlog的物理位点的
3、启动分发服务,让consumequeue文件和index文件和 commitlog文件对齐,把commitlog的数据拉到consumequeue和index
4、等到分发结束,再次构建对列偏移字典表,ConcurrentMap<String/* topic /, ConcurrentMap<Integer/ queueId */, ConsumeQueue>> consumeQueueTable;
5、启动消费对列刷盘服务
6、启动commit log,也就是启动commitlog的刷盘服务
7、创建abort文件,正常jvm hook的时候,会删除该文件
8、向sheduleService 提交定时清理文件的定时任务(延迟1s,周期10s)

  • asyncPutMessage(MessageExtBrokerInner msg)
    全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq
    commitlog.aysncPutMessage
  /**
     * @param msg (消息)
     */
    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        // 设置个存储时间,后面获取到 写锁后 这个事件 会重写。
        // Set the storage time
        msg.setStoreTimestamp(System.currentTimeMillis());

        // 计算 body crc 值
        // Set the message body BODY CRC (consider the most appropriate setting
        // on the client)
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));

        // AppendMessageCallBack 的返回值变量
        // Back to Results
        AppendMessageResult result = null;


        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        // 获取消息 主题 和 队列ID
        String topic = msg.getTopic();
        int queueId = msg.getQueueId();


        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Delay Delivery
            if (msg.getDelayTimeLevel() > 0) {

                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }

                // SCHEDULE_TOPIC_XXXX
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                // 延迟级别 - 1
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // 将消息的 “topic” “%RETRY%GroupName”  queueID 0 记录到消息的属性中
                // key:“REAL_TOPIC”  “REAL_QUEUE_ID”

                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                // 修改 主题为 SCHEDULE_TOPIC_XXXX
                msg.setTopic(topic);
                // 队列为  延迟级别 - 1
                msg.setQueueId(queueId);
            }
        }

        // 持锁时间
        long elapsedTimeInLock = 0;

        // 待释放锁定状态的mappedFile(lock状态的mappedFile使用的内存 会 锁死在 物理内存中,不会使用 swap 区,性能很好)
        MappedFile unlockMappedFile = null;

        // 获取当前 顺序写的 mappedFile
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

        // 获取锁
        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            // 获取锁的时间
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            msg.setStoreTimestamp(beginLockTimestamp);



            if (null == mappedFile || mappedFile.isFull()) {// 条件成立:1. commitlog 目录下是空  2.文件写满了..
                // 创建新的 mappedFile
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }

            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
            }

            // 正常执行到这里..

            // 参数1:消息
            // 参数2:amc ,控制消息哪些字段 追加到 mf 内
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);

            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                case END_OF_FILE:
                    unlockMappedFile = mappedFile;

                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);

                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
                default:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
            }

            // 计算 加锁期间耗时
            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            // 释放锁
            putMessageLock.unlock();
        }

        if (elapsedTimeInLock > 500) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
        }


        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            // 将mappedByteBuffer 从 Lock 状态 切换为 unlock 状态
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }


        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());


        // 通知刷盘线程
        CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
      
        CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);

        return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
            if (flushStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(flushStatus);
            }
            if (replicaStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(replicaStatus);
                if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
                    log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",
                            msg.getTopic(), msg.getTags(), msg.getBornHostNameString());
                }
            }
            return putMessageResult;
        });

    }

在往commitlog写数据的时候,会找到最后一个正在写的mappedFile,然后调用它的mappedFile.appendMessage(msg, this.appendMessageCallback)

全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq
会使用AppendMessageCallback.doAppend来完成数据的写入,对于commit log来说,它的AppendMessageCallback的实现类是DefaultAppendMessageCallback,最终会使用它的doAppend来完成数据的写入
全网最细RocketMQ源码四:消息存储,# 精通RocketMQ,rocketmq文章来源地址https://www.toymoban.com/news/detail-792297.html

   public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
            final MessageExtBrokerInner msgInner) {
            // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>

            // 这条消息的 物理offset
            // PHY OFFSET
            long wroteOffset = fileFromOffset + byteBuffer.position();

            int sysflag = msgInner.getSysFlag();

            int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
            int storeHostLength = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
            ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
            ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);

            this.resetByteBuffer(storeHostHolder, storeHostLength);
            String msgId;
            if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
                msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
            } else {
                msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
            }

            // Record ConsumeQueue information
            keyBuilder.setLength(0);
            keyBuilder.append(msgInner.getTopic());
            keyBuilder.append('-');
            keyBuilder.append(msgInner.getQueueId());
            String key = keyBuilder.toString();
            Long queueOffset = CommitLog.this.topicQueueTable.get(key);
            if (null == queueOffset) {
                queueOffset = 0L;
                CommitLog.this.topicQueueTable.put(key, queueOffset);
            }

            // Transaction messages that require special handling
            final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
            switch (tranType) {
                // Prepared and Rollback message is not consumed, will not enter the
                // consumer queuec
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    queueOffset = 0L;
                    break;
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                default:
                    break;
            }

            /**
             * Serialize message
             */
            final byte[] propertiesData =
                msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

            final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

            if (propertiesLength > Short.MAX_VALUE) {
                log.warn("putMessage message properties length too long. length={}", propertiesData.length);
                return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
            }

            final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
            final int topicLength = topicData.length;

            final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

            final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);

            // Exceeds the maximum message
            if (msgLen > this.maxMessageSize) {
                CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
                    + ", maxMessageSize: " + this.maxMessageSize);
                return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
            }

            // Determines whether there is sufficient free space
            if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
                this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
                // 1 TOTALSIZE
                this.msgStoreItemMemory.putInt(maxBlank);
                // 2 MAGICCODE
                this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
                // 3 The remaining space may be any value
                // Here the length of the specially set maxBlank
                final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
                byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
                return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
                    queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
            }

            // Initialization of storage space
            this.resetByteBuffer(msgStoreItemMemory, msgLen);
            // 1 TOTALSIZE
            this.msgStoreItemMemory.putInt(msgLen);
            // 2 MAGICCODE
            this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
            // 3 BODYCRC
            this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
            // 4 QUEUEID
            this.msgStoreItemMemory.putInt(msgInner.getQueueId());
            // 5 FLAG
            this.msgStoreItemMemory.putInt(msgInner.getFlag());
            // 6 QUEUEOFFSET
            this.msgStoreItemMemory.putLong(queueOffset);
            // 7 PHYSICALOFFSET
            this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
            // 8 SYSFLAG
            this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
            // 9 BORNTIMESTAMP
            this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
            // 10 BORNHOST
            this.resetByteBuffer(bornHostHolder, bornHostLength);
            this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
            // 11 STORETIMESTAMP
            this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
            // 12 STOREHOSTADDRESS
            this.resetByteBuffer(storeHostHolder, storeHostLength);
            this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
            // 13 RECONSUMETIMES
            this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
            // 14 Prepared Transaction Offset
            this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
            // 15 BODY
            this.msgStoreItemMemory.putInt(bodyLength);
            if (bodyLength > 0)
                this.msgStoreItemMemory.put(msgInner.getBody());
            // 16 TOPIC
            this.msgStoreItemMemory.put((byte) topicLength);
            this.msgStoreItemMemory.put(topicData);
            // 17 PROPERTIES
            this.msgStoreItemMemory.putShort((short) propertiesLength);
            if (propertiesLength > 0)
                this.msgStoreItemMemory.put(propertiesData);

            final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
            // Write messages to the queue buffer
            byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

            AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
                msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

            switch (tranType) {
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    // The next update ConsumeQueue information
                    CommitLog.this.topicQueueTable.put(key, ++queueOffset);
                    break;
                default:
                    break;
            }
            return result;
        }
  • getMessage()
    到后面讲消费者源码的时候再debug

到了这里,关于全网最细RocketMQ源码四:消息存储的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RocketMQ源码阅读-Broker消息接收

    Broker接收 Producer 发送的消息。 Broker在RocketMQ中也是一个独立的Model,rocketmq-broker。 Broker的核心类为SendMessageProcessor。 同样从单元测试入手,看Broker接收消息的流程。 SendMessageProcessor的单元测试类为org.apache.rocketmq.broker.processor.SendMessageProcessorTest。 包含上面这些方法,其中ini

    2024年01月17日
    浏览(25)
  • 消息中间件之RocketMQ源码分析(十)

    启动命令 nohup ./bin/mqnamesrv -c ./conf/namesrv.conf dev/null 21 通过脚本配置启动基本参数,比如配置文件路径、JVM参数,调用NamesrvStartup.main()方法,解析命令行的参数,将处理好的参数转化为Java实例,传递给NamesrvController实例 加载命令行传递的配置参数,调用controller.initialize()方法初

    2024年02月20日
    浏览(42)
  • [RocketMQ] Broker asyncPutMessage处理消息以及存储的高性能设计措施 (十一)

    asyncPutMessage方法真正的用来存储消息。 1.asyncPutMessage存储普通消息 DefaultMessageStore#asyncPutMessage() checkStoreStatus, checkMessage, checkLmqMessage校验。 CommitLog#asyncPutMessage存储消息, 更新耗时时间和失败次数。 1.1 checkStoreStatus检查存储状态 如果DefaultMessageStore是shutdown状态, 返回SERVICE_NOT_

    2024年02月13日
    浏览(35)
  • RocketMQ 存储优化技术 解析——图解、源码级解析

    🍊 Java学习:Java从入门到精通总结 🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想 🍊 绝对不一样的职场干货:大厂最佳实践经验指南 📆 最近更新:2023年1月13日 🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只

    2024年02月19日
    浏览(29)
  • 【分布式技术专题】RocketMQ延迟消息实现原理和源码分析

    痛点背景 业务场景 假设有这么一个需求,用户下单后如果30分钟未支付,则该订单需要被关闭。你会怎么做? 之前方案 最简单的做法,可以服务端启动个定时器,隔个几秒扫描数据库中待支付的订单,如果(当前时间-订单创建时间)30分钟,则关闭订单。 方案评估 优点:是实

    2024年02月13日
    浏览(38)
  • 【RocketMQ】RocketMQ标签、过滤及消息重复消费

    参考文档: 官方文档 Topic 与 Tag 都是业务上用来归类的标识,区别在于 Topic 是一级分类,而 Tag 可以理解为是二级分类。使用 Tag 可以实现对 Topic 中的消息进行过滤。 注: Topic:消息主题,通过 Topic 对不同的业务消息进行分类。 Tag:消息标签,用来进一步区分某个 Topic 下

    2024年02月07日
    浏览(32)
  • 解析 RocketMQ 业务消息 - “顺序消息”

    Apache RocketMQ 诞生至今,历经十余年大规模业务稳定性打磨,服务了阿里集团内部业务以及阿里云数以万计的企业客户。作为金融级可靠的业务消息方案,RocketMQ 从创建之初就一直专注于业务集成领域的异步通信能力构建。本篇将继续业务消息集成的场景,从功能原理、应用案

    2024年01月16日
    浏览(29)
  • rocketMq消息队列原生api使用以及rocketMq整合springboot

    使用RocketMQ的原生API开发是最简单也是目前看来最牢靠的方式。这里用SpringBoot来搭建一系列消息生产者和消息消费者,来访问之前搭建的RocketMQ集群。 首先创建一个基于Maven的SpringBoot工程,引入如下依赖: RocketMQ的官网上有很多经典的测试代码,这些代码虽然依赖的版本比较

    2024年02月12日
    浏览(34)
  • RocketMQ 发送批量消息、过滤消息和事务消息

    前面我们知道RocketMQ 发送延时消息与顺序消息,现在我们看下怎么发送批量消息、过滤消息和事务消息。 限制是这些批量消息应该有相同的 topic,相同的 waitStoreMsgOK,而且不能是延时消息。 此外,这一批消息的总大小不应超过4MB。 消息的生产者 消息的消费者 消息分割 如果

    2023年04月21日
    浏览(35)
  • Springbootg整合RocketMQ ——使用 rocketmq-spring-boot-starter 来配置发送和消费 RocketMQ 消息

           本文解析将 RocketMQ Client 端集成为 spring-boot-starter 框架的开发细节,然后通过一个简单的示例来一步一步的讲解如何使用这个 spring-boot-starter 工具包来配置,发送和消费 RocketMQ 消息。 添加maven依赖: 修改application.properties 注意: 请将上述示例配置中的 127.0.0.1:9876 替换

    2024年03月22日
    浏览(31)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包