RocketMQ 4.9.1 性能优化 源码剖析

这篇具有很好参考价值的文章主要介绍了RocketMQ 4.9.1 性能优化 源码剖析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

概述

RocketMQ 4.9.1 版本针对 Broker 做了一些性能优化,这一批 PR 都挂载 ISSUE#2883 下。

和4.9.0版本相比,小消息实时生产的 TPS 提升了约 28%。

I have some commit to Improve produce performance in M/S mode:

  1. Change log level to debug: “Half offset {} has been committed/rolled back”
  2. Optimise lock in WaitNotifyObject
  3. Remove lock in HAService
  4. Remove lock in GroupCommitService
  5. Eliminate array copy in HA
  6. Remove putMessage/putMessages method in CommitLog which has too many duplicated code.
  7. Change default value of some parameters: sendMessageThreadPoolNums/useReentrantLockWhenPutMessage/flushCommitLogTimed/endTransactionThreadPoolNums
  8. Optimise performance of asyncPutMessage (extract some code out of putMessage lock)
  9. extract generation of msgId out of lock in CommitLog (now only for single message processor)
  10. extract generation of topicQueueTable key out of sync code
  11. extract generation of msgId out of lock in CommitLog (for batch)
  12. fix ipv6 problem introduced in commit “Optimise performance of asyncPutMessage (extract some code out of putMessage lock)”
  13. Remove an duplicate MessageDecoder.string2messageProperties for each message, and prevent store “WAIT=true” property (in most case) to save 9 bytes for each message.
  14. Improve performance of string2messageProperties/messageProperties2String, and save 1 byte for each message.
  15. Optimise parse performance for SendMessageRequestHeaderV2

下面会从源码层面来详细分析一下优化点和优化的原因。了解这些优化需要对 RocketMQ 源码比较熟悉,为了方便理解,会在讲解优化点前补充一些前置知识。

优化分析

事务消息日志优化(1)

  1. Change log level to debug: “Half offset {} has been committed/rolled back”

默认的配置下每条消息都会打出一条日志,改动主要移除了事务消息中的日志打印。

while (true) {
    if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
        log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
        break;
    }
    if (removeMap.containsKey(i)) {
        log.~~info~~("Half offset {} has been committed/rolled back", i);
        Long removedOpOffset = removeMap.remove(i);
        doneOpOffset.add(removedOpOffset);
}

这个优化比较简单,从这当中可以学到的是在打印日志时需要谨慎,尤其是对于 RocketMQ 这种高性能中间件来说,日志的打印可能会占用较多 CPU 资源。

此外,如果日志中涉及字符串拼接等操作,消耗会更大,应当避免。

主从复制和同步刷流程中锁的优化/移除(2-4)

Improve produce performance in M/S mode

  1. Optimise lock in WaitNotifyObject
  2. Remove lock in HAService
  3. Remove lock in GroupCommitService

在分析如何优化之前需要学习一些前置指示,看一下 RocketMQ 中主从复制和同步刷盘的原理。这两个操作原理基本相同。

前置知识:主从复制和同步刷盘中的生产消费模式

在 RocketMQ 内部,主从复制和同步刷盘都是多线程协作处理的。以主从复制为例(GroupTransferService),消息处理线程(多个)不断接收消息,产生待复制的消息,另外有一个 ServiceThread 单线程处理复制结果,可以把前者看做数据生产者,后者看做数据消费者,RocketMQ 使用了双 Buffer 来达到批量处理的目的。

如下图,消费者正在处理数据的同时,生产者可以不受影响的继续添加数据,第一阶段生产者 Buffer 有 3 条数据,消费者 Buffer 有 2 条数据,由于消费者是单线程,没有别的线程跟它竞争,所以它可以批量处理这 2 条数据,完成后它会交换这两个 Buffer 的引用,于是接下来的第二阶段它又可以批量处理 3 条数据。

rocketmqmq 高版本性能下降,性能优化,zookeeper,分布式,rocketmq

优化1:主从复制和同步刷盘中重量级锁synchronized改为自旋锁

之前 RocketMQ 在生产者写入 putRequest() 、交换 Buffer 引用 swapRequests() 、以及内部处理中都使用了重量级锁synchronized保证线程安全。

实际 putRequest() 方法中只做了添加数据到列表的操作;swapRequests() 中做了交换操作,耗时都较小,故可以换成自旋锁。每次加解锁都只有 2 次 CAS 操作的开销,而不发生线程切换

rocketmqmq 高版本性能下降,性能优化,zookeeper,分布式,rocketmq

优化2:WaitNotifyObject 类

WaitNotifyObject 被用于做线程之间的异步通知。在主从复制逻辑中被用到。用法类似 synchronizedwait()nofityAll(),等待-通知机制。

主从复制线程循环传输数据,如果没有数据则调用 WaitNotifyObject#allWaitForRunning() 方法等待。

在CommitLog保存消息之后,调用 WaitNotifyObject#wakeUpAll() 方法唤醒主从复制线程。

本次优化减少了需要进入同步代码块的次数。


修改点:waitingThreadTable 改为 ConcurrentHashMap,然后可以将 waitingThreadTable 移出同步代码块。

volatile boolean hasNotified 改为 AtomicBoolean hasNotified

rocketmqmq 高版本性能下降,性能优化,zookeeper,分布式,rocketmq

消除主从复制中不必要的数组拷贝(5)

  1. Eliminate array copy in HA

了解这个优化之前需要先学习一下前置知识,包括 RocketMQ 中 CommitLog 使用的内存映射文件,和主从复制的流程。

内存映射文件mmap

RocketMQ 的 CommitLog 是内存映射文件(mmap)。下面这张图对比了普通 IO 和内存映射 IO 之间的区别。

rocketmqmq 高版本性能下降,性能优化,zookeeper,分布式,rocketmq

mmap 将文件直接映射到用户内存,使得对文件的操作不用再需要拷贝到PageCache,而是转化为对映射地址映射的PageCache的操作,使随机读写文件和读写内存拥有相似的速度(随机地址被映射到了内存)

主从复制流程概要

RocketMQ 主从复制机制会在消息写入 CommitLog 之后,Master Broker 将消息发送到 Slave,达到消息不丢失。

本次修改点是在主从复制的 Slave 处理过程当中。HAClient 是 Slave 连接 Master 的实现类。

HAClient#run() 方法做了以下这些事:

  1. salve连接到master,向master上报slave当前的offset
  2. master收到后确认给slave发送数据的开始位置
  3. master查询开始位置对应的MappedFIle
  4. master将查找到的数据发送给slave
  5. slave收到数据后保存到自己的CommitLog

其中4、5步,Slave 接收到的数据存在一个 ByteBuffer 里面,把它保存到 CommitLog 的时候,原来的代码会新建一个字节数组,然后把读到的 ByteBuffer 里的数据拷贝进去。

优化:减少字节数组拷贝

原先在主从复制逻辑中的数组拷贝步骤其实是可以省略的,可以直接把从 Master 读到的 ByteBuffer 传到写 CommitLog 的方法中,并且一并传入数据的开始位置和长度,这样就可以在不重新复制字节数组的情况下传递 ByteBuffer 中的数据。

rocketmqmq 高版本性能下降,性能优化,zookeeper,分布式,rocketmq

移除 CommitLog 中包含重复代码的 putMessage/putMessages 方法(6)

  1. Remove putMessage/putMessages method in CommitLog which has too many duplicated code.

该优化主要是减少冗余代码

原本 CommitLog 中有如下这些保存消息的方法

  • putMessage:同步保存单条消息
  • asyncPutMessage:异步保存单条消息
  • putMessages:同步保存批量消息
  • asyncPutMessages:异步保存批量消息

其实同步保存和异步保存消息的逻辑差不多,但是原本并没有复用代码,而是每个方法都单独实现。这就导致同步和异步方法存在大量重复代码。

这个 Patch 合并了 putMessage & asyncPutMessage 、putMessages & asyncPutMessages 方法,在同步方法中调用异步方法的等待方法,删除了大量重复代码。

rocketmqmq 高版本性能下降,性能优化,zookeeper,分布式,rocketmq

rocketmqmq 高版本性能下降,性能优化,zookeeper,分布式,rocketmq

调整消息发送几个参数的默认值(7)

  1. Change default value of some parameters: sendMessageThreadPoolNums/useReentrantLockWhenPutMessage/flushCommitLogTimed/endTransactionThreadPoolNums

消息保存/发送参数优化

RocketMQ在保存消息时,由于要保证消息保存到 CommitLog 中是顺序的,写 CommitLog 只能单线程操作,写之前要先获取一个锁,这个锁也就是影响 RocketMQ 性能最关键的一个锁。

最早之前 3.2.X 版本这个锁是 synchronized,从 RocketMQ4.X 开始引入了自旋锁并作为默认值,同时将参数 sendMessageThreadPoolNums(处理Client端发送消息线程池的线程数)改为了 1,这样处理每条消息写 CommitLog 的时候是一个线程在写,可以省下进出重量锁的开销。

不过这个地方单线程处理,任务有点重,处理消息的逻辑并不是往 CommitLog 里面一写(无法并行)就完事的,还有一些 CPU 开销比较大的工作,多线程处理比较好,经过一些实践测试,4 个线程是比较合理的数值,因此这个参数默认值改为 MIN(逻辑处理器数, 4)

既然有 4 个线程,还用自旋锁可能就不合适了,因为拿不到锁的线程会让 CPU 白白空转。所以改用可重入锁,useReentrantLockWhenPutMessage 参数还是改为 true 比较好。

事务消息二阶段处理线程大小

endTransactionThreadPoolNums 是事务消息二阶段处理线程大小,sendMessageThreadPoolNums 则指定一阶段处理线程池大小。如果二阶段的处理速度跟不上一阶段,就会造成二阶段消息丢失导致大量回查,所以建议 endTransactionThreadPoolNums 应该大于 sendMessageThreadPoolNums,建议至少 4 倍。

开启定时刷盘

flushCommitLogTimed 参数表示是否定时刷盘,之前默认为 false,表示实时刷盘。

本次对刷盘相关的参数也进行了调整。默认情况下,RocketMQ 是异步刷盘,但每次处理消息都会触发一个异步的刷盘请求。这次将 flushCommitLogTimed 这个参数改成 true,也就是定时刷盘(默认每 500ms),可以大幅降低对 IO 压力,在主从同步复制的场景下,可靠性也不会降低。

优化 putMessage 锁内操作(8-12)

Improve produce performance in M/S mode.

  1. Optimise performance of asyncPutMessage (extract some code out of putMessage lock)
  2. extract generation of msgId out of lock in CommitLog (now only for single message processor)
  3. extract generation of topicQueueTable key out of sync code
  4. extract generation of msgId out of lock in CommitLog (for batch)
  5. fix ipv6 problem introduced in commit “Optimise performance of asyncPutMessage (extract some code out of putMessage lock)”

CommitLog 是 RocketMQ 消息存储文件。单个 Broker 上所有消息都顺序保存在 CommitLog 中。

写 CommitLog 只能单线程操作,写之前要先获取一个锁,这个锁也就是影响 RocketMQ 性能最关键的一个锁。

理论上这里只要往 MappedByteBuffer 写一下就好了,但实践往往要比理论复杂得多,因为各种原因,这个锁里面干的事情非常的多。

由于当前代码的复杂性,这个优化是本批次修改里面改动最大的,但它的逻辑其实很简单,就是把锁内干的事情,尽量的放到锁的外面去做,能先准备好的数据就先准备好。它包括了以下改动:

  1. 将 Buffer 的大部分准备工作(编码工作)放到了锁外,提前做好。
  2. 将 MessageId 的做成了懒初始化(放到锁外),这个消息 ID 的生成涉及很多编解码和数据复制工作,实际上性能开销相当大。
  3. 原来锁内用来查位点哈希表的 Key 是个拼接出来的字符串,这次也改到锁外先生成好。
  4. 顺便补上了之前遗漏的关于 IPv6 的处理。
  5. 删除了无用的代码。

优化 asyncPutMessage 性能,将准备工作放到锁外

先看一下代码上的改动,右边绿色新增的代码是原先在锁中的操作,现在都移动到了锁外面。

rocketmqmq 高版本性能下降,性能优化,zookeeper,分布式,rocketmq

右边新增的的 putMessageThreadLocal.getEncode().encode(msg) 完成了大量预操作,将原先 CommitLog#DefaultAppendMessageCallback#doAppend() 方法中的操作移动到了锁外。

下面的代码第一份是修改前的,doAppend() 方法是锁内操作;第二份是修改后的,encode() 方法抽到了加锁之前。

// CommitLog.java 修改前
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
            final MessageExtBrokerInner msgInner) {
    // ...
    /**
     * Serialize message
     */
    final byte[] propertiesData =
        msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

    final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

    if (propertiesLength > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long. length={}", propertiesData.length);
        return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
    }

    final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
    final int topicLength = topicData.length;

    final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

    final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);

    // Exceeds the maximum message
    if (msgLen > this.maxMessageSize) {
        CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
            + ", maxMessageSize: " + this.maxMessageSize);
        return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
    }

    // ... Determines whether there is sufficient free space

		// Initialization of storage space
            this.resetByteBuffer(msgStoreItemMemory, msgLen);
            // 1 TOTALSIZE
            this.msgStoreItemMemory.putInt(msgLen);
            // 2 MAGICCODE
            this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
            // 3 BODYCRC
            this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
            // 4 QUEUEID
            this.msgStoreItemMemory.putInt(msgInner.getQueueId());
            // 5 FLAG
            this.msgStoreItemMemory.putInt(msgInner.getFlag());
            // 6 QUEUEOFFSET
            this.msgStoreItemMemory.putLong(queueOffset);
            // 7 PHYSICALOFFSET
            this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
            // 8 SYSFLAG
            this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
            // 9 BORNTIMESTAMP
            this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
            // 10 BORNHOST
            this.resetByteBuffer(bornHostHolder, bornHostLength);
            this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
            // 11 STORETIMESTAMP
            this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
            // 12 STOREHOSTADDRESS
            this.resetByteBuffer(storeHostHolder, storeHostLength);
            this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
            // 13 RECONSUMETIMES
            this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
            // 14 Prepared Transaction Offset
            this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
            // 15 BODY
            this.msgStoreItemMemory.putInt(bodyLength);
            if (bodyLength > 0)
                this.msgStoreItemMemory.put(msgInner.getBody());
            // 16 TOPIC
            this.msgStoreItemMemory.put((byte) topicLength);
            this.msgStoreItemMemory.put(topicData);
            // 17 PROPERTIES
            this.msgStoreItemMemory.putShort((short) propertiesLength);
            if (propertiesLength > 0)
                this.msgStoreItemMemory.put(propertiesData);
// CommitLog.java 修改后
protected PutMessageResult encode(MessageExtBrokerInner msgInner) {
    /**
     * Serialize message
     */
    final byte[] propertiesData =
            msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

    final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

    if (propertiesLength > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long. length={}", propertiesData.length);
        return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
    }

    final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
    final int topicLength = topicData.length;

    final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

    final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);

    // Exceeds the maximum message
    if (msgLen > this.maxMessageSize) {
        CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
                + ", maxMessageSize: " + this.maxMessageSize);
        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
    }

    // Initialization of storage space
    this.resetByteBuffer(encoderBuffer, msgLen);
    // 1 TOTALSIZE
    this.encoderBuffer.putInt(msgLen);
    // 2 MAGICCODE
            this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE);
            // 3 BODYCRC
            this.encoderBuffer.putInt(msgInner.getBodyCRC());
            // 4 QUEUEID
            this.encoderBuffer.putInt(msgInner.getQueueId());
            // 5 FLAG
            this.encoderBuffer.putInt(msgInner.getFlag());
            // 6 QUEUEOFFSET, need update later
            this.encoderBuffer.putLong(0);
            // 7 PHYSICALOFFSET, need update later
            this.encoderBuffer.putLong(0);
            // 8 SYSFLAG
            this.encoderBuffer.putInt(msgInner.getSysFlag());
            // 9 BORNTIMESTAMP
            this.encoderBuffer.putLong(msgInner.getBornTimestamp());
            // 10 BORNHOST
            socketAddress2ByteBuffer(msgInner.getBornHost() ,this.encoderBuffer);
            // 11 STORETIMESTAMP
            this.encoderBuffer.putLong(msgInner.getStoreTimestamp());
            // 12 STOREHOSTADDRESS
            socketAddress2ByteBuffer(msgInner.getStoreHost() ,this.encoderBuffer);
            // 13 RECONSUMETIMES
            this.encoderBuffer.putInt(msgInner.getReconsumeTimes());
            // 14 Prepared Transaction Offset
            this.encoderBuffer.putLong(msgInner.getPreparedTransactionOffset());
            // 15 BODY
            this.encoderBuffer.putInt(bodyLength);
            if (bodyLength > 0)
                this.encoderBuffer.put(msgInner.getBody());
            // 16 TOPIC
            this.encoderBuffer.put((byte) topicLength);
            this.encoderBuffer.put(topicData);
            // 17 PROPERTIES
            this.encoderBuffer.putShort((short) propertiesLength);
            if (propertiesLength > 0)
                this.encoderBuffer.put(propertiesData);

            encoderBuffer.flip();
            return null;
        }

然后把预编码的数据放到 MessageExtBrokerInner 中的 private ByteBuffer encodedBuff 字段,传到 doAppend() 方法中使用

rocketmqmq 高版本性能下降,性能优化,zookeeper,分布式,rocketmq

MessageId 懒加载

使用函数式接口 Supplier,将 MessageId 计算的逻辑放到 Supplier 中。创建结果对象时将 Supplier 传入,而不是直接计算 MessageId。

当结果的 getMsgId() 方法被调用,才会执行 Supplier 中 MessageId 的计算方法。

// CommitLog#DefaultAppendMessageCallback
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
    final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
    // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>

    // PHY OFFSET
    long wroteOffset = fileFromOffset + byteBuffer.position();

    Supplier<String> msgIdSupplier = () -> {
        int sysflag = msgInner.getSysFlag();
        int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
        ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
        MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
        msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer
        msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
        return UtilAll.bytes2string(msgIdBuffer.array());
    };
    // ...
    AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
                msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    // ...
    return result;
}
// AppendMessageResult.java
public String getMsgId() {
    // msgId懒加载
    if (msgId == null && msgIdSupplier != null) {
        msgId = msgIdSupplier.get();
    }
    return msgId;
}

优化消息 Header 解析的性能(13-15)

去除字符串末尾占位符,节省消息传输大小

优化字符串格式的属性存储。RocketMQ 在消息传输时用字符串存储一个 Map,接受消息后再解析成Map。

字符串采用这种格式存储 Map:

key1\u0001value1\u0002key2\u0001value2\u0002

rocketmqmq 高版本性能下降,性能优化,zookeeper,分布式,rocketmq

该 Patch 优化掉了字符串末尾的\u0002,为每个消息节省了1字节传输大小。


优化 string 和 map 互相解析的性能

优化前后效果:

Benchmark                               Mode  Cnt     Score   Error  Units(10000 loop in each op)
TempTest.messageProperties2String      thrpt    2  2257.276          ops/s
TempTest.messageProperties2String_old  thrpt    2  1464.342          ops/s
TempTest.string2messageProperties      thrpt    2  1590.499          ops/s
TempTest.string2messageProperties_old  thrpt    2   605.118          ops/s

rocketmqmq 高版本性能下降,性能优化,zookeeper,分布式,rocketmq


  • string 转 map 优化

rocketmqmq 高版本性能下降,性能优化,zookeeper,分布式,rocketmq

优化点主要是预先计算了需要解析成字符串的长度,然后为 StringBuilder 定义了初始长度。

StringBuilder 是一个可以动态增加自身数据长度的类,其默认长度(capacity属性)为16。它的底层结构实际是 char[]。

在 TPS 很高的场景下, StringBuilder 默认长度是 16,处理一个正常的消息,至少会内部扩展 2 次,白白产生 2 个对象和 2 次数组复制。

所以优化方案就是先算好需要的长度,创建 StringBuffer 的时候直接就指定好。


  • map 转 string 优化

rocketmqmq 高版本性能下降,性能优化,zookeeper,分布式,rocketmq

可以看到右边的代码使用了 indexOfsubstring 方法替换原来的 split 方法

其实 split 方法内部也是使用 indexOf 和 substring 方法的,但它内部新建了一个 ArrayList 用来保存返回结果,在返回时将结果复制到 String[]。

右边方法将切分后的字符串直接存到 map 中,免去了存到 ArrayList 中的过程,减少了复制,也避免了 ArrayList 扩容的损耗。

优化 Broker 请求消息头解码性能(15)

  1. Optimise parse performance for SendMessageRequestHeaderV2

RocketMQ 的通信协议定义了各种指令,它们的 Header 各不相同,共用了一个通用的解析方法,基于反射来解析和设置消息 Header。

这个解析 Header 方法的效率很低,本次优化单独定义了解析发送消息请求头的方法,直接get Map 中的属性,提升效率。


发送消息的请求header会类似如:

{  
    "code":310,
    "extFields":{  
        "f":"0",
        "g":"1482158310125",
        "d":"4",
        "e":"0",
        "b":"TopicTest",
        "c":"TBW102",
        "a":"please_rename_unique_group_name",
        "j":"0",
        "k":"false",
        "h":"0",
        "i":"TAGS\u0001TagA\u0002WAIT\u0001true\u0002"
    },
    "flag":0,
    "language":"JAVA",
    "opaque":206,
    "version":79
}
public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
    @CFNotNull
    private String a; // producerGroup;
    @CFNotNull
    private String b; // topic;
    @CFNotNull
    private String c; // defaultTopic;
    @CFNotNull
    private Integer d; // defaultTopicQueueNums;
    @CFNotNull
    private Integer e; // queueId;
    @CFNotNull
    private Integer f; // sysFlag;
    @CFNotNull
    private Long g; // bornTimestamp;
    @CFNotNull
    private Integer h; // flag;
    @CFNullable
    private String i; // properties;
    @CFNullable
    private Integer j; // reconsumeTimes;
    @CFNullable
    private boolean k; // unitMode = false;
    private Integer l; // consumeRetryTimes
    @CFNullable
    private boolean m; //batch

接收消息时,会将 Header 解码成 SendMessageRequestHeaderV2

public CommandCustomHeader decodeCommandCustomHeader(Class<? extends CommandCustomHeader> classHeader) 
    throws RemotingCommandException {
    CommandCustomHeader objectHeader;
    try {
        objectHeader = classHeader.newInstance();
    } catch (InstantiationException e) {
        return null;
    } catch (IllegalAccessException e) {
        return null;
    }

    if (this.extFields != null) {
        Field[] fields = getClazzFields(classHeader);
        for (Field field : fields) {
            if (!Modifier.isStatic(field.getModifiers())) {
                String fieldName = field.getName();
                if (!fieldName.startsWith("this")) {
                    try {
                        String value = this.extFields.get(fieldName);
                        if (null == value) {
                            Annotation annotation = getNotNullAnnotation(field);
                            if (annotation != null) {
                                throw new RemotingCommandException("the custom field <" + fieldName + "> is null");
                            }

                            continue;
                        }

                        field.setAccessible(true);
                        String type = getCanonicalName(field.getType());
                        Object valueParsed;

                        if (type.equals(StringCanonicalName)) {
                            valueParsed = value;
                        } else if (type.equals(IntegerCanonicalName1) || type.equals(IntegerCanonicalName2)) {
                            valueParsed = Integer.parseInt(value);
                        } else if (type.equals(LongCanonicalName1) || type.equals(LongCanonicalName2)) {
                            valueParsed = Long.parseLong(value);
                        } else if (type.equals(BooleanCanonicalName1) || type.equals(BooleanCanonicalName2)) {
                            valueParsed = Boolean.parseBoolean(value);
                        } else if (type.equals(DoubleCanonicalName1) || type.equals(DoubleCanonicalName2)) {
                            valueParsed = Double.parseDouble(value);
                        } else {
                            throw new RemotingCommandException("the custom field <" + fieldName + "> type is not supported");
                        }

                        field.set(objectHeader, valueParsed);

                    } catch (Throwable e) {
                    }
                }
            }
        }

        objectHeader.checkFields();
    }

    return objectHeader;
}
static SendMessageRequestHeaderV2 decodeSendMessageHeaderV2(
        RemotingCommand request)
            throws RemotingCommandException {
        SendMessageRequestHeaderV2 r = new SendMessageRequestHeaderV2();
        HashMap<String, String> fields = request.getExtFields();
        if (fields == null) {
            throw new RemotingCommandException("the ext fields is null");
        }

        String s = fields.get("a");
        checkNotNull(s, "the custom field <a> is null");
        r.setA(s);

        s = fields.get("b");
        checkNotNull(s, "the custom field <b> is null");
        r.setB(s);

        s = fields.get("c");
        checkNotNull(s, "the custom field <c> is null");
        r.setC(s);

        s = fields.get("d");
        checkNotNull(s, "the custom field <d> is null");
        r.setD(Integer.parseInt(s));

        s = fields.get("e");
        checkNotNull(s, "the custom field <e> is null");
        r.setE(Integer.parseInt(s));

        s = fields.get("f");
        checkNotNull(s, "the custom field <f> is null");
        r.setF(Integer.parseInt(s));

        s = fields.get("g");
        checkNotNull(s, "the custom field <g> is null");
        r.setG(Long.parseLong(s));

        s = fields.get("h");
        checkNotNull(s, "the custom field <h> is null");
        r.setH(Integer.parseInt(s));

        s = fields.get("i");
        if (s != null) {
            r.setI(s);
        }

        s = fields.get("j");
        if (s != null) {
            r.setJ(Integer.parseInt(s));
        }

        s = fields.get("k");
        if (s != null) {
            r.setK(Boolean.parseBoolean(s));
        }

        s = fields.get("l");
        if (s != null) {
            r.setL(Integer.parseInt(s));
        }

        s = fields.get("m");
        if (s != null) {
            r.setM(Boolean.parseBoolean(s));
        }
        return r;
    }

左边其实是一个通用的解码方法,右边是针对消息生产的指令 SendMessageRequestHeaderV2 优化的解码方法。这里不再使用共同的这个解析器,而是简单粗暴的直接一个一个去 set 每一个属性,这样这个方法获得了大约 4 倍性能的提升。文章来源地址https://www.toymoban.com/news/detail-803013.html

参考资料

  • Apache RocketMQ 4.9.1 高性能优化之路
  • RocketMQ这样做,离物理极限性能还差多远?
  • 寻找Java中String.split性能更好的方法
  • RocketMQ——通信协议

到了这里,关于RocketMQ 4.9.1 性能优化 源码剖析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • GO 中高效 int 转换 string 的方法与高性能源码剖析

    Go 语言 中,将整数(int)转换为字符串(string)是一项常见的操作。 本文将从逐步介绍几种在 Go 中将 int 转换为 string 的常见方法,并重点剖析这几种方法在性能上的特点。另外,还会重点介绍 FormatInt 高效的算法实现。 最直接且常用的方法是使用 strconv 包中的 Itoa 函数。

    2024年01月21日
    浏览(43)
  • 【linux高性能服务器编程】项目实战——仿QQ聊天程序源码剖析

    hello !大家好呀! 欢迎大家来到我的Linux高性能服务器编程系列之项目实战——仿QQ聊天程序源码剖析,在这篇文章中, 你将会学习到如何利用Linux网络编程技术来实现一个简单的聊天程序,并且我会给出源码进行剖析,以及手绘UML图来帮助大家来理解,希望能让大家更能了

    2024年04月28日
    浏览(42)
  • 【Angular性能优化】项目8版本加载速度缓慢、白屏时间、首页渲染性能优化方案

    随着业务的代码一点点增加,加上Angular的项目本身就比 vue、react 的重一些,随之而来的启动速度,更改文件后编译速度,以及打包速度也会变慢,于是乎想着优化下我们的项目。 本文章主要说的是 : 打包Angular项目的一些配置,性能优化方面的方案 打包后,用户进入页面的

    2024年04月10日
    浏览(51)
  • 前端页面性能优化,性能测试算法优化,MeterSphere开源持续测试平台v2.10.5 LTS版本发布

    2023年8月7日,MeterSphere一站式开源持续测试平台正式发布v2.10.5 LTS版本。自2023年5月发布v2.10 LTS版本后,MeterSphere开源项目组坚持每两周发布小版本,持续进行问题的修复更新,并针对部分功能进行优化。 本次发布的MeterSphere v2.10.5 LTS版本在前端页面性能、性能测试算法、后端

    2024年02月14日
    浏览(41)
  • 前端进阶版本 ,性能优化—-防抖、节流、重绘与回流

    目录 【防抖】 【节流】 重绘(repaint) 回流(reflow):又叫重排(layout) 工作中要如何避免大量使用重绘与回流? 常见的会导致回流的元素 【防抖】 任务频繁触发的情况下,只有任务触发的间隔超过指定间隔的时候,任务才会执行。 【节流】 指定时间间隔内只会执行一次任务。

    2024年02月14日
    浏览(39)
  • 【QCustomPlot】性能提升之修改源码(版本 V2.x.x)

    使用 QCustomPlot 绘图库辅助开发时整理的学习笔记。同系列文章目录可见 《绘图库 QCustomPlot 学习笔记》目录。QCustomPlot 是开源项目,源码编写十分规范,想要理解它的可视化思路不算特别困难。我在这篇随笔中总结一下常用的源码修改技巧, 下面的每一个技巧都是独立的,相

    2024年02月06日
    浏览(28)
  • Android性能优化系列-腾讯matrix-IO监控-IOCanaryPlugin源码分析

    作者:秋去无痕 matrix 对io的监控包括四个方面 监控在主线程执行 IO 操作的问题 监控缓冲区过小的问题 监控重复读同一文件 监控内存泄漏问题 IOCanaryPlugin,内部由IOCanaryCore完成真正的操作。 根据配置进行hook的安装 取消hook 底层hook安装包函几个步骤,加载so,设置hook内容,

    2024年02月09日
    浏览(83)
  • 优化算法之梯度下降|Matlab实现梯度下降算法

    题目要求: 使用Matab实现梯度下降法 对于函数: min ⁡ f ( x ) = 2 x 1 2 + 4 x 2 2 − 6 x 1 − 2 x 1 x 2 min f(x)=2 x_{1}^{2}+4 x_{2}^{2}-6 x_{1}-2 x_{1} x_{2} min f ( x ) = 2 x 1 2 ​ + 4 x 2 2 ​ − 6 x 1 ​ − 2 x 1 ​ x 2 ​ 试采用 MATLAB实现最速下降法求解该问题, 给出具体的迭代过程、 最终优化结果、

    2024年02月16日
    浏览(50)
  • 【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析

    使用系统控制读取操作的DefaultMQPushConsumer可以自动调用传入的处理方法来处理收到的消息。通过设置各种参数和传入处理消息的函数,使用DefaultMQPushConsumer的主要目的是方便配置和处理消息。在收到消息后,系统会自动保存Offset,并且如果加入了新的DefaultMQPushConsumer,系统会

    2024年02月11日
    浏览(45)
  • 【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析

    首先, DefaultMQPushConsumerImpl 是一个实现了 RocketMQ 的消费者客户端接口的类。该类的主要作用是从 RocketMQ 的 Broker 获取消息并进行消费。 主要可以通过pullMessage方法进行获取对应的操作,如下图所示。 在消费消息时, DefaultMQPushConsumerImpl 会将获取到的消息放入一个 processQueue

    2024年02月11日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包