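Overview of how consumption works
Source: https://www.toymoban.com/news/detail-618867.html
First, a quick look at a typical RocketMQ deployment. In the figure above, the broker is where messages are actually processed and stored, while the nameServer maintains the broker address information.
The "consume message" part on the right of the figure is the focus of this article. It consists of ConsumerGroup and Consumer; for ConsumerGroup, see https://rocketmq.apache.org/docs/domainModel/07consumergroup/. Put simply, a Consumer is one running application and a ConsumerGroup is a group of such running applications. So how does a single Consumer start up, receive messages, and consume them?
Take a typical Java application built on Spring as an example. Usually, once the application has started and instantiated the relevant RocketMQ SDK classes, it calls their initialization methods, obtains the nameServer and broker addresses, starts a Netty client, pulls messages from the broker through that client, and submits them to the message consumption service for batch consumption.
Source code walkthrough
Introduction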
https://rocketmq.apache.org/docs/quickStart/01quickstart
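The quick start linked above contains a demo of consuming normal messages. As it shows, the code that starts a consumer mainly sets the cid (consumer group), the topic, the tag, and the messageListener.
Core startup method: DefaultMQPushConsumerImpl#start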
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
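The method begins with a switch on the current state of the MQ client and runs different startup logic accordingly: if the client has not been started yet, it runs the startup logic; if it is already running or a previous start failed, it throws an exception. After the switch it updates the topic information, sends heartbeats, triggers rebalancing, and so on. Expanding the switch block gives the code below. serviceState defaults to CREATE_JUST, so the first start takes the CREATE_JUST branch.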
public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        // Initialization flow
        case CREATE_JUST:
            log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}",
                this.defaultMQPushConsumer.getConsumerGroup(),
                this.defaultMQPushConsumer.getMessageModel(),
                this.defaultMQPushConsumer.isUnitMode());
            this.serviceState = ServiceState.START_FAILED;

            // Validate the configuration, such as the consumerGroup setting
            this.checkConfig();

            // Parse the subscribed topics and tags, build the subscription model, and store it in rebalanceImpl
            this.copySubscription();

            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }

            // Initialize MQClientInstance, rebalanceImpl, etc.
            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

            if (this.pullAPIWrapper == null) {
                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
            }
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

            // Initialize the consumption progress (offset store).
            // In clustering mode the progress is stored on the broker; in broadcasting mode it is stored locally on the consumer.
            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
            this.offsetStore.load();

            // Create a different consume service depending on orderly vs. concurrent consumption.
            // This article focuses on concurrent consumption.
            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                this.consumeMessageService =
                    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                //POPTODO reuse Executor ?
                this.consumeMessagePopService =
                    new ConsumeMessagePopOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                this.consumeMessageService =
                    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                //POPTODO reuse Executor ?
                this.consumeMessagePopService =
                    new ConsumeMessagePopConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
            }

            this.consumeMessageService.start();
            // POPTODO
            this.consumeMessagePopService.start();

            // Register this DefaultMQPushConsumerImpl with mQClientFactory.
            // mQClientFactory keeps a map from consumerGroup to DefaultMQPushConsumerImpl.
            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please."
                    + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            // Core startup method.
            // This is the key part of the startup flow and is expanded later.
            mQClientFactory.start();
            log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    this.mQClientFactory.checkClientInBroker();
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    this.mQClientFactory.rebalanceImmediately();
}
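The state handling in start() is easier to see in isolation. Below is a minimal sketch of the same pattern using only the JDK: the state is set to START_FAILED before any work is done and only becomes RUNNING when every startup step has succeeded, so a half-started instance can never be started again. LifecycleSketch and its startupSteps parameter are hypothetical names for illustration, not RocketMQ classes, and the sketch leaves out the branch that resets the state to CREATE_JUST when registration fails.

```java
// Hypothetical, simplified model of the start() state machine; not RocketMQ code.
final class LifecycleSketch {
    enum ServiceState { CREATE_JUST, RUNNING, START_FAILED, SHUTDOWN_ALREADY }

    private ServiceState state = ServiceState.CREATE_JUST;
    private final Runnable startupSteps;

    LifecycleSketch(Runnable startupSteps) {
        this.startupSteps = startupSteps;
    }

    synchronized void start() {
        switch (state) {
            case CREATE_JUST:
                // Mark as failed first; only a fully successful startup reaches RUNNING.
                state = ServiceState.START_FAILED;
                startupSteps.run();
                state = ServiceState.RUNNING;
                break;
            default:
                // RUNNING, START_FAILED and SHUTDOWN_ALREADY all refuse another start.
                throw new IllegalStateException("service state not OK: " + state);
        }
    }

    synchronized ServiceState state() {
        return state;
    }

    public static void main(String[] args) {
        LifecycleSketch consumer = new LifecycleSketch(() -> System.out.println("running startup steps"));
        consumer.start();
        System.out.println("state after start: " + consumer.state());
        try {
            consumer.start();
        } catch (IllegalStateException e) {
            System.out.println("second start rejected: " + e.getMessage());
        }
    }
}
```

If a startup step throws, the exception propagates to the caller and the state stays at START_FAILED, which is why a later start() call is rejected rather than retried.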
MQClientInstance#start
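As you can see, org.apache.rocketmq.client.impl.factory.MQClientInstance#start is the main flow for starting the consumer.
It consists of the following steps, which are expanded one by one below:
- Fetch the nameServer address list
- Open the connection channel between the client and the MQ server
- Start the scheduled tasks (including periodically re-fetching the nameServer address list)
- Start the pull-message service
- Start the rebalance service
- Start the push service of the internal message producer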
public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                // (this is where the nameServer address list is fetched)
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                // (opens the connection channel between the client and the MQ server)
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                // (mainly calls fetchNameServerAddr above every 2 minutes to pull the latest nameServerAddr)
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                // (the internal producer's push service; not expanded here)
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                // If everything above succeeded, mark the service as RUNNING
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}
1. Fetching the nameServerAddr
First step into org.apache.rocketmq.client.impl.MQClientAPIImpl#fetchNameServerAddr, shown below. It mainly calls topAddressing.fetchNSAddr.
public String fetchNameServerAddr() {
    try {
        // This call does the actual work
        String addrs = this.topAddressing.fetchNSAddr();
        if (!UtilAll.isBlank(addrs)) {
            if (!addrs.equals(this.nameSrvAddr)) {
                log.info("name server address changed, old=" + this.nameSrvAddr + ", new=" + addrs);
                this.updateNameServerAddressList(addrs);
                this.nameSrvAddr = addrs;
                return nameSrvAddr;
            }
        }
    } catch (Exception e) {
        log.error("fetchNameServerAddr Exception", e);
    }
    return nameSrvAddr;
}
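Next, step into topAddressing.fetchNSAddr. It first tries any SPI extensions that provide the nameServerAddr; if none returns an address, it falls back to the default method fetchNSAddr(boolean, long).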
public final String fetchNSAddr() {
    // This appears to be an SPI extension point:
    // users can supply their own TopAddressing.fetchNSAddr implementation via SPI to provide the nameServerAddr
    if (!topAddressingList.isEmpty()) {
        for (TopAddressing topAddressing : topAddressingList) {
            String nsAddress = topAddressing.fetchNSAddr();
            if (!Strings.isNullOrEmpty(nsAddress)) {
                return nsAddress;
            }
        }
    }
    // Return result of default implementation
    return fetchNSAddr(true, 3000);
}
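The lookup order above (custom providers first, default implementation last) can be sketched with plain JDK types. This is only an illustration of the fallback chain: AddressLookupSketch is a hypothetical name, the providers are modelled as Supplier<String>, and a hand-built list stands in for however the real client discovers its extensions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of "first non-empty custom result wins, else use the default".
final class AddressLookupSketch {
    static String fetch(List<Supplier<String>> customProviders, Supplier<String> defaultProvider) {
        for (Supplier<String> provider : customProviders) {
            String address = provider.get();
            // Skip providers that return null or a blank string.
            if (address != null && !address.trim().isEmpty()) {
                return address;
            }
        }
        // No custom provider produced an address: fall back to the default lookup.
        return defaultProvider.get();
    }

    public static void main(String[] args) {
        List<Supplier<String>> providers = Arrays.asList(() -> "", () -> "10.0.0.1:9876");
        System.out.println(fetch(providers, () -> "127.0.0.1:9876"));
    }
}
```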
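Below is the default method for fetching the nameServerAddr. It takes wsAddr as the URL, appends the entries of para as query parameters, and sends an HTTP request to obtain the nameServerAddr.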
public final String fetchNSAddr(boolean verbose, long timeoutMills) {
    // Take wsAddr as the URL and append the para entries as query parameters
    String url = this.wsAddr;
    try {
        if (null != para && para.size() > 0) {
            if (!UtilAll.isBlank(this.unitName)) {
                url = url + "-" + this.unitName + "?nofix=1&";
            } else {
                url = url + "?";
            }
            for (Map.Entry<String, String> entry : this.para.entrySet()) {
                url += entry.getKey() + "=" + entry.getValue() + "&";
            }
            url = url.substring(0, url.length() - 1);
        } else {
            if (!UtilAll.isBlank(this.unitName)) {
                url = url + "-" + this.unitName + "?nofix=1";
            }
        }

        // Send the HTTP request and read the address from the response
        HttpTinyClient.HttpResult result = HttpTinyClient.httpGet(url, null, null, "UTF-8", timeoutMills);
        if (200 == result.code) {
            String responseStr = result.content;
            if (responseStr != null) {
                return clearNewLine(responseStr);
            } else {
                LOGGER.error("fetch nameserver address is null");
            }
        } else {
            LOGGER.error("fetch nameserver address failed. statusCode=" + result.code);
        }
    } catch (IOException e) {
        if (verbose) {
            LOGGER.error("fetch name server address exception", e);
        }
    }

    if (verbose) {
        String errorMsg =
            "connect to " + url + " failed, maybe the domain name " + MixAll.getWSAddr() + " not bind in /etc/hosts";
        errorMsg += FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL);

        LOGGER.warn(errorMsg);
    }
    return null;
}

// How the default wsAddr is built; the default value is http://jmenv.tbsite.net:8080/rocketmq/nsaddr
public static String getWSAddr() {
    String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
    String wsDomainSubgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
    String wsAddr = "http://" + wsDomainName + ":8080/rocketmq/" + wsDomainSubgroup;
    if (wsDomainName.indexOf(":") > 0) {
        wsAddr = "http://" + wsDomainName + "/rocketmq/" + wsDomainSubgroup;
    }
    return wsAddr;
}
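If you run the environment locally, you can see that the return value is a list of ip:port entries.
2. Opening the connection channel between the client and the MQ server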
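To make the two string manipulations concrete, here is a small JDK-only sketch. buildWsAddr mirrors the getWSAddr logic quoted above (the :8080 port is only appended when the domain carries no port of its own). parseAddrList rests on an assumption that is not shown in the quoted code: that the response body is a ';'-separated list of ip:port entries. NameServerAddrSketch and both method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helpers illustrating the lookup URL and the shape of the response.
final class NameServerAddrSketch {
    // Mirrors getWSAddr: use the domain as-is when it already contains a port.
    static String buildWsAddr(String domain, String subgroup) {
        if (domain.indexOf(":") > 0) {
            return "http://" + domain + "/rocketmq/" + subgroup;
        }
        return "http://" + domain + ":8080/rocketmq/" + subgroup;
    }

    // Assumption: entries are separated by ';' and may carry trailing whitespace or newlines.
    static List<String> parseAddrList(String response) {
        List<String> result = new ArrayList<>();
        if (response == null) {
            return result;
        }
        for (String part : response.split(";")) {
            String address = part.trim();
            if (!address.isEmpty()) {
                result.add(address);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(buildWsAddr("jmenv.tbsite.net", "nsaddr"));
        System.out.println(parseAddrList("192.168.0.1:9876;192.168.0.2:9876\n"));
    }
}
```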
org.apache.rocketmq.remoting.netty.NettyRemotingClient#start
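This method has three main parts:
- Set up the Netty client
- Scan the response table, starting after a 3 s delay and then repeating every second
- Scan availableNamesrvAddrMap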
public void start() {
    // Standard Netty client setup; the main addition is NettyClientHandler.
    // Note that at this point the bootstrap has not yet been bound to a remote host and port.
    if (this.defaultEventExecutorGroup == null) {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyClientConfig.getClientWorkerThreads(),
            new ThreadFactoryImpl("NettyClientWorkerThread_"));
    }
    Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.SO_KEEPALIVE, false)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                if (nettyClientConfig.isUseTLS()) {
                    if (null != sslContext) {
                        pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                        LOGGER.info("Prepend SSL handler");
                    } else {
                        LOGGER.warn("Connections are insecure as SSLContext is null!");
                    }
                }
                ch.pipeline().addLast(
                    nettyClientConfig.isDisableNettyWorkerGroup() ? null : defaultEventExecutorGroup,
                    new NettyEncoder(),
                    new NettyDecoder(),
                    new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                    new NettyConnectManageHandler(),
                    new NettyClientHandler());
            }
        });
    if (nettyClientConfig.getClientSocketSndBufSize() > 0) {
        LOGGER.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());
        handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());
    }
    if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {
        LOGGER.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());
        handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());
    }
    if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) {
        LOGGER.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",
            nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark());
        handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
            nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()));
    }
    if (nettyClientConfig.isClientPooledByteBufAllocatorEnable()) {
        handler.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }

    TimerTask timerTaskScanResponseTable = new TimerTask() {
        @Override
        public void run(Timeout timeout) {
            try {
                NettyRemotingClient.this.scanResponseTable();
            } catch (Throwable e) {
                LOGGER.error("scanResponseTable exception", e);
            } finally {
                timer.newTimeout(this, 1000, TimeUnit.MILLISECONDS);
            }
        }
    };
    this.timer.newTimeout(timerTaskScanResponseTable, 1000 * 3, TimeUnit.MILLISECONDS);

    int connectTimeoutMillis = this.nettyClientConfig.getConnectTimeoutMillis();
    TimerTask timerTaskScanAvailableNameSrv = new TimerTask() {
        @Override
        public void run(Timeout timeout) {
            try {
                NettyRemotingClient.this.scanAvailableNameSrv();
            } catch (Exception e) {
                LOGGER.error("scanAvailableNameSrv exception", e);
            } finally {
                timer.newTimeout(this, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }
        }
    };
    this.timer.newTimeout(timerTaskScanAvailableNameSrv, 0, TimeUnit.MILLISECONDS);
}
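Both timer tasks above use the same idiom: a one-shot task that re-registers itself in its finally block, so a failure in one scan never stops the next one. The sketch below reproduces that idiom with the JDK's ScheduledExecutorService instead of Netty's timer; SelfReschedulingSketch, runTimes, and the simulated failure are hypothetical and exist only to make the behaviour observable.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a task that reschedules itself after every run.
final class SelfReschedulingSketch {
    // Runs the task `times` times (times >= 1) and returns the observed run count, or -1 on timeout.
    static int runTimes(int times, long delayMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch remaining = new CountDownLatch(times);
        Runnable scanTask = new Runnable() {
            @Override
            public void run() {
                try {
                    // Every second run fails, to show that failures do not break the cycle.
                    if (runs.incrementAndGet() % 2 == 0) {
                        throw new IllegalStateException("simulated scan failure");
                    }
                } catch (Throwable e) {
                    // A real client would log the failure here.
                } finally {
                    remaining.countDown();
                    if (remaining.getCount() > 0) {
                        timer.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
                    }
                }
            }
        };
        // Initial delay is longer than the repeat delay, like the 3 s / 1 s pair above.
        timer.schedule(scanTask, delayMillis * 3, TimeUnit.MILLISECONDS);
        boolean finished = false;
        try {
            finished = remaining.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        timer.shutdownNow();
        return finished ? runs.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println("runs: " + runTimes(4, 10));
    }
}
```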
3. Starting the scheduled tasks
org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask
This method starts a handful of periodic client-side tasks; they are not expanded in detail here.
private void startScheduledTask() {
    // Reuse the fetch method above to periodically refresh the nameServer address list
    if (null == this.clientConfig.getNamesrvAddr()) {
        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
            } catch (Exception e) {
                log.error("ScheduledTask fetchNameServerAddr exception", e);
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }

    // Periodically update the topic route information
    this.scheduledExecutorService.scheduleAtFixedRate(() -> {
        try {
            MQClientInstance.this.updateTopicRouteInfoFromNameServer();
        } catch (Exception e) {
            log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

    // Periodically clean up offline brokers and send heartbeats to all brokers
    this.scheduledExecutorService.scheduleAtFixedRate(() -> {
        try {
            MQClientInstance.this.cleanOfflineBroker();
            MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
        } catch (Exception e) {
            log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

    // Periodically persist the consumer offsets
    this.scheduledExecutorService.scheduleAtFixedRate(() -> {
        try {
            MQClientInstance.this.persistAllConsumerOffset();
        } catch (Exception e) {
            log.error("ScheduledTask persistAllConsumerOffset exception", e);
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    // Periodically adjust the thread pool
    this.scheduledExecutorService.scheduleAtFixedRate(() -> {
        try {
            MQClientInstance.this.adjustThreadPool();
        } catch (Exception e) {
            log.error("ScheduledTask adjustThreadPool exception", e);
        }
    }, 1, 1, TimeUnit.MINUTES);
}
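Every task above wraps its body in try/catch, and there is a practical reason for that: with ScheduledExecutorService.scheduleAtFixedRate, an exception that escapes one execution suppresses all later executions. The following sketch demonstrates the difference with a task that always fails; FixedRateSketch and countRuns are hypothetical names and the timings are arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: a guarded periodic task keeps running, an unguarded one stops after its first failure.
final class FixedRateSketch {
    static int countRuns(boolean guarded, int wantedRuns, long periodMillis, long waitMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(wantedRuns);
        Runnable failing = () -> {
            runs.incrementAndGet();
            latch.countDown();
            throw new IllegalStateException("simulated task failure");
        };
        Runnable task;
        if (guarded) {
            task = () -> {
                try {
                    failing.run();
                } catch (Exception e) {
                    // A real client would log here; swallowing keeps the schedule alive.
                }
            };
        } else {
            task = failing;
        }
        scheduler.scheduleAtFixedRate(task, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            // Returns early once the task has run wantedRuns times, otherwise after waitMillis.
            latch.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scheduler.shutdownNow();
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("guarded runs:   " + countRuns(true, 3, 10, 5000));
        System.out.println("unguarded runs: " + countRuns(false, 3, 10, 300));
    }
}
```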
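4. Starting the pull-message service
In this part the MQ client pulls messages from the broker, then decodes and filters them and hands them to the consume service in batches.
this.pullMessageService.start() ends up in the following method, org.apache.rocketmq.common.ServiceThread#start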
public void start() {
    log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
    if (!started.compareAndSet(false, true)) {
        return;
    }
    // Set the stopped flag to false and start a new thread that executes this object's run method
    stopped = false;
    this.thread = new Thread(this, getServiceName());
    this.thread.setDaemon(isDaemon);
    this.thread.start();
    log.info("Start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
}
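The compareAndSet(false, true) check is what makes start() safe to call more than once: only the first caller wins and creates the thread. A minimal JDK-only sketch of that guard follows; StartOnceSketch and its counter are hypothetical and only there to make the effect visible.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a service thread that can only be started once.
final class StartOnceSketch implements Runnable {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicInteger threadsCreated = new AtomicInteger();

    // Returns true only for the call that actually started the thread.
    boolean start() {
        if (!started.compareAndSet(false, true)) {
            return false;
        }
        Thread thread = new Thread(this, "start-once-sketch");
        thread.setDaemon(true);
        threadsCreated.incrementAndGet();
        thread.start();
        return true;
    }

    @Override
    public void run() {
        // The real service would loop here; the sketch does nothing.
    }

    int threadsCreated() {
        return threadsCreated.get();
    }

    public static void main(String[] args) {
        StartOnceSketch service = new StartOnceSketch();
        System.out.println("first start:  " + service.start());
        System.out.println("second start: " + service.start());
        System.out.println("threads created: " + service.threadsCreated());
    }
}
```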
pullMessageService implements the Runnable interface, so start() spawns a dedicated thread that calls the run method below.
org.apache.rocketmq.client.impl.consumer.PullMessageService#run
@Override
public void run() {
    logger.info(this.getServiceName() + " service started");
    // stopped was set to false above, so the loop is entered
    while (!this.isStopped()) {
        try {
            // Take one MessageRequest from messageRequestQueue and execute it; block if the queue is empty
            MessageRequest messageRequest = this.messageRequestQueue.take();
            if (messageRequest.getMessageRequestMode() == MessageRequestMode.POP) {
                this.popMessage((PopRequest) messageRequest);
            } else {
                this.pullMessage((PullRequest) messageRequest);
            }
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            logger.error("Pull Message Service Run Method exception", e);
        }
    }
    logger.info(this.getServiceName() + " service end");
}
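The run loop is a classic single-consumer loop over a blocking queue: take() parks the thread while the queue is empty, and an interrupt simply sends the loop back to re-check the stop flag. Here is a self-contained sketch of that shape using plain strings as requests; PullLoopSketch and drain are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a worker that handles queued requests one at a time.
final class PullLoopSketch {
    // Feeds the requests through a worker thread and returns them in handling order.
    static List<String> drain(List<String> requests) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> handled = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(requests.size());
        AtomicBoolean stopped = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            while (!stopped.get()) {
                try {
                    String request = queue.take(); // blocks while the queue is empty
                    handled.add(request);
                    done.countDown();
                } catch (InterruptedException ignored) {
                    // Fall through: the loop condition decides whether to continue.
                }
            }
        }, "pull-loop-sketch");
        worker.setDaemon(true);
        worker.start();

        queue.addAll(requests);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Stop the worker: set the flag first, then wake it up if it is parked in take().
        stopped.set(true);
        worker.interrupt();
        return new ArrayList<>(handled);
    }

    public static void main(String[] args) {
        System.out.println(drain(Arrays.asList("pull-1", "pull-2", "pull-3")));
    }
}
```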
Taking the pullMessage path as an example, execution enters the following method.
org.apache.rocketmq.client.impl.consumer.PullMessageService#pullMessage
private void pullMessage(final PullRequest pullRequest) {
    // Look up the MQConsumerInner in the map by the consumer group name of the PullRequest
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        // The consumer exists, so run its pull method
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);
    } else {
        logger.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}
The DefaultMQPushConsumerImpl#pullMessage method is expanded below.
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
public void pullMessage(final PullRequest pullRequest) {
    // Get the process queue
    final ProcessQueue processQueue = pullRequest.getProcessQueue();
    // Return immediately if the queue has been dropped
    if (processQueue.isDropped()) {
        log.info("the pull request[{}] is dropped.", pullRequest.toString());
        return;
    }

    // Record the latest pull timestamp
    pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

    // If the consumer is not RUNNING, put the pullRequest back into messageRequestQueue after a 3 s delay and return.
    // executePullRequestLater appends the pullRequest to the tail of messageRequestQueue after
    // pullTimeDelayMillsWhenException milliseconds, so that the request is executed again later.
    try {
        this.makeSureStateOK();
    } catch (MQClientException e) {
        log.warn("pullMessage exception, consumer state not ok", e);
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        return;
    }

    // If the consumer is paused, put the pullRequest back into messageRequestQueue after a 1 s delay and return
    if (this.isPause()) {
        log.warn("consumer was paused, execute pull request later. instanceName={}, group={}",
            this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
        return;
    }

    // The following section is the flow control for pulling messages
    long cachedMessageCount = processQueue.getMsgCount().get();
    long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

    // When the process queue holds more than 1000 messages, retry this pullRequest after 50 ms;
    // a warning is logged once for every 1000 such occurrences
    if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(),
                processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }

    // When the messages in the process queue occupy more than 100 MiB, retry this pullRequest after 50 ms
    if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(),
                processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }

    // Concurrent (non-orderly) consumption
    if (!this.consumeOrderly) {
        // If the offset span between the largest and smallest message in the process queue exceeds 2000,
        // apply flow control and retry later
        if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
            if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                    processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                    pullRequest, queueMaxSpanFlowControlTimes);
            }
            return;
        }
    // Orderly consumption; not analysed in this article
    } else {
        if (processQueue.isLocked()) {
            if (!pullRequest.isPreviouslyLocked()) {
                long offset = -1L;
                try {
                    offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
                    if (offset < 0) {
                        throw new MQClientException(ResponseCode.SYSTEM_ERROR, "Unexpected offset " + offset);
                    }
                } catch (Exception e) {
                    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                    log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
                    return;
                }
                boolean brokerBusy = offset < pullRequest.getNextOffset();
                log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                    pullRequest, offset, brokerBusy);
                if (brokerBusy) {
                    log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                        pullRequest, offset);
                }

                pullRequest.setPreviouslyLocked(true);
                pullRequest.setNextOffset(offset);
            }
        } else {
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            log.info("pull message later because not locked in broker, {}", pullRequest);
            return;
        }
    }

    // Get the subscription data for this topic: topic, tags, expression, etc.
    final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (null == subscriptionData) {
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        log.warn("find the consumer's subscription failed, {}", pullRequest);
        return;
    }

    final long beginTimestamp = System.currentTimeMillis();

    // The callback for the pull; expanded later
    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                    subscriptionData);

                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        long prevRequestOffset = pullRequest.getNextOffset();
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        long pullRT = System.currentTimeMillis() - beginTimestamp;
                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                            pullRequest.getMessageQueue().getTopic(), pullRT);

                        long firstMsgOffset = Long.MAX_VALUE;
                        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        } else {
                            firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                            boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                pullResult.getMsgFoundList(),
                                processQueue,
                                pullRequest.getMessageQueue(),
                                dispatchToConsume);

                            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                    DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                            } else {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            }
                        }

                        if (pullResult.getNextBeginOffset() < prevRequestOffset
                            || firstMsgOffset < prevRequestOffset) {
                            log.warn(
                                "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                pullResult.getNextBeginOffset(),
                                firstMsgOffset,
                                prevRequestOffset);
                        }

                        break;
                    case NO_NEW_MSG:
                    case NO_MATCHED_MSG:
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case OFFSET_ILLEGAL:
                        log.warn("the pull request offset illegal, {} {}",
                            pullRequest.toString(), pullResult.toString());
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        pullRequest.getProcessQueue().setDropped(true);
                        DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                            @Override
                            public void run() {
                                try {
                                    DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                        pullRequest.getNextOffset(), false);

                                    DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                    DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                    log.warn("fix the pull request offset, {}", pullRequest);
                                } catch (Throwable e) {
                                    log.error("executeTaskLater Exception", e);
                                }
                            }
                        }, 10000);
                        break;
                    default:
                        break;
                }
            }
        }

        @Override
        public void onException(Throwable e) {
            if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                log.warn("execute the pull request exception", e);
            }

            if (e instanceof MQBrokerException && ((MQBrokerException) e).getResponseCode() == ResponseCode.FLOW_CONTROL) {
                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL);
            } else {
                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            }
        }
    };

    boolean commitOffsetEnable = false;
    long commitOffsetValue = 0L;
    if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
        commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
        if (commitOffsetValue > 0) {
            commitOffsetEnable = true;
        }
    }

    // Get the subscription expression, the class-filter flag, etc.
    String subExpression = null;
    boolean classFilter = false;
    SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (sd != null) {
        if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
            subExpression = sd.getSubString();
        }

        classFilter = sd.isClassFilterMode();
    }

    // Build the system flag
    int sysFlag = PullSysFlag.buildSysFlag(
        commitOffsetEnable, // commitOffset
        true, // suspend
        subExpression != null, // subscription
        classFilter // class filter
    );
    try {
        // Call the method that actually pulls the messages, passing in the pullCallback
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            this.defaultMQPushConsumer.getPullBatchSizeInBytes(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            CommunicationMode.ASYNC,
            pullCallback
        );
    } catch (Exception e) {
        log.error("pullKernelImpl exception", e);
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    }
}
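The three client-side flow-control checks in pullMessage boil down to a small decision function. The sketch below restates them with the threshold values mentioned in the comments above (1000 messages, 100 MiB, offset span of 2000) hard-coded as constants; in the real client these come from consumer configuration. FlowControlSketch, Decision, and decide are hypothetical names.

```java
// Hypothetical restatement of the client-side flow-control checks; thresholds are the values quoted in the comments above.
final class FlowControlSketch {
    enum Decision { PULL_NOW, DELAY_FOR_FLOW_CONTROL }

    static final long MAX_CACHED_MESSAGES = 1000;
    static final long MAX_CACHED_MIB = 100;
    static final long MAX_OFFSET_SPAN = 2000;

    static Decision decide(long cachedMessageCount, long cachedBytes, long maxSpan, boolean consumeOrderly) {
        long cachedMiB = cachedBytes / (1024 * 1024);
        if (cachedMessageCount > MAX_CACHED_MESSAGES) {
            return Decision.DELAY_FOR_FLOW_CONTROL;
        }
        if (cachedMiB > MAX_CACHED_MIB) {
            return Decision.DELAY_FOR_FLOW_CONTROL;
        }
        // The span check only applies to concurrent (non-orderly) consumption.
        if (!consumeOrderly && maxSpan > MAX_OFFSET_SPAN) {
            return Decision.DELAY_FOR_FLOW_CONTROL;
        }
        return Decision.PULL_NOW;
    }

    public static void main(String[] args) {
        System.out.println(decide(1001, 0, 0, false));
        System.out.println(decide(10, 101L * 1024 * 1024, 0, false));
        System.out.println(decide(10, 0, 2001, true));
    }
}
```

Note that all comparisons are strict, so a queue holding exactly 1000 cached messages is still allowed to pull.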
4.1. Actually pulling the messages
In this part the client pulls messages from the broker through the Netty client.
The method is as follows:
org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl(org.apache.rocketmq.common.message.MessageQueue, java.lang.String, java.lang.String, long, long, int, int, int, long, long, long, org.apache.rocketmq.client.impl.CommunicationMode, org.apache.rocketmq.client.consumer.PullCallback)
public PullResult pullKernelImpl(
    final MessageQueue mq,
    final String subExpression,
    final String expressionType,
    final long subVersion,
    final long offset,
    final int maxNums,
    final int maxSizeInBytes,
    final int sysFlag,
    final long commitOffset,
    final long brokerSuspendMaxTimeMillis,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // Look up the broker for this messageQueue: broker address, whether it is a slave, version, etc.
    FindBrokerResult findBrokerResult =
        this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq),
            this.recalculatePullFromWhichNode(mq), false);
    // If it cannot be found, refresh the topic route from the nameServer and look it up again
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq),
                this.recalculatePullFromWhichNode(mq), false);
    }

    if (findBrokerResult != null) {
        {
            // check version
            if (!ExpressionType.isTagType(expressionType)
                && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                    + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
            }
        }
        int sysFlagInner = sysFlag;

        if (findBrokerResult.isSlave()) {
            sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
        }

        // Assemble the request header
        PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
        requestHeader.setConsumerGroup(this.consumerGroup);
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setQueueId(mq.getQueueId());
        requestHeader.setQueueOffset(offset);
        requestHeader.setMaxMsgNums(maxNums);
        requestHeader.setSysFlag(sysFlagInner);
        requestHeader.setCommitOffset(commitOffset);
        requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
        requestHeader.setSubscription(subExpression);
        requestHeader.setSubVersion(subVersion);
        requestHeader.setMaxMsgBytes(maxSizeInBytes);
        requestHeader.setExpressionType(expressionType);
        requestHeader.setBname(mq.getBrokerName());

        // Get the broker address
        String brokerAddr = findBrokerResult.getBrokerAddr();
        if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
            brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);
        }

        // Pull the messages through the MQ client
        PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
            brokerAddr,
            requestHeader,
            timeoutMillis,
            communicationMode,
            pullCallback);

        return pullResult;
    }

    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
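The sysFlag passed around above is a plain integer bit set: buildSysFlag packs four booleans into it, and clearCommitOffsetFlag switches one of them off again when the pull goes to a slave broker. The sketch below shows that technique in isolation. The bit positions used here (commit offset = bit 0, suspend = bit 1, subscription = bit 2, class filter = bit 3) are an assumption for illustration and are not taken from the code quoted in this article; SysFlagSketch and its methods are hypothetical.

```java
// Hypothetical bit-flag helper; the bit positions are assumed, not quoted from RocketMQ.
final class SysFlagSketch {
    static final int FLAG_COMMIT_OFFSET = 0x1;
    static final int FLAG_SUSPEND = 0x1 << 1;
    static final int FLAG_SUBSCRIPTION = 0x1 << 2;
    static final int FLAG_CLASS_FILTER = 0x1 << 3;

    static int build(boolean commitOffset, boolean suspend, boolean subscription, boolean classFilter) {
        int flag = 0;
        if (commitOffset) {
            flag |= FLAG_COMMIT_OFFSET;
        }
        if (suspend) {
            flag |= FLAG_SUSPEND;
        }
        if (subscription) {
            flag |= FLAG_SUBSCRIPTION;
        }
        if (classFilter) {
            flag |= FLAG_CLASS_FILTER;
        }
        return flag;
    }

    // Clears only the commit-offset bit and leaves every other bit untouched.
    static int clearCommitOffset(int flag) {
        return flag & ~FLAG_COMMIT_OFFSET;
    }

    static boolean hasClassFilter(int flag) {
        return (flag & FLAG_CLASS_FILTER) == FLAG_CLASS_FILTER;
    }

    public static void main(String[] args) {
        int flag = build(true, true, false, false);
        System.out.println("flag = " + flag);
        System.out.println("after clearing commit offset = " + clearCommitOffset(flag));
        System.out.println("class filter set? " + hasClassFilter(flag));
    }
}
```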
org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessage
public PullResult pullMessage(
    final String addr,
    final PullMessageRequestHeader requestHeader,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final PullCallback pullCallback
) throws RemotingException, MQBrokerException, InterruptedException {
    RemotingCommand request;
    // Not important here: a different RemotingCommand is assembled depending on the request code
    if (PullSysFlag.hasLitePullFlag(requestHeader.getSysFlag())) {
        request = RemotingCommand.createRequestCommand(RequestCode.LITE_PULL_MESSAGE, requestHeader);
    } else {
        request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
    }

    // The communicationMode passed in above is ASYNC, so the ASYNC branch is taken
    switch (communicationMode) {
        case ONEWAY:
            assert false;
            return null;
        case ASYNC:
            this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
            return null;
        case SYNC:
            return this.pullMessageSync(addr, request, timeoutMillis);
        default:
            assert false;
            break;
    }

    return null;
}
Next, step into org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync.
Here remotingClient is the Netty client, which is used to fetch messages from the broker.
private void pullMessageAsync(
    final String addr,
    final RemotingCommand request,
    final long timeoutMillis,
    final PullCallback pullCallback
) throws RemotingException, InterruptedException {
    // Fetch messages from the broker through the Netty client; the details are not covered here.
    // The InvokeCallback passed in is executed once the pull has completed.
    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
        // Called after the response has arrived
        @Override
        public void operationComplete(ResponseFuture responseFuture) {
            RemotingCommand response = responseFuture.getResponseCommand();
            if (response != null) {
                try {
                    // Process the response: parse the header and body into a pullResult
                    PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr);
                    assert pullResult != null;
                    // The pull succeeded, so run the success callback to handle the pullResult
                    pullCallback.onSuccess(pullResult);
                } catch (Exception e) {
                    pullCallback.onException(e);
                }
            } else {
                if (!responseFuture.isSendRequestOK()) {
                    pullCallback.onException(new MQClientException(ClientErrorCode.CONNECT_BROKER_EXCEPTION,
                        "send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
                } else if (responseFuture.isTimeout()) {
                    pullCallback.onException(new MQClientException(ClientErrorCode.ACCESS_BROKER_TIMEOUT,
                        "wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
                        responseFuture.getCause()));
                } else {
                    pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis
                        + ". Request: " + request, responseFuture.getCause()));
                }
            }
        }
    });
}
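The branching in operationComplete maps four situations onto two callback methods: a response arrived (success, unless handling it throws), the request was never sent, the wait timed out, or something unknown happened. A stripped-down sketch of that dispatch, with strings standing in for the response and result types, looks like this; PullCallbackSketch, Callback, complete, and outcome are all hypothetical names.

```java
// Hypothetical model of how a completed request is routed to onSuccess or onException.
final class PullCallbackSketch {
    interface Callback {
        void onSuccess(String result);

        void onException(Throwable e);
    }

    static void complete(String response, boolean sendRequestOk, boolean timedOut, Callback callback) {
        if (response != null) {
            try {
                callback.onSuccess(response);
            } catch (Exception e) {
                // A failure while handling the result is reported through the same callback.
                callback.onException(e);
            }
        } else if (!sendRequestOk) {
            callback.onException(new IllegalStateException("send request failed"));
        } else if (timedOut) {
            callback.onException(new IllegalStateException("wait response timeout"));
        } else {
            callback.onException(new IllegalStateException("unknown reason"));
        }
    }

    // Convenience wrapper that records which callback fired, for demonstration only.
    static String outcome(String response, boolean sendRequestOk, boolean timedOut) {
        StringBuilder seen = new StringBuilder();
        complete(response, sendRequestOk, timedOut, new Callback() {
            @Override
            public void onSuccess(String result) {
                seen.append("success:").append(result);
            }

            @Override
            public void onException(Throwable e) {
                seen.append("error:").append(e.getMessage());
            }
        });
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println(outcome("3 messages", true, false));
        System.out.println(outcome(null, false, false));
        System.out.println(outcome(null, true, true));
    }
}
```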
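4.2. Executing the success callback
This part runs after a pull succeeds: the callback decodes and filters the message list, then submits it to the consume service, which consumes it in batches.
Focus on the case where the pull status is FOUND.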
PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            // Parse pullResult: decode its messageBinary (a byte array) into a list of MessageExt,
            // filter that list by the tag, classFilter, etc. in subscriptionData,
            // then put the filtered msgListFilterAgain list back into pullResult
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                subscriptionData);

            switch (pullResult.getPullStatus()) {
                case FOUND:
                    long prevRequestOffset = pullRequest.getNextOffset();
                    // Set the offset for the next pull
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                    // Record the pull response time (RT)
                    long pullRT = System.currentTimeMillis() - beginTimestamp;
                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                        pullRequest.getMessageQueue().getTopic(), pullRT);

                    // If this pull returned no messages, put pullRequest back into the queue at once so it is pulled again
                    long firstMsgOffset = Long.MAX_VALUE;
                    if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    } else {
                        // Messages were pulled
                        firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                        // Record the pull TPS
                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                            pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                        // Put the messages into the process queue
                        boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                        // Submit a consume request to the consume service
                        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                            pullResult.getMsgFoundList(),
                            processQueue,
                            pullRequest.getMessageQueue(),
                            dispatchToConsume);

                        if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                        } else {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        }
                    }

                    if (pullResult.getNextBeginOffset() < prevRequestOffset
                        || firstMsgOffset < prevRequestOffset) {
                        log.warn(
                            "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                            pullResult.getNextBeginOffset(),
                            firstMsgOffset,
                            prevRequestOffset);
                    }

                    break;
                case NO_NEW_MSG:
                case NO_MATCHED_MSG:
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    break;
                case OFFSET_ILLEGAL:
                    log.warn("the pull request offset illegal, {} {}",
                        pullRequest.toString(), pullResult.toString());
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                    pullRequest.getProcessQueue().setDropped(true);
                    DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                        @Override
                        public void run() {
                            try {
                                DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                    pullRequest.getNextOffset(), false);

                                DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                log.warn("fix the pull request offset, {}", pullRequest);
                            } catch (Throwable e) {
                                log.error("executeTaskLater Exception", e);
                            }
                        }
                    }, 10000);
                    break;
                default:
                    break;
            }
        }
    }

    @Override
    public void onException(Throwable e) {
        if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            log.warn("execute the pull request exception", e);
        }

        if (e instanceof MQBrokerException && ((MQBrokerException) e).getResponseCode() == ResponseCode.FLOW_CONTROL) {
            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL);
        } else {
            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }
    }
};
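The core of the FOUND branch is a loop: record the next offset returned by the pull, hand the messages to the consume service, then pull again from that offset. The following single-threaded sketch is a simplified model of that idea, not the client's implementation. PullLoopSketch, Result, pull and drain are hypothetical names, an in-memory list stands in for the broker queue, and the loop simply stops on an empty result, whereas the real client re-queues the pull request and keeps pulling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory model; none of these types are RocketMQ classes.
final class PullLoopSketch {

    // Result of one pull: the messages found and the offset to pull from next.
    static final class Result {
        final List<String> msgs;
        final long nextBeginOffset;

        Result(List<String> msgs, long nextBeginOffset) {
            this.msgs = msgs;
            this.nextBeginOffset = nextBeginOffset;
        }
    }

    // Stand-in for the broker: returns at most maxNums messages starting at offset.
    static Result pull(List<String> queue, long offset, int maxNums) {
        int from = (int) Math.max(0, Math.min(offset, queue.size()));
        int to = Math.min(from + Math.max(0, maxNums), queue.size());
        return new Result(new ArrayList<>(queue.subList(from, to)), to);
    }

    // Pulls until nothing is found, advancing nextOffset after every pull as the
    // FOUND branch does, and returns the messages in the order they were handed over.
    static List<String> drain(List<String> queue, int maxNums) {
        List<String> consumed = new ArrayList<>();
        long nextOffset = 0;
        while (true) {
            Result result = pull(queue, nextOffset, maxNums);
            nextOffset = result.nextBeginOffset;   // corresponds to pullRequest.setNextOffset(...)
            if (result.msgs.isEmpty()) {
                break;                             // simplification: the real client schedules another pull
            }
            consumed.addAll(result.msgs);          // stands in for submitConsumeRequest(...)
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(drain(Arrays.asList("a", "b", "c", "d", "e"), 2));
    }
}
```

Because the offset is advanced before the messages are handed over, each pull starts exactly where the previous one ended, which is the invariant the "[BUG]" warning in the listing checks for.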
4.2.1. Submitting a consume request to the consume service
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    // Get the batch size for consumption, 1 by default
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    // If the number of messages does not exceed the batch size, submit them as a single request
    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        // Otherwise split the messages into batches of at most consumeBatchSize
        for (int total = 0; total < msgs.size(); ) {
            List<MessageExt> msgThis = new ArrayList<>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }

            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                // Submit each batch to the thread pool for consumption
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                // The pool rejected the task: fold the remaining messages into this request and retry it later
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }

                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}
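The batching in submitConsumeRequest can be tried in isolation. The sketch below is a simplified stand-in, not the RocketMQ code: BatchSplitSketch and split are hypothetical names, and plain lists replace ConsumeRequest objects and the thread pool. It only illustrates how a pulled list is cut into consecutive chunks of at most consumeBatchSize messages.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; not part of the RocketMQ client.
final class BatchSplitSketch {

    // Cuts msgs into consecutive batches of at most batchSize elements,
    // using the same two nested loops as submitConsumeRequest.
    static <T> List<List<T>> split(List<T> msgs, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int total = 0; total < msgs.size(); ) {
            List<T> batch = new ArrayList<>(batchSize);
            for (int i = 0; i < batchSize && total < msgs.size(); i++, total++) {
                batch.add(msgs.get(total));
            }
            batches.add(batch);
        }
        return batches;
    }

    public static void main(String[] args) {
        // Five messages with a batch size of 2 give three batches: [[m1, m2], [m3, m4], [m5]]
        System.out.println(split(Arrays.asList("m1", "m2", "m3", "m4", "m5"), 2));
    }
}
```

With a batch size of 1, every message ends up in its own batch, so the listener receives one message per call.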
4.2.2. Executing the run method of ConsumeRequest
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run
@Override
public void run() {
    if (this.processQueue.isDropped()) {
        log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
        return;
    }

    MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
    ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
    ConsumeConcurrentlyStatus status = null;
    defaultMQPushConsumerImpl.tryResetPopRetryTopic(msgs, consumerGroup);
    defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());

    ConsumeMessageContext consumeMessageContext = null;
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext = new ConsumeMessageContext();
        consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
        consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
        consumeMessageContext.setProps(new HashMap<>());
        consumeMessageContext.setMq(messageQueue);
        consumeMessageContext.setMsgList(msgs);
        consumeMessageContext.setSuccess(false);
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
    }

    long beginTimestamp = System.currentTimeMillis();
    boolean hasException = false;
    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
    try {
        // If msgs is not empty, stamp each message with the consume start time
        if (msgs != null && !msgs.isEmpty()) {
            for (MessageExt msg : msgs) {
                MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
            }
        }
        // Consume msgs as one batch
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
            UtilAll.exceptionSimpleDesc(e),
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue), e);
        hasException = true;
    }
    // The following sets the consume status and the return type
    long consumeRT = System.currentTimeMillis() - beginTimestamp;
    if (null == status) {
        if (hasException) {
            returnType = ConsumeReturnType.EXCEPTION;
        } else {
            returnType = ConsumeReturnType.RETURNNULL;
        }
    } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
        returnType = ConsumeReturnType.TIME_OUT;
    } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
        returnType = ConsumeReturnType.FAILED;
    } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
        returnType = ConsumeReturnType.SUCCESS;
    }

    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
    }

    if (null == status) {
        log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue);
        status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
    // If hooks are registered, run the after-consume hooks
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext.setStatus(status.toString());
        consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
        consumeMessageContext.setAccessChannel(defaultMQPushConsumer.getAccessChannel());
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
    }
    // Record the consume response time (RT)
    ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
        .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

    if (!processQueue.isDropped()) {
        ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
    } else {
        log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
    }
}
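The chain of conditions that derives the return type is easier to follow as a small pure function. The sketch below is an illustration under assumptions rather than the client code: ReturnTypeSketch, classify and effectiveStatus are hypothetical names, and the two enums only mirror the constants that appear in the listing. The timeout parameter is expressed in minutes because the listing multiplies getConsumeTimeout() by 60 * 1000 before comparing it with the elapsed milliseconds.

```java
// Hypothetical stand-ins for the client's enums; only the decision order follows the listing.
final class ReturnTypeSketch {

    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    enum ReturnType { SUCCESS, TIME_OUT, EXCEPTION, RETURNNULL, FAILED }

    // Maps the listener's outcome to a return type in the same order as ConsumeRequest#run.
    static ReturnType classify(Status status, boolean hasException,
                               long consumeRtMillis, long consumeTimeoutMinutes) {
        if (status == null) {
            // The listener either threw or returned null.
            return hasException ? ReturnType.EXCEPTION : ReturnType.RETURNNULL;
        }
        if (consumeRtMillis >= consumeTimeoutMinutes * 60 * 1000) {
            // The timeout check comes before the status check, so a slow success is still TIME_OUT.
            return ReturnType.TIME_OUT;
        }
        if (status == Status.RECONSUME_LATER) {
            return ReturnType.FAILED;
        }
        return ReturnType.SUCCESS;
    }

    // A null status is treated as RECONSUME_LATER before the result is processed.
    static Status effectiveStatus(Status status) {
        return status == null ? Status.RECONSUME_LATER : status;
    }

    public static void main(String[] args) {
        // 15 minutes is just a value chosen for this example.
        System.out.println(classify(null, true, 20, 15));
        System.out.println(classify(Status.CONSUME_SUCCESS, false, 20, 15));
        System.out.println(effectiveStatus(null));
    }
}
```

Note that the return type is only written to the hook context; what drives redelivery is the status itself, which is why a null status is replaced by RECONSUME_LATER before processConsumeResult is called.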