第九章-Broker-接收并处理生产者发送的消息

这篇具有很好参考价值的文章主要介绍了第九章-Broker-接收并处理生产者发送的消息。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

要找到生产者发送的消息被谁接收了,就得先确定两件事情:

1、发送消息使用的请求码?

答:从章节3.5.1中,可以得出该请求码是,RequestCode.SEND_MESSAGE。

2、Broker端针对发送消息使用的处理器?

答:从章节8.5中,可以得出处理器是 SendMessageProcessor,并且直接进到 processRequest 方法。

至此,两个必要条件已经确定,接下来就看源码怎么实现了。

public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                      RemotingCommand request) throws RemotingCommandException {
    SendMessageContext mqtraceContext;
    switch (request.getCode()) {
        case RequestCode.CONSUMER_SEND_MSG_BACK: // 这个是消费失败重推请求码,不在此次范围内
            return this.consumerSendMsgBack(ctx, request);
        default:// 既然是default,那指定是包括了请求码 RequestCode.SEND_MESSAGE,往下走就行
            // 解析请求命令,并转换成broker内部识别的发送消息请求对象,这个方法在 AbstractSendMessageProcessor 类中
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
                // 请求都为 null 了,那指定返回 null
                return null;
            }
		   // 构建消息追踪对象
            mqtraceContext = buildMsgContext(ctx, requestHeader);
            // 发送消息前调用,其实这个hook就是给用户扩展的,类似拦截器,用户自定义一个类,并实现接口 SendMessageHook 就行
            this.executeSendMessageHookBefore(ctx, request, mqtraceContext);

            RemotingCommand response;
            if (requestHeader.isBatch()) {
                // 批量消息的处理,具体看章节`9.2`
                response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
            } else {
                // 单个消息的处理,具体看章节`9.1`
                response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
            }

            // 发送消息后调用,hook就是给用户扩展的,类似拦截器,用户自定义一个类,并实现接口 SendMessageHook 就行
            this.executeSendMessageHookAfter(response, mqtraceContext);
            return response;
    }
}

AbstractSendMessageProcessor.parseRequestHeader文章来源地址https://www.toymoban.com/news/detail-852980.html

// 这是一个 protected 方法,实际上就是想着子类重写的方法
protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request)
    throws RemotingCommandException {

    SendMessageRequestHeaderV2 requestHeaderV2 = null;
    SendMessageRequestHeader requestHeader = null;
    switch (request.getCode()) {
        case RequestCode.SEND_BATCH_MESSAGE:
        case RequestCode.SEND_MESSAGE_V2:
            requestHeaderV2 =
                (SendMessageRequestHeaderV2) request
                .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
        case RequestCode.SEND_MESSAGE: // 看到这里,是不是很惊讶,这不就是我们要的请求码么
            if (null == requestHeaderV2) {
                // 这一步就简单了,decodeCommandCustomHeader 方法内部就是反射构建 SendMessageRequestHeader 类对象,然后给它赋值,有兴趣的读者自行进去看。
                requestHeader =
                    (SendMessageRequestHeader) request
                    .decodeCommandCustomHeader(SendMessageRequestHeader.class);
            } else {
                requestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2);
            }
        default:
            break;
    }
    return requestHeader;
}

9.1 发送单个消息处理

private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
                                    final RemotingCommand request,
                                    final SendMessageContext sendMessageContext,
                                    final SendMessageRequestHeader requestHeader) throws RemotingCommandException {

    // 先创建响应命令对象
    final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

    // 唯一标识当前请求的id
    response.setOpaque(request.getOpaque());

    // MSG_REGION 属性
    response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
    // TRACE_ON 属性
    response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

    log.debug("receive SendMessage request command, {}", request);

    // 记录开始接收请求的时间
    final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
    // 当前时间不能小于接收请求的时间,肯定是接收在前,处理在后
    if (this.brokerController.getMessageStore().now() < startTimstamp) {
        // 这种情况一般就是当前 broker 所在服务器的时间没有校准,出现了问题
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
        return response;
    }

    response.setCode(-1);
    // 消息校验
    super.msgCheck(ctx, requestHeader, response);
    // 校验失败,直接返回
    if (response.getCode() != -1) {
        return response;
    }
    // 获取消息体 body 的字节
    final byte[] body = request.getBody();
    // 要发送的消息队列 id
    int queueIdInt = requestHeader.getQueueId();
    // 从本地缓存 map 中取出 topic 的配置对象
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    // 队列id肯定不能小于0
    if (queueIdInt < 0) {
        // 如果队列id小于0,那就随机生成一个,并与写队列数取模
        queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
    }

    // 再将 msg 转换成 broker 内部处理能识别的 MessageExtBrokerInner 对象
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic()); // 设置topic
    msgInner.setQueueId(queueIdInt); // 设置队列 id

    // 处理重试消息和死信队列消息,这里先略过,关于这块知识点,后面章节会专门抽出来讲
    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
        return response;
    }

    // 接下来就是正常的赋值操作
    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());
    MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
    msgInner.setPropertiesString(requestHeader.getProperties());
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    msgInner.setBornHost(ctx.channel().remoteAddress());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    PutMessageResult putMessageResult = null;
    Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    // 判断是否事务消息,事务消息不在讲解范围内,略过
    if (traFlag != null && Boolean.parseBoolean(traFlag)) {
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                + "] sending transaction message is forbidden");
            return response;
        }
        putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
    } else {
        // 开始存放消息,看`第十章`
        putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    }

    // 处理消息存放后结果,并最终返回至客户端,实现看`9.3`
    return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);

}

9.2 发送批量消息处理

private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx,
                                         final RemotingCommand request,
                                         final SendMessageContext sendMessageContext,
                                         final SendMessageRequestHeader requestHeader) throws RemotingCommandException {

    // 先创建响应命令对象
    final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

    // 唯一标识当前请求的id
    response.setOpaque(request.getOpaque());

    // MSG_REGION 属性
    response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
    // TRACE_ON 属性
    response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

    log.debug("Receive SendMessage request command {}", request);
    // 记录开始接收请求的时间
    final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
    // 当前时间不能小于接收请求的时间,肯定是接收在前,处理在后
    if (this.brokerController.getMessageStore().now() < startTimstamp) {
        // 这种情况一般就是当前 broker 所在服务器的时间没有校准,出现了问题
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
        return response;
    }

    response.setCode(-1);
     // 校验失败,直接返回
    super.msgCheck(ctx, requestHeader, response);
    // 校验失败,直接返回
    if (response.getCode() != -1) {
        return response;
    }

    int queueIdInt = requestHeader.getQueueId();
    // 从本地缓存 map 中取出 topic 的配置对象
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    // 队列id肯定不能小于0
    if (queueIdInt < 0) {
        // 如果队列id小于0,那就随机生成一个,并与写队列数取模
        queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
    }

    // topic 长度校验,不能大于 127
    if (requestHeader.getTopic().length() > Byte.MAX_VALUE) {
        response.setCode(ResponseCode.MESSAGE_ILLEGAL);
        response.setRemark("message topic length too long " + requestHeader.getTopic().length());
        return response;
    }

    // 如果是重推消息,则不支持批量
    if (requestHeader.getTopic() != null && requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
        response.setCode(ResponseCode.MESSAGE_ILLEGAL);
        response.setRemark("batch request does not support retry group " + requestHeader.getTopic());
        return response;
    }
    // 再将批量消息转换成 broker 内部处理能识别的 MessageExtBatch 对象
    MessageExtBatch messageExtBatch = new MessageExtBatch();
    messageExtBatch.setTopic(requestHeader.getTopic());
    messageExtBatch.setQueueId(queueIdInt);

    int sysFlag = requestHeader.getSysFlag();
    if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
        sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
    }
    messageExtBatch.setSysFlag(sysFlag);

    messageExtBatch.setFlag(requestHeader.getFlag());
    MessageAccessor.setProperties(messageExtBatch, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
    messageExtBatch.setBody(request.getBody());
    messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp());
    messageExtBatch.setBornHost(ctx.channel().remoteAddress());
    messageExtBatch.setStoreHost(this.getStoreHost());
    messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());

    // 上面都是对对象 MessageExtBatch 的赋值
    
     // 开始存放批量消息,看`第十章`
    PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch);

    // 处理消息存放后结果,并最终返回至客户端,实现看`9.3`
    return handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, ctx, queueIdInt);
}

9.3 结果处理

private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,
                                               RemotingCommand request, MessageExt msg,
                                               SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
                                               int queueIdInt) {
    if (putMessageResult == null) { // 判断存放数据结果,为null,证明出问题了,返回错误
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("store putMessage return null");
        return response;
    }
    boolean sendOK = false;

    // switch 里是对存放结果与最终返回客户端结果的映射
    switch (putMessageResult.getPutMessageStatus()) {
            // Success
        case PUT_OK:
            sendOK = true;
            response.setCode(ResponseCode.SUCCESS);
            break;
        case FLUSH_DISK_TIMEOUT:
            response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
            sendOK = true;
            break;
        case FLUSH_SLAVE_TIMEOUT:
            response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
            sendOK = true;
            break;
        case SLAVE_NOT_AVAILABLE:
            response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
            sendOK = true;
            break;

            // Failed
        case CREATE_MAPEDFILE_FAILED:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("create mapped file failed, server is busy or broken.");
            break;
        case MESSAGE_ILLEGAL:
        case PROPERTIES_SIZE_EXCEEDED:
            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
            response.setRemark(
                "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
            break;
        case SERVICE_NOT_AVAILABLE:
            response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
            response.setRemark(
                "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
            break;
        case OS_PAGECACHE_BUSY:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
            break;
        case UNKNOWN_ERROR:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("UNKNOWN_ERROR");
            break;
        default:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("UNKNOWN_ERROR DEFAULT");
            break;
    }

    String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
    if (sendOK) { // 最终存放成功

        // 记录统计数据:增加该topic存放的消息数、增加该topic存放成功的次数
        this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
        // 记录统计数据:增加该topic存放的消息大小
        this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
                                                                      putMessageResult.getAppendMessageResult().getWroteBytes());
                // 记录统计数据:增加该broker存放的消息数
        this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());

        response.setRemark(null);

        // msgId赋值
        responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
        // queueId赋值
        responseHeader.setQueueId(queueIdInt);
        // 消息队列偏移值赋值,以上这些信息具体什么意思,如何表现,都会在存储消息时讲明白,大家不要急
        responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());

        // 给客户端写回响应消息
        doResponse(ctx, request, response);

        // 消息发送 hook 处理,前面讲过,把这看成拦截器就行,用户自己实现,Rocketmq没有默认实现
        if (hasSendMessageHook()) {
            sendMessageContext.setMsgId(responseHeader.getMsgId());
            sendMessageContext.setQueueId(responseHeader.getQueueId());
            sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());

            int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
            int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
            int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;

            sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
            sendMessageContext.setCommercialSendTimes(incValue);
            sendMessageContext.setCommercialSendSize(wroteSize);
            sendMessageContext.setCommercialOwner(owner);
        }
        return null;
    } else {
        // 消息存放失败,也要处理消息发送 hook,前面讲过,把这看成拦截器就行,用户自己实现,Rocketmq没有默认实现
        if (hasSendMessageHook()) {
            int wroteSize = request.getBody().length;
            int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);

            sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
            sendMessageContext.setCommercialSendTimes(incValue);
            sendMessageContext.setCommercialSendSize(wroteSize);
            sendMessageContext.setCommercialOwner(owner);
        }
    }
    return response;
}

到了这里,关于第九章-Broker-接收并处理生产者发送的消息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMq生产者发送消息确认

    一般情况下RabbitMq的生产者能够正常的把消息投递到交换机Exchange,Exchange能够根据路由键routingKey把消息投递到队列Queue,但是一旦出现消息无法投递到交换机Exchange,或无法路由到Queue的这种特殊情况下,则需要对生产者的消息进行缓存或者保存到数据库,后续在调查完RabbitM

    2024年02月04日
    浏览(40)
  • kafka入门,生产者异步发送、回调函数,同步发送(四)

    引入依赖 回调函数会在producer收到ack时调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果Exception为null,说明信息发送失败 注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。 只需在异步发送的基础上,再调用一下 get(

    2024年02月11日
    浏览(48)
  • kafka生产者异步发送、同步发送、回调异步发送,是什么情况?

    Kafka是一种分布式流处理平台 ,它是一种高吞吐量、可扩展、可持久化的消息队列系统,用于处理和存储实时流式数据。 Kafka基于发布-订阅模式,采用了分布式、多副本、分区的架构。它允许生产者将数据以消息的形式发送到Kafka集群的一个或多个主题(topic)中,而消费者可以

    2024年02月15日
    浏览(39)
  • 多图详解 kafka 生产者消息发送过程

    生产者客户端代码 KafkaProducer 通过解析 producer.propeties 文件里面的属性来构造自己。例如 :分区器、Key 和 Value 序列化器、拦截器、 RecordAccumulator消息累加器 、 元信息更新器 、启动发送请求的后台线程 生产者元信息更新器 我们之前有讲过. 客户端都会保存集群的元信息,例如

    2023年04月09日
    浏览(41)
  • kafka入门(五):kafka生产者发送消息

    构建消息,即创建 ProduceRecord 对象。 (1) kafka发送消息,最常见的构造方法是: topic 表示主题, value 表示值。 (2) kafka发送消息指定key,ProducerRecord 的 key ,既可以作为消息的唯一id,也可以用来决定消息该被写到主题的哪个分区。拥有相同key 的消息,将被写到同一个分区。

    2024年01月17日
    浏览(42)
  • Kafka学习---2、kafka生产者、异步和同步发送API、分区、生产经验

    1.1 生产者消息发送流程 1.1.1 发送原理 在消息发生的过程中,设计到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。 batch.size:只有数据积累到bat

    2024年02月09日
    浏览(46)
  • kafka服务端允许生产者发送最大消息体大小

            server.properties中加上的message.max.bytes配置,我目前设置为5242880,即5MB,可以根据实际情况增大。         在生产者端配置max.request.size,这是单个消息最大字节数,根据实际调整,max.request.size 必须小于 message.max.bytes 以及消费者的 max.partition.fetch.bytes。这样消息

    2024年02月15日
    浏览(46)
  • Kafka 入门到起飞系列 - 生产者发送消息流程解析

    生产者通过 producerRecord 对象封装消息主题、消息的value(内容)、timestamp(时间戳)等 生产者通过 send() 方法发送消息,send()方法会经过如下几步 1. 首先将消息交给 拦截器(Interceptor) 处理, 拦截器对生产者而言,对所有消息都是生效的,拦截器也支持链式编程(责任器链)的

    2024年02月16日
    浏览(48)
  • 【注意】Kafka生产者异步发送消息仍有可能阻塞

    Kafka是常用的消息中间件。在Spring Boot项目中,使用KafkaTemplate作为生产者发送消息。有时,为了不影响主业务流程,会采用 异步 发送的方式,如下所示。 本以为采用异步发送,必然不会影响到主业务流程。但实际使用时发现,在第一次发送消息时,如果Kafka Broker连接失败,

    2023年04月13日
    浏览(78)
  • java:Kafka生产者推送数据与消费者接收数据(参数配置以及案例)

    bootstrap.servers :Kafka集群中的Broker列表,格式为host1:port1,host2:port2,…。生产者会从这些Broker中选择一个可用的Broker作为消息发送的目标Broker。 acks :Broker对消息的确认模式。可选值为0、1、all。0表示生产者不会等待Broker的任何确认消息;1表示生产者会等待Broker的Leader副本确认

    2024年02月16日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包