要找到生产者发送的消息被谁接收了,就得先确定两件事情:
1、发送消息使用的请求码?
答:从章节3.5.1
中,可以得出该请求码是,RequestCode.SEND_MESSAGE。
2、Broker端针对发送消息使用的处理器?
答:从章节8.5
中,可以得出处理器是 SendMessageProcessor,并且直接进到 processRequest 方法。
至此,两个必要条件已经确定,接下来就看源码怎么实现了。文章来源:https://www.toymoban.com/news/detail-852980.html
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK: // 这个是消费失败重推请求码,不在此次范围内
return this.consumerSendMsgBack(ctx, request);
default:// 既然是default,那指定是包括了请求码 RequestCode.SEND_MESSAGE,往下走就行
// 解析请求命令,并转换成broker内部识别的发送消息请求对象,这个方法在 AbstractSendMessageProcessor 类中
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
// 请求都为 null 了,那指定返回 null
return null;
}
// 构建消息追踪对象
mqtraceContext = buildMsgContext(ctx, requestHeader);
// 发送消息前调用,其实这个hook就是给用户扩展的,类似拦截器,用户自定义一个类,并实现接口 SendMessageHook 就行
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
RemotingCommand response;
if (requestHeader.isBatch()) {
// 批量消息的处理,具体看章节`9.2`
response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
// 单个消息的处理,具体看章节`9.1`
response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
}
// 发送消息后调用,hook就是给用户扩展的,类似拦截器,用户自定义一个类,并实现接口 SendMessageHook 就行
this.executeSendMessageHookAfter(response, mqtraceContext);
return response;
}
}
AbstractSendMessageProcessor.parseRequestHeader文章来源地址https://www.toymoban.com/news/detail-852980.html
// 这是一个 protected 方法,实际上就是想着子类重写的方法
protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request)
throws RemotingCommandException {
SendMessageRequestHeaderV2 requestHeaderV2 = null;
SendMessageRequestHeader requestHeader = null;
switch (request.getCode()) {
case RequestCode.SEND_BATCH_MESSAGE:
case RequestCode.SEND_MESSAGE_V2:
requestHeaderV2 =
(SendMessageRequestHeaderV2) request
.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
case RequestCode.SEND_MESSAGE: // 看到这里,是不是很惊讶,这不就是我们要的请求码么
if (null == requestHeaderV2) {
// 这一步就简单了,decodeCommandCustomHeader 方法内部就是反射构建 SendMessageRequestHeader 类对象,然后给它赋值,有兴趣的读者自行进去看。
requestHeader =
(SendMessageRequestHeader) request
.decodeCommandCustomHeader(SendMessageRequestHeader.class);
} else {
requestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2);
}
default:
break;
}
return requestHeader;
}
9.1 发送单个消息处理
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
// 先创建响应命令对象
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
// 唯一标识当前请求的id
response.setOpaque(request.getOpaque());
// MSG_REGION 属性
response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
// TRACE_ON 属性
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
log.debug("receive SendMessage request command, {}", request);
// 记录开始接收请求的时间
final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
// 当前时间不能小于接收请求的时间,肯定是接收在前,处理在后
if (this.brokerController.getMessageStore().now() < startTimstamp) {
// 这种情况一般就是当前 broker 所在服务器的时间没有校准,出现了问题
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
return response;
}
response.setCode(-1);
// 消息校验
super.msgCheck(ctx, requestHeader, response);
// 校验失败,直接返回
if (response.getCode() != -1) {
return response;
}
// 获取消息体 body 的字节
final byte[] body = request.getBody();
// 要发送的消息队列 id
int queueIdInt = requestHeader.getQueueId();
// 从本地缓存 map 中取出 topic 的配置对象
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
// 队列id肯定不能小于0
if (queueIdInt < 0) {
// 如果队列id小于0,那就随机生成一个,并与写队列数取模
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
}
// 再将 msg 转换成 broker 内部处理能识别的 MessageExtBrokerInner 对象
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic()); // 设置topic
msgInner.setQueueId(queueIdInt); // 设置队列 id
// 处理重试消息和死信队列消息,这里先略过,关于这块知识点,后面章节会专门抽出来讲
if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
return response;
}
// 接下来就是正常的赋值操作
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
PutMessageResult putMessageResult = null;
Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
// 判断是否事务消息,事务消息不在讲解范围内,略过
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
// 开始存放消息,看`第十章`
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
// 处理消息存放后结果,并最终返回至客户端,实现看`9.3`
return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
}
9.2 发送批量消息处理
private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
// 先创建响应命令对象
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
// 唯一标识当前请求的id
response.setOpaque(request.getOpaque());
// MSG_REGION 属性
response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
// TRACE_ON 属性
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
log.debug("Receive SendMessage request command {}", request);
// 记录开始接收请求的时间
final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
// 当前时间不能小于接收请求的时间,肯定是接收在前,处理在后
if (this.brokerController.getMessageStore().now() < startTimstamp) {
// 这种情况一般就是当前 broker 所在服务器的时间没有校准,出现了问题
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
return response;
}
response.setCode(-1);
// 校验失败,直接返回
super.msgCheck(ctx, requestHeader, response);
// 校验失败,直接返回
if (response.getCode() != -1) {
return response;
}
int queueIdInt = requestHeader.getQueueId();
// 从本地缓存 map 中取出 topic 的配置对象
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
// 队列id肯定不能小于0
if (queueIdInt < 0) {
// 如果队列id小于0,那就随机生成一个,并与写队列数取模
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
}
// topic 长度校验,不能大于 127
if (requestHeader.getTopic().length() > Byte.MAX_VALUE) {
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark("message topic length too long " + requestHeader.getTopic().length());
return response;
}
// 如果是重推消息,则不支持批量
if (requestHeader.getTopic() != null && requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark("batch request does not support retry group " + requestHeader.getTopic());
return response;
}
// 再将批量消息转换成 broker 内部处理能识别的 MessageExtBatch 对象
MessageExtBatch messageExtBatch = new MessageExtBatch();
messageExtBatch.setTopic(requestHeader.getTopic());
messageExtBatch.setQueueId(queueIdInt);
int sysFlag = requestHeader.getSysFlag();
if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
}
messageExtBatch.setSysFlag(sysFlag);
messageExtBatch.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(messageExtBatch, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
messageExtBatch.setBody(request.getBody());
messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp());
messageExtBatch.setBornHost(ctx.channel().remoteAddress());
messageExtBatch.setStoreHost(this.getStoreHost());
messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
// 上面都是对对象 MessageExtBatch 的赋值
// 开始存放批量消息,看`第十章`
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch);
// 处理消息存放后结果,并最终返回至客户端,实现看`9.3`
return handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, ctx, queueIdInt);
}
9.3 结果处理
private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,
RemotingCommand request, MessageExt msg,
SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
int queueIdInt) {
if (putMessageResult == null) { // 判断存放数据结果,为null,证明出问题了,返回错误
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store putMessage return null");
return response;
}
boolean sendOK = false;
// switch 里是对存放结果与最终返回客户端结果的映射
switch (putMessageResult.getPutMessageStatus()) {
// Success
case PUT_OK:
sendOK = true;
response.setCode(ResponseCode.SUCCESS);
break;
case FLUSH_DISK_TIMEOUT:
response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
sendOK = true;
break;
case FLUSH_SLAVE_TIMEOUT:
response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
sendOK = true;
break;
case SLAVE_NOT_AVAILABLE:
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
sendOK = true;
break;
// Failed
case CREATE_MAPEDFILE_FAILED:
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("create mapped file failed, server is busy or broken.");
break;
case MESSAGE_ILLEGAL:
case PROPERTIES_SIZE_EXCEEDED:
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark(
"the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
break;
case SERVICE_NOT_AVAILABLE:
response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
response.setRemark(
"service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
break;
case OS_PAGECACHE_BUSY:
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
break;
case UNKNOWN_ERROR:
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("UNKNOWN_ERROR");
break;
default:
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("UNKNOWN_ERROR DEFAULT");
break;
}
String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
if (sendOK) { // 最终存放成功
// 记录统计数据:增加该topic存放的消息数、增加该topic存放成功的次数
this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
// 记录统计数据:增加该topic存放的消息大小
this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
putMessageResult.getAppendMessageResult().getWroteBytes());
// 记录统计数据:增加该broker存放的消息数
this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
response.setRemark(null);
// msgId赋值
responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
// queueId赋值
responseHeader.setQueueId(queueIdInt);
// 消息队列偏移值赋值,以上这些信息具体什么意思,如何表现,都会在存储消息时讲明白,大家不要急
responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
// 给客户端写回响应消息
doResponse(ctx, request, response);
// 消息发送 hook 处理,前面讲过,把这看成拦截器就行,用户自己实现,Rocketmq没有默认实现
if (hasSendMessageHook()) {
sendMessageContext.setMsgId(responseHeader.getMsgId());
sendMessageContext.setQueueId(responseHeader.getQueueId());
sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
sendMessageContext.setCommercialSendTimes(incValue);
sendMessageContext.setCommercialSendSize(wroteSize);
sendMessageContext.setCommercialOwner(owner);
}
return null;
} else {
// 消息存放失败,也要处理消息发送 hook,前面讲过,把这看成拦截器就行,用户自己实现,Rocketmq没有默认实现
if (hasSendMessageHook()) {
int wroteSize = request.getBody().length;
int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
sendMessageContext.setCommercialSendTimes(incValue);
sendMessageContext.setCommercialSendSize(wroteSize);
sendMessageContext.setCommercialOwner(owner);
}
}
return response;
}
到了这里,关于第九章-Broker-接收并处理生产者发送的消息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!