概述
RocketMQ 4.9.1 版本针对 Broker 做了一些性能优化,这一批 PR 都挂载 ISSUE#2883 下。
和4.9.0版本相比,小消息实时生产的 TPS 提升了约 28%。
I have some commit to Improve produce performance in M/S mode:
- Change log level to debug: “Half offset {} has been committed/rolled back”
- Optimise lock in WaitNotifyObject
- Remove lock in HAService
- Remove lock in GroupCommitService
- Eliminate array copy in HA
- Remove putMessage/putMessages method in CommitLog which has too many duplicated code.
- Change default value of some parameters: sendMessageThreadPoolNums/useReentrantLockWhenPutMessage/flushCommitLogTimed/endTransactionThreadPoolNums
- Optimise performance of asyncPutMessage (extract some code out of putMessage lock)
- extract generation of msgId out of lock in CommitLog (now only for single message processor)
- extract generation of topicQueueTable key out of sync code
- extract generation of msgId out of lock in CommitLog (for batch)
- fix ipv6 problem introduced in commit “Optimise performance of asyncPutMessage (extract some code out of putMessage lock)”
- Remove an duplicate MessageDecoder.string2messageProperties for each message, and prevent store “WAIT=true” property (in most case) to save 9 bytes for each message.
- Improve performance of string2messageProperties/messageProperties2String, and save 1 byte for each message.
- Optimise parse performance for SendMessageRequestHeaderV2
下面会从源码层面来详细分析一下优化点和优化的原因。了解这些优化需要对 RocketMQ 源码比较熟悉,为了方便理解,会在讲解优化点前补充一些前置知识。
优化分析
事务消息日志优化(1)
- Change log level to debug: “Half offset {} has been committed/rolled back”
默认的配置下每条消息都会打出一条日志,改动主要移除了事务消息中的日志打印。
while (true) {
if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
break;
}
if (removeMap.containsKey(i)) {
log.~~info~~("Half offset {} has been committed/rolled back", i);
Long removedOpOffset = removeMap.remove(i);
doneOpOffset.add(removedOpOffset);
}
这个优化比较简单,从这当中可以学到的是在打印日志时需要谨慎,尤其是对于 RocketMQ 这种高性能中间件来说,日志的打印可能会占用较多 CPU 资源。
此外,如果日志中涉及字符串拼接等操作,消耗会更大,应当避免。
主从复制和同步刷流程中锁的优化/移除(2-4)
Improve produce performance in M/S mode
- Optimise lock in WaitNotifyObject
- Remove lock in HAService
- Remove lock in GroupCommitService
在分析如何优化之前需要学习一些前置指示,看一下 RocketMQ 中主从复制和同步刷盘的原理。这两个操作原理基本相同。
前置知识:主从复制和同步刷盘中的生产消费模式
在 RocketMQ 内部,主从复制和同步刷盘都是多线程协作处理的。以主从复制为例(GroupTransferService
),消息处理线程(多个)不断接收消息,产生待复制的消息,另外有一个 ServiceThread
单线程处理复制结果,可以把前者看做数据生产者,后者看做数据消费者,RocketMQ 使用了双 Buffer 来达到批量处理的目的。
如下图,消费者正在处理数据的同时,生产者可以不受影响的继续添加数据,第一阶段生产者 Buffer 有 3 条数据,消费者 Buffer 有 2 条数据,由于消费者是单线程,没有别的线程跟它竞争,所以它可以批量处理这 2 条数据,完成后它会交换这两个 Buffer 的引用,于是接下来的第二阶段它又可以批量处理 3 条数据。
优化1:主从复制和同步刷盘中重量级锁synchronized改为自旋锁
之前 RocketMQ 在生产者写入 putRequest()
、交换 Buffer 引用 swapRequests()
、以及内部处理中都使用了重量级锁synchronized
保证线程安全。
实际 putRequest()
方法中只做了添加数据到列表的操作;swapRequests()
中做了交换操作,耗时都较小,故可以换成自旋锁。每次加解锁都只有 2 次 CAS 操作的开销,而不发生线程切换。
优化2:WaitNotifyObject 类
WaitNotifyObject
被用于做线程之间的异步通知。在主从复制逻辑中被用到。用法类似 synchronized
的 wait()
和 nofityAll()
,等待-通知机制。
主从复制线程循环传输数据,如果没有数据则调用 WaitNotifyObject#allWaitForRunning()
方法等待。
在CommitLog保存消息之后,调用 WaitNotifyObject#wakeUpAll()
方法唤醒主从复制线程。
本次优化减少了需要进入同步代码块的次数。
修改点:waitingThreadTable
改为 ConcurrentHashMap
,然后可以将 waitingThreadTable
移出同步代码块。
volatile boolean hasNotified
改为 AtomicBoolean hasNotified
消除主从复制中不必要的数组拷贝(5)
- Eliminate array copy in HA
了解这个优化之前需要先学习一下前置知识,包括 RocketMQ 中 CommitLog 使用的内存映射文件,和主从复制的流程。
内存映射文件mmap
RocketMQ 的 CommitLog 是内存映射文件(mmap)。下面这张图对比了普通 IO 和内存映射 IO 之间的区别。
mmap 将文件直接映射到用户内存,使得对文件的操作不用再需要拷贝到PageCache,而是转化为对映射地址映射的PageCache的操作,使随机读写文件和读写内存拥有相似的速度(随机地址被映射到了内存)
主从复制流程概要
RocketMQ 主从复制机制会在消息写入 CommitLog 之后,Master Broker 将消息发送到 Slave,达到消息不丢失。
本次修改点是在主从复制的 Slave 处理过程当中。HAClient 是 Slave 连接 Master 的实现类。
HAClient#run()
方法做了以下这些事:
- salve连接到master,向master上报slave当前的offset
- master收到后确认给slave发送数据的开始位置
- master查询开始位置对应的MappedFIle
- master将查找到的数据发送给slave
- slave收到数据后保存到自己的CommitLog
其中4、5步,Slave 接收到的数据存在一个 ByteBuffer 里面,把它保存到 CommitLog 的时候,原来的代码会新建一个字节数组,然后把读到的 ByteBuffer 里的数据拷贝进去。
优化:减少字节数组拷贝
原先在主从复制逻辑中的数组拷贝步骤其实是可以省略的,可以直接把从 Master 读到的 ByteBuffer 传到写 CommitLog 的方法中,并且一并传入数据的开始位置和长度,这样就可以在不重新复制字节数组的情况下传递 ByteBuffer 中的数据。
移除 CommitLog 中包含重复代码的 putMessage/putMessages 方法(6)
- Remove putMessage/putMessages method in CommitLog which has too many duplicated code.
该优化主要是减少冗余代码
原本 CommitLog 中有如下这些保存消息的方法
- putMessage:同步保存单条消息
- asyncPutMessage:异步保存单条消息
- putMessages:同步保存批量消息
- asyncPutMessages:异步保存批量消息
其实同步保存和异步保存消息的逻辑差不多,但是原本并没有复用代码,而是每个方法都单独实现。这就导致同步和异步方法存在大量重复代码。
这个 Patch 合并了 putMessage & asyncPutMessage 、putMessages & asyncPutMessages 方法,在同步方法中调用异步方法的等待方法,删除了大量重复代码。
调整消息发送几个参数的默认值(7)
- Change default value of some parameters: sendMessageThreadPoolNums/useReentrantLockWhenPutMessage/flushCommitLogTimed/endTransactionThreadPoolNums
消息保存/发送参数优化
RocketMQ在保存消息时,由于要保证消息保存到 CommitLog 中是顺序的,写 CommitLog 只能单线程操作,写之前要先获取一个锁,这个锁也就是影响 RocketMQ 性能最关键的一个锁。
最早之前 3.2.X 版本这个锁是 synchronized,从 RocketMQ4.X 开始引入了自旋锁并作为默认值,同时将参数 sendMessageThreadPoolNums
(处理Client端发送消息线程池的线程数)改为了 1,这样处理每条消息写 CommitLog 的时候是一个线程在写,可以省下进出重量锁的开销。
不过这个地方单线程处理,任务有点重,处理消息的逻辑并不是往 CommitLog 里面一写(无法并行)就完事的,还有一些 CPU 开销比较大的工作,多线程处理比较好,经过一些实践测试,4 个线程是比较合理的数值,因此这个参数默认值改为 MIN(逻辑处理器数, 4)
。
既然有 4 个线程,还用自旋锁可能就不合适了,因为拿不到锁的线程会让 CPU 白白空转。所以改用可重入锁,useReentrantLockWhenPutMessage
参数还是改为 true 比较好。
事务消息二阶段处理线程大小
endTransactionThreadPoolNums
是事务消息二阶段处理线程大小,sendMessageThreadPoolNums
则指定一阶段处理线程池大小。如果二阶段的处理速度跟不上一阶段,就会造成二阶段消息丢失导致大量回查,所以建议 endTransactionThreadPoolNums
应该大于 sendMessageThreadPoolNums
,建议至少 4 倍。
开启定时刷盘
flushCommitLogTimed
参数表示是否定时刷盘,之前默认为 false,表示实时刷盘。
本次对刷盘相关的参数也进行了调整。默认情况下,RocketMQ 是异步刷盘,但每次处理消息都会触发一个异步的刷盘请求。这次将 flushCommitLogTimed
这个参数改成 true,也就是定时刷盘(默认每 500ms),可以大幅降低对 IO 压力,在主从同步复制的场景下,可靠性也不会降低。
优化 putMessage 锁内操作(8-12)
Improve produce performance in M/S mode.
- Optimise performance of asyncPutMessage (extract some code out of putMessage lock)
- extract generation of msgId out of lock in CommitLog (now only for single message processor)
- extract generation of topicQueueTable key out of sync code
- extract generation of msgId out of lock in CommitLog (for batch)
- fix ipv6 problem introduced in commit “Optimise performance of asyncPutMessage (extract some code out of putMessage lock)”
CommitLog 是 RocketMQ 消息存储文件。单个 Broker 上所有消息都顺序保存在 CommitLog 中。
写 CommitLog 只能单线程操作,写之前要先获取一个锁,这个锁也就是影响 RocketMQ 性能最关键的一个锁。
理论上这里只要往 MappedByteBuffer 写一下就好了,但实践往往要比理论复杂得多,因为各种原因,这个锁里面干的事情非常的多。
由于当前代码的复杂性,这个优化是本批次修改里面改动最大的,但它的逻辑其实很简单,就是把锁内干的事情,尽量的放到锁的外面去做,能先准备好的数据就先准备好。它包括了以下改动:
- 将 Buffer 的大部分准备工作(编码工作)放到了锁外,提前做好。
- 将 MessageId 的做成了懒初始化(放到锁外),这个消息 ID 的生成涉及很多编解码和数据复制工作,实际上性能开销相当大。
- 原来锁内用来查位点哈希表的 Key 是个拼接出来的字符串,这次也改到锁外先生成好。
- 顺便补上了之前遗漏的关于 IPv6 的处理。
- 删除了无用的代码。
优化 asyncPutMessage 性能,将准备工作放到锁外
先看一下代码上的改动,右边绿色新增的代码是原先在锁中的操作,现在都移动到了锁外面。
右边新增的的 putMessageThreadLocal.getEncode().encode(msg)
完成了大量预操作,将原先 CommitLog#DefaultAppendMessageCallback#doAppend()
方法中的操作移动到了锁外。
下面的代码第一份是修改前的,doAppend() 方法是锁内操作;第二份是修改后的,encode() 方法抽到了加锁之前。
// CommitLog.java 修改前
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBrokerInner msgInner) {
// ...
/**
* Serialize message
*/
final byte[] propertiesData =
msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
if (propertiesLength > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long. length={}", propertiesData.length);
return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
}
final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;
final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);
// Exceeds the maximum message
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ ", maxMessageSize: " + this.maxMessageSize);
return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}
// ... Determines whether there is sufficient free space
// Initialization of storage space
this.resetByteBuffer(msgStoreItemMemory, msgLen);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(msgLen);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
// 4 QUEUEID
this.msgStoreItemMemory.putInt(msgInner.getQueueId());
// 5 FLAG
this.msgStoreItemMemory.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET
this.msgStoreItemMemory.putLong(queueOffset);
// 7 PHYSICALOFFSET
this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
// 8 SYSFLAG
this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(bornHostHolder, bornHostLength);
this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
// 11 STORETIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(storeHostHolder, storeHostLength);
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
// 13 RECONSUMETIMES
this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
this.msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0)
this.msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC
this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
this.msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0)
this.msgStoreItemMemory.put(propertiesData);
// CommitLog.java 修改后
protected PutMessageResult encode(MessageExtBrokerInner msgInner) {
/**
* Serialize message
*/
final byte[] propertiesData =
msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
if (propertiesLength > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long. length={}", propertiesData.length);
return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
}
final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;
final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);
// Exceeds the maximum message
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ ", maxMessageSize: " + this.maxMessageSize);
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
// Initialization of storage space
this.resetByteBuffer(encoderBuffer, msgLen);
// 1 TOTALSIZE
this.encoderBuffer.putInt(msgLen);
// 2 MAGICCODE
this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.encoderBuffer.putInt(msgInner.getBodyCRC());
// 4 QUEUEID
this.encoderBuffer.putInt(msgInner.getQueueId());
// 5 FLAG
this.encoderBuffer.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET, need update later
this.encoderBuffer.putLong(0);
// 7 PHYSICALOFFSET, need update later
this.encoderBuffer.putLong(0);
// 8 SYSFLAG
this.encoderBuffer.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
this.encoderBuffer.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST
socketAddress2ByteBuffer(msgInner.getBornHost() ,this.encoderBuffer);
// 11 STORETIMESTAMP
this.encoderBuffer.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
socketAddress2ByteBuffer(msgInner.getStoreHost() ,this.encoderBuffer);
// 13 RECONSUMETIMES
this.encoderBuffer.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.encoderBuffer.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
this.encoderBuffer.putInt(bodyLength);
if (bodyLength > 0)
this.encoderBuffer.put(msgInner.getBody());
// 16 TOPIC
this.encoderBuffer.put((byte) topicLength);
this.encoderBuffer.put(topicData);
// 17 PROPERTIES
this.encoderBuffer.putShort((short) propertiesLength);
if (propertiesLength > 0)
this.encoderBuffer.put(propertiesData);
encoderBuffer.flip();
return null;
}
然后把预编码的数据放到 MessageExtBrokerInner
中的 private ByteBuffer encodedBuff
字段,传到 doAppend()
方法中使用
MessageId 懒加载
使用函数式接口 Supplier
,将 MessageId 计算的逻辑放到 Supplier
中。创建结果对象时将 Supplier
传入,而不是直接计算 MessageId。
当结果的 getMsgId()
方法被调用,才会执行 Supplier
中 MessageId 的计算方法。
// CommitLog#DefaultAppendMessageCallback
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
// PHY OFFSET
long wroteOffset = fileFromOffset + byteBuffer.position();
Supplier<String> msgIdSupplier = () -> {
int sysflag = msgInner.getSysFlag();
int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer
msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
return UtilAll.bytes2string(msgIdBuffer.array());
};
// ...
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
// ...
return result;
}
// AppendMessageResult.java
public String getMsgId() {
// msgId懒加载
if (msgId == null && msgIdSupplier != null) {
msgId = msgIdSupplier.get();
}
return msgId;
}
优化消息 Header 解析的性能(13-15)
去除字符串末尾占位符,节省消息传输大小
优化字符串格式的属性存储。RocketMQ 在消息传输时用字符串存储一个 Map,接受消息后再解析成Map。
字符串采用这种格式存储 Map:
key1\u0001value1\u0002key2\u0001value2\u0002
该 Patch 优化掉了字符串末尾的\u0002
,为每个消息节省了1字节传输大小。
优化 string 和 map 互相解析的性能
优化前后效果:
Benchmark Mode Cnt Score Error Units(10000 loop in each op)
TempTest.messageProperties2String thrpt 2 2257.276 ops/s
TempTest.messageProperties2String_old thrpt 2 1464.342 ops/s
TempTest.string2messageProperties thrpt 2 1590.499 ops/s
TempTest.string2messageProperties_old thrpt 2 605.118 ops/s
- string 转 map 优化
优化点主要是预先计算了需要解析成字符串的长度,然后为 StringBuilder
定义了初始长度。
StringBuilder 是一个可以动态增加自身数据长度的类,其默认长度(capacity属性)为16。它的底层结构实际是 char[]。
在 TPS 很高的场景下, StringBuilder 默认长度是 16,处理一个正常的消息,至少会内部扩展 2 次,白白产生 2 个对象和 2 次数组复制。
所以优化方案就是先算好需要的长度,创建 StringBuffer 的时候直接就指定好。
- map 转 string 优化
可以看到右边的代码使用了 indexOf
和 substring
方法替换原来的 split
方法
其实 split 方法内部也是使用 indexOf 和 substring 方法的,但它内部新建了一个 ArrayList 用来保存返回结果,在返回时将结果复制到 String[]。
右边方法将切分后的字符串直接存到 map 中,免去了存到 ArrayList 中的过程,减少了复制,也避免了 ArrayList 扩容的损耗。
优化 Broker 请求消息头解码性能(15)
- Optimise parse performance for SendMessageRequestHeaderV2
RocketMQ 的通信协议定义了各种指令,它们的 Header 各不相同,共用了一个通用的解析方法,基于反射来解析和设置消息 Header。
这个解析 Header 方法的效率很低,本次优化单独定义了解析发送消息请求头的方法,直接get Map 中的属性,提升效率。
发送消息的请求header会类似如:
{
"code":310,
"extFields":{
"f":"0",
"g":"1482158310125",
"d":"4",
"e":"0",
"b":"TopicTest",
"c":"TBW102",
"a":"please_rename_unique_group_name",
"j":"0",
"k":"false",
"h":"0",
"i":"TAGS\u0001TagA\u0002WAIT\u0001true\u0002"
},
"flag":0,
"language":"JAVA",
"opaque":206,
"version":79
}
public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
@CFNotNull
private String a; // producerGroup;
@CFNotNull
private String b; // topic;
@CFNotNull
private String c; // defaultTopic;
@CFNotNull
private Integer d; // defaultTopicQueueNums;
@CFNotNull
private Integer e; // queueId;
@CFNotNull
private Integer f; // sysFlag;
@CFNotNull
private Long g; // bornTimestamp;
@CFNotNull
private Integer h; // flag;
@CFNullable
private String i; // properties;
@CFNullable
private Integer j; // reconsumeTimes;
@CFNullable
private boolean k; // unitMode = false;
private Integer l; // consumeRetryTimes
@CFNullable
private boolean m; //batch
接收消息时,会将 Header 解码成 SendMessageRequestHeaderV2
类文章来源:https://www.toymoban.com/news/detail-803013.html
public CommandCustomHeader decodeCommandCustomHeader(Class<? extends CommandCustomHeader> classHeader)
throws RemotingCommandException {
CommandCustomHeader objectHeader;
try {
objectHeader = classHeader.newInstance();
} catch (InstantiationException e) {
return null;
} catch (IllegalAccessException e) {
return null;
}
if (this.extFields != null) {
Field[] fields = getClazzFields(classHeader);
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
String fieldName = field.getName();
if (!fieldName.startsWith("this")) {
try {
String value = this.extFields.get(fieldName);
if (null == value) {
Annotation annotation = getNotNullAnnotation(field);
if (annotation != null) {
throw new RemotingCommandException("the custom field <" + fieldName + "> is null");
}
continue;
}
field.setAccessible(true);
String type = getCanonicalName(field.getType());
Object valueParsed;
if (type.equals(StringCanonicalName)) {
valueParsed = value;
} else if (type.equals(IntegerCanonicalName1) || type.equals(IntegerCanonicalName2)) {
valueParsed = Integer.parseInt(value);
} else if (type.equals(LongCanonicalName1) || type.equals(LongCanonicalName2)) {
valueParsed = Long.parseLong(value);
} else if (type.equals(BooleanCanonicalName1) || type.equals(BooleanCanonicalName2)) {
valueParsed = Boolean.parseBoolean(value);
} else if (type.equals(DoubleCanonicalName1) || type.equals(DoubleCanonicalName2)) {
valueParsed = Double.parseDouble(value);
} else {
throw new RemotingCommandException("the custom field <" + fieldName + "> type is not supported");
}
field.set(objectHeader, valueParsed);
} catch (Throwable e) {
}
}
}
}
objectHeader.checkFields();
}
return objectHeader;
}
static SendMessageRequestHeaderV2 decodeSendMessageHeaderV2(
RemotingCommand request)
throws RemotingCommandException {
SendMessageRequestHeaderV2 r = new SendMessageRequestHeaderV2();
HashMap<String, String> fields = request.getExtFields();
if (fields == null) {
throw new RemotingCommandException("the ext fields is null");
}
String s = fields.get("a");
checkNotNull(s, "the custom field <a> is null");
r.setA(s);
s = fields.get("b");
checkNotNull(s, "the custom field <b> is null");
r.setB(s);
s = fields.get("c");
checkNotNull(s, "the custom field <c> is null");
r.setC(s);
s = fields.get("d");
checkNotNull(s, "the custom field <d> is null");
r.setD(Integer.parseInt(s));
s = fields.get("e");
checkNotNull(s, "the custom field <e> is null");
r.setE(Integer.parseInt(s));
s = fields.get("f");
checkNotNull(s, "the custom field <f> is null");
r.setF(Integer.parseInt(s));
s = fields.get("g");
checkNotNull(s, "the custom field <g> is null");
r.setG(Long.parseLong(s));
s = fields.get("h");
checkNotNull(s, "the custom field <h> is null");
r.setH(Integer.parseInt(s));
s = fields.get("i");
if (s != null) {
r.setI(s);
}
s = fields.get("j");
if (s != null) {
r.setJ(Integer.parseInt(s));
}
s = fields.get("k");
if (s != null) {
r.setK(Boolean.parseBoolean(s));
}
s = fields.get("l");
if (s != null) {
r.setL(Integer.parseInt(s));
}
s = fields.get("m");
if (s != null) {
r.setM(Boolean.parseBoolean(s));
}
return r;
}
左边其实是一个通用的解码方法,右边是针对消息生产的指令 SendMessageRequestHeaderV2
优化的解码方法。这里不再使用共同的这个解析器,而是简单粗暴的直接一个一个去 set 每一个属性,这样这个方法获得了大约 4 倍性能的提升。文章来源地址https://www.toymoban.com/news/detail-803013.html
参考资料
- Apache RocketMQ 4.9.1 高性能优化之路
- RocketMQ这样做,离物理极限性能还差多远?
- 寻找Java中String.split性能更好的方法
- RocketMQ——通信协议
到了这里,关于RocketMQ 4.9.1 性能优化 源码剖析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!