28.RocketMQ之消费者的负载均衡源码

这篇具有很好参考价值的文章主要介绍了28.RocketMQ之消费者的负载均衡源码。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


highlight: arduino-light

消费者负载均衡流程

当一个业务系统部署多台机器时,每台机器都启动了一个Consumer,并且这些Consumer都在同一个ConsumerGroup也就是在同1个消费组中。此时一个消费组中多个Consumer消费一个Topic,而一个Topic对应多个MessageQueue。

比如TopicA有2个Consumer,6个MessageQueue,那么这6个MessageQueue怎么分配呢?

这就涉及到Consumer的负载均衡了。

首先 Consumer 在启动时,会把自己注册给所有 Broker并保持心跳,让每一个 Broker 都知道消费组中有哪些 Consumer 。

然后Consumer在消费时,会随机连接一台Broker ,获取消费组中的所有Consumer。

然后根据要消费的Topic的messageQueue和消费者的数量做负载均衡。

因为是分布式环境下的负载均衡,所以如何让每个消费者都能保证看到的视图是一样呢?

答案是先对消费者和messageQueue排序,然后每个消费者使用相同的负载均衡策略做负载均衡。

这样每个消费者看到的负载均衡分配的messageQueue的视图就是一致的。

负载均衡主要流程如下:

假如topicA 有 6个队列。

消费者1启动订阅topicA。通过findConsumerIdList方法到broker获取到所有的消费者,发现是1个。

对消费者和队列进行排序,然后执行负载均衡策略,获取到当前消费者的分配到的mqSet是6个。

此时消费者2启动,获取所有的消费者,发现是2个。

对消费者和队列进行排序。执行负载均衡策略。获取到当前消费者分配到的mqSet是4 5 6。

此时消费者2就可以对4 5 6这3个队列做消息的拉取了。

过了20秒消费者1的负载均衡任务又触发了,流程同上,消费者1获取到当前消费者分配到的mqSet是1 2 3。

此时消费者1就可以对1 2 3 这3个队列做消息的拉取了。

说明:为了方便,有些地方的messageQueue使用mq代替。

```md /* 循环遍历所有的topic,获取对应的队列集合messageQueueSet。 判断是否是顺序消费,是否顺序消费判断的依据是当前消费者绑定的listener是并发还是顺序

在广播模式下 1.遍历的是所有的mqset

在集群模式下 1.遍历的是所有的allocateResult 即根据负载均衡策略分配的mqSet 2.移除新增的mq 3.移除长时间未拉取的mq 不会移除顺序消费且是集群模式的mq,也就是说不会参与负载均衡?? 4.遍历移除了不重要的(新增||长时间未拉取)的mq的mqSet 5.如果消费者是顺序消费,尝试加锁该MessageQueue(远程向服务器端请求加锁,并设置本地mq加锁状态),   加锁失败就跳过该mq 不会构建该mq的pullRequest。否则也会构建该mq的pullRequest。 6.清除本地该mq的消费offset,从服务器拉取更新本地mq的消费offset 7.构建pullRequest准备拉取消息

在 rebalance 时,需要对 队列,还有消费者客户端 ID 进行排序,以确保同一个消费组下的视图是一致的。 */ ```

ServiceThread#start

java public void start() { log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread); if (!started.compareAndSet(false, true)) { return; } stopped = false; //this 指的是rebalanceService this.thread = new Thread(this, getServiceName()); this.thread.setDaemon(isDaemon); this.thread.start(); }

RebalanceService#run

java @Override public void run() { log.info(this.getServiceName() + " service started"); // while (!this.isStopped()) { //waitInterval 20秒 也就是每20秒需要负载均衡一次 this.waitForRunning(waitInterval); //做负载均衡操作 this.mqClientFactory.doRebalance(); } ​ log.info(this.getServiceName() + " service end"); }

MQClientInstance#doRebalance

每个消费者都需要做负载均衡

public void doRebalance() { //MQClientInstance遍历已注册的消费者,对消费者执行doRebalance()方法 for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { MQConsumerInner impl = entry.getValue(); if (impl != null) { try { impl.doRebalance(); } catch (Throwable e) { log.error("doRebalance exception", e); } } } }

MQConsumerInner有2个实现,DefaultMQPullConsumerImpl和DefaultMQPushConsumerImpl。

分别是对应拉和推模式。默认DefaultMQPushConsumerImpl即推。

```java //DefaultMQPullConsumerImpl //pull this.rebalanceImpl.doRebalance(false);

//DefaultMQPushConsumerImpl   //push this.rebalanceImpl.doRebalance(this.isConsumeOrderly()); ```

重点看推模式下的负载均衡。

DefaultMQPushConsumerImpl#doRebalance

假如topicA 有 6个队列。

消费者1启动订阅topicA。通过findConsumerIdList方法到broker获取到所有的消费者,发现是1个。

对消费者和队列进行排序,然后执行负载均衡策略,获取到当前消费者的分配到的mqSet是6个。

此时消费者2启动,获取所有的消费者,发现是2个。

对消费者和队列进行排序。执行负载均衡策略。获取到当前消费者分配到的mqSet是4 5 6。

此时消费者2就可以对4 5 6这3个队列做消息的拉取了。

过了20秒消费者1的负载均衡任务又触发了,流程同上,消费者1获取到当前消费者分配到的mqSet是1 2 3。

此时消费者1就可以对1 2 3 这3个队列做消息的拉取了。


参考链接:https://blog.csdn.net/HoneyYHQ9988/article/details/105941328

参考链接:https://blog.csdn.net/HoneyYHQ9988/article/details/105941328

```java /* consumeOrderly:是否是顺序消费,consumeOrderly由监听器的类型决定 如果监听器是MessageListenerOrderly consumeOrderly 是true 如果监听器是MessageListenerConcurrently consumeOrderly 是false if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService (this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService (this, (MessageListenerConcurrently) this.getMessageListenerInner()); } */ @Override public void doRebalance() { if (!this.pause) {

this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}

} ``` 注意doRebalance的参数consumeOrderly。

consumeOrderly:是否是顺序消费,consumeOrderly由监听器的类型决定。

如果监听器是MessageListenerOrderly consumeOrderly是true。

如果监听器是MessageListenerConcurrently consumeOrderly是false。

RebalanceImpl#doRebalance

```java //isOrder是否需要顺序消费消息 public void doRebalance(final boolean isOrder) { //遍历topic 对每个topic订阅的队列进行重新负载 // ConcurrentMap subscriptionInner //subscriptionInner的结构是1个Map,key是Topic,value是订阅信息 Map subTable = this.getSubscriptionInner(); if (subTable != null) { //注意在这里是for循环每个topic for (final Map.Entry entry : subTable.entrySet()) { final String topic = entry.getKey(); try { //根据topic负载均衡 this.rebalanceByTopic(topic, isOrder); } catch (Throwable e) { if (!topic.startsWith(MixAll.RETRY GROUPTOPIC_PREFIX)) { log.warn("rebalanceByTopic Exception", e); } } } }

this.truncateMessageQueueNotMyTopic();

} ```

这里有个点:是for循环遍历topic,难道每个消费者组中的消费者还能订阅多个topic?

答案是肯定的!

有兴趣的同学可参考:https://blog.csdn.net/u011385940/article/details/130511718

接着往下看

RebalanceImpl#rebalanceByTopic

``` //topic 消息主题 //isOrder 是否顺序消费 private void rebalanceByTopic(final String topic, final boolean isOrder) {

switch (messageModel) {
        //广播模式每个消费者都需要消费所有的消息
        //所以在updateProcessQueueTableInRebalance中传入的是该topic下的所有的mqSet
        case BROADCASTING: {
            //获取每个topic的所有队列集合
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            if (mqSet != null) {
                boolean changed = 
                    this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                if (changed) {
                    this.messageQueueChanged(topic, mqSet, mqSet);
                }
            } else {
                //不存在MessageQueue
            }
            break;
        }

        //集群模式
        case CLUSTERING: {
             //1.获取topic的所有队列集合
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            //2.从broker端获取topic的消费者集合,消费者集合种存放的是所有消费者的clientId
            //先根据topic获取topic所在的所有的broker的地址
            //随机选择1个broker地址拉取消费者列表
            //cidAll中的clientId长这样:172.18.120.141@21908
            List<String> cidAll = 
                this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
            if (null == mqSet) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
        //不存在MessageQueue
                }
            }

            if (null == cidAll) {

            }
    //给消费者分配队列
            if (mqSet != null && cidAll != null) {
                List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                mqAll.addAll(mqSet);

                //将cid和mq做排序 保证每个消费者的视图一致
                Collections.sort(mqAll);
                Collections.sort(cidAll);
                //默认是AllocateMessageQueueAveragely
                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                List<MessageQueue> allocateResult = null;
                try {
                    //AllocateMessageQueueAveragely的allocate
                    //mqAll 队列集合
                    //cidAll 消费者集合
                    //根据队列集合和消费者集合进行重新负载均衡
                    allocateResult = strategy.allocate(
                        this.consumerGroup,
                        //本机的clientId
                        this.mQClientFactory.getClientId(),
                        mqAll,
                        cidAll);
                } catch (Throwable e) {
                    log.error("AllocateMessageQueueStrategy.allocate Exception}");
                    return;
                }

                Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                if (allocateResult != null) {
                    allocateResultSet.addAll(allocateResult);
                }
                //集群模式  
                //集群模式每个消费者组的一个实例对个一个
                //所以在updateProcessQueueTableInRebalance中传入的是allocateResultSet
                boolean changed = this.updateProcessQueueTableInRebalance
                                        (topic, allocateResultSet, isOrder);
                if (changed) {
                    log.info("rebalanced result changed");
                    this.messageQueueChanged(topic, mqSet, allocateResultSet);
                }
            }
            break;
        }
        default:
            break;
    }
}

```

广播模式

广播模式首先获取mqset,mqSet是该topic下面的所有的messageQueue。

然后调用updateProcessQueueTableInRebalance方法,根据返回值判断是否调用messageQueueChanged方法。

集群模式

1.获取topic下的所有队列的集合记作mqSet。

2.根据topic获取topic所在的所有的broker的地址,然后随机选择1个broker地址拉取消费者列表,

3.消费者集合中存放的是所有消费者的clientId记作cidAll。cidAll中的clientId格式:172.18.120.141@21908

4.将cid和mq做排序 保证每个消费者的视图一致

5.获取负载均衡策略,默认是AllocateMessageQueueAveragely

6.调用负载均衡策略的allocate方法,需要传入mqSet、cidAll、消费者的clientId、消费者组,allocate方法返回结果记作allocateResult。

7.将分配给自己的mq集合allocateResult加入allocateResultSet。

然后调用updateProcessQueueTableInRebalance方法,根据返回值判断是否调用messageQueueChanged方法。

虽然广播和集群都调用了updateProcessQueueTableInRebalance方法但是传入的参数mqSet不同。

广播模式传入的是topic下面的所有的messageQueue。

集群模式传入的allocateResult是该消费者在该topic分配到的队列。

其实不管是集群还是广播,传入的参数mqSet都是分配给消费者的队列。这么理解也没毛病!

RebalanceImpl#updateProcessQueueTableInRebalance

```java //mqSet消费者分配到的消息队列的集合 //isOrder是否是顺序消息 private boolean updateProcessQueueTableInRebalance(final String topic, final Set mqSet,final boolean isOrder) { boolean changed = false;

//it中存放的是Entry<MessageQueue,ProcessQueue>
Iterator<Entry<MessageQueue,ProcessQueue>> it 
                                    = this.processQueueTable.entrySet().iterator();

//遍历Entry while (it.hasNext()) {

Entry<MessageQueue, ProcessQueue> next = it.next();
    MessageQueue mq = next.getKey();
    ProcessQueue pq = next.getValue();

    if (mq.getTopic().equals(topic)) {
          //假如所有的是123456,当前消费者分配的是456
          //那么遍历123456 判断当前消费者分配的456是否包含123456
          //这样做的目的是防止原来分配的是123 现在分配的是456
          //如果不丢弃123 会造成123的重复消费
          //mqSet是分配给当前消费者的队列集合
          //如果这个队列没有分配给当前消费者 
          //那么需要丢弃该队列丢弃该ProcessQueue
          //那么在拉取消息的时候就不会对该ProcessQueue进行拉取
        if (!mqSet.contains(mq)) {
            //设置丢弃标识
            pq.setDropped(true);
            //广播模式直接返回true
            //如果是集群模式且顺序消费 返回false
            //把该mq的客户端消费offset更新到broker保存,移除客户端该mq的消费offset记录。
            //移除已经下线的mq
            if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                it.remove();
                changed = true;
                log.info("doRebalance, remove unnecessary mq");
            }
        //对于原来分配给当前消费者现在又分配给当前消费者的队列需要做失效判断
        //比如原来分配的是123 现在分配的是34 那么队列3就是符合这种条件的。
        //PULL_MAX_IDLE_TIME 默认是2分钟,参数rocketmq.client.pull.pullMaxIdleTime,12000秒
        //当前时间-拉取开始时间大于2分钟    
        //lastPullTimestamp + 2分钟 < 当前时间  认为该mq失效 需要丢弃
        } else if (pq.isPullExpired()) { 
            switch (this.consumeType()) {
                 //PULL
                case CONSUME_ACTIVELY:
                    break;
                //PUSH
                case CONSUME_PASSIVELY:
                    //push走这里
                    //把ProcessQueue置为失效
                    //这样在PullService线程拉取的时候该对象是失效状态,就不再拉取该对象
                    pq.setDropped(true);

                    //广播模式直接返回true
                    //如果是集群模式且顺序消费 返回false
                    //把该mq的客户端消费offset更新到broker保存
                    //移除客户端该mq的消费offset记录。
                    //从集合移除长时间没有拉取消息的mq
                    if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                        it.remove();
                        changed = true;
                    }
                    break;
                default:
                    break;
            }
        }
    }
}//end while

List<PullRequest> pullRequestList = new ArrayList<PullRequest>();

//mqSet是清理了不必要的mq的mqSet
for (MessageQueue mq : mqSet) {
        //说明该mq是新增的 旧的不需要分配 已经分配过了
    if (!this.processQueueTable.containsKey(mq)) {
        //如果消费者是顺序消费
        //尝试加锁该MessageQueue
        //加锁失败就跳过该mq 不会拉去该mq的消息
        //顺序消费并且加锁成功 或者 是并发消费 才会继续往下执行
        if (isOrder && !this.lock(mq)) {
            log.warn("doRebalance, add a new mq failed, because lock failed");
            continue;
        }

        //消费客户端移除该mq的消费offset
        //通过LocalFileOffsetStore或RemoteBrokerOffsetStore移除该消息队列的消费offset
        this.removeDirtyOffset(mq);

        ProcessQueue pq = new ProcessQueue();
        //向broker发送命令QUERY_CONSUMER_OFFSET获取broker端记录的该mq的消费offset
        //默认是CONSUME_FROM_LAST_OFFSET
        //使用offsetStore读取消费进度 
        //广播是本地消费进度 集群是远程消费进度
        long nextOffset = this.computePullFromWhere(mq);

        if (nextOffset >= 0) {
            ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
            if (pre != null) {
                //说明当前消息队列之前已经被分配给当前消费者
                //已经存在的ProcessQueue拉取offset,在每次拉取到消息以后会重新设置
                log.info("doRebalance, {}, mq already exists");
            } else {


                log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                //构建拉取PullRequest
                PullRequest pullRequest = new PullRequest();
                pullRequest.setConsumerGroup(consumerGroup);
                //设置拉取消息的偏移量
                pullRequest.setNextOffset(nextOffset);
                pullRequest.setMessageQueue(mq);
                pullRequest.setProcessQueue(pq);
                //将新加入的放入
                pullRequestList.add(pullRequest);
                changed = true;
            }
        } else {
            log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
        }
    }
}
//遍历pullRequestList集合
//把pullRequest对象添加到PullMessageService服务线程的阻塞队列内供PullMessageService拉取执行
this.dispatchPullRequest(pullRequestList);
return changed;

} ```

updateProcessQueueTableInRebalance是消费端重新负载的核心方法。

顾名思义功能就是更新处理器队列集合RebalanceImpl.processQueueTable。

那么什么是processQueue,它的作用是什么呢?

它是一个队列消费快照,消息拉取的时候,会将实际消息体、拉取相关的操作存放在其中。比如消费进度,消费等等功能的底层核心数据保存都是有ProcessQueue提供。

另外在拉取消息的时候使用的是PullRequest去请求,PullRequest结构如下:

java public class PullRequest { private String consumerGroup; private MessageQueue messageQueue; private ProcessQueue processQueue; private long nextOffset; private boolean previouslyLocked = false; }

可以发现1个ProcessQueue和1个MessageQueue是一一对应的,MessageQueue用来表示拉取回来的消息元数据信息。

具体可参考:https://blog.csdn.net/Saintmm/article/details/120723628

假设有6个队列,有2个消费者分别是消费者A和消费者B。

————————————————消费者B第一轮负载均衡——————————————————

消费者B第一次做负载均衡分到的队列是123

1.先从processQueueTable获取Entry 集合it

2.由于是第一次做负载均衡,所以it为空,此时跳过while循环

3.遍历本次分到的mqSet,判断processQueueTable中是否存在对应的mq,如果不存在说明是新分配的mq。

4.如果是新分配给当前消费者的mq&&当前消费者是顺序消费&&加锁失败啥也不做。

为什么要加锁?

因为顺序消费需要加分布式锁+本地锁,此处是在broker加分布式锁。

为什么加锁失败啥也不做?

因为加锁失败,说明是其它的消费者在broker加的分布式锁还没释放,那么等下次Rebalance的时候再尝试加锁即可。

5如果是新分配给当前消费者的mq&&当前消费者不是顺序消费&&加锁成功。

5.1调用removeDirtyOffset清除本地的脏消费偏移量

5.2重新计算队列拉取消息的偏移量。

5.3构建拉取消息的PullRequest

6.分发PullRequest拉取消息

————————————————消费者B第二轮负载均衡——————————————————

消费者B第二次做负载均衡分到的队列mqSet是34

1.先从processQueueTable获取Entry 集合it,集合it中存放的是123

2.由于是第二次做负载均衡,所以it不为空,此时进入while循环

3.遍历it集合,集合it中存放的是123

4.先判断分配的mqSet是否包含1、2、3,mqSet是3、4,所以队列1、2会被丢弃

5.队列3显然是包含在mqSet中的,对于队列3会判断拉取是否过期,如果拉取过期那么要丢弃队列3,在removeUnnecessaryMessageQueue方法中如果消费者B是顺序消费还需要释放broker端的分布式锁。

6.遍历本次分到的mqSet,判断processQueueTable中是否存在对应的mq,如果不存在说明是新分配的mq。

7.如果是新分配给当前消费者的mq&&当前消费者是顺序消费&&加锁失败啥也不做。

为什么要加锁?

因为顺序消费需要加分布式锁+本地锁,此处是在broker加分布式锁。

为什么加锁失败啥也不做?

因为加锁失败,说明是其它的消费者在broker加的分布式锁还没释放,那么等下次Rebalance的时候再尝试加锁即可。

8.如果是新分配给当前消费者的mq&&当前消费者不是顺序消费&&加锁成功。

8.1调用removeDirtyOffset清除本地的脏消费偏移量

8.2重新计算队列拉取消息的偏移量。

8.3构建拉取消息的PullRequest

9.分发PullRequest拉取消息

RebalancePushImpl#removeUnnecessaryMessageQueue

``` @Override public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) { //持久化当前消费进度到broker this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq); //移除本地消费进度 this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq); //集群模式且顺序消费 if (this.defaultMQPushConsumerImpl.isConsumeOrderly() && MessageModel.CLUSTERING.equals (this.defaultMQPushConsumerImpl.messageModel())) { try { if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) { try { return this.unlockDelay(mq, pq); } finally { pq.getLockConsume().unlock(); } } else { log.warn("mq is consuming, so can not unlock it, {}. maybe hanged for a while"); //增加tryUnlockTimes pq.incTryUnlockTimes(); } } catch (Exception e) { log.error("removeUnnecessaryMessageQueue Exception", e); } return false; }

//否则返回true
return true;

} ```

his.unlockDelay(mq, pq);中调用RebalancePushImpl.this.unlock(mq, true);解除分布式锁

RebalancePushImpl#computePullFromWhere

``` @Override public long computePullFromWhere(MessageQueue mq) { long result = -1; //默认是从上一个OFFSET消费 默认策略,从该队列最尾开始消费,即跳过历史消息 //private ConsumeFromWhere consumeFromWhere = // ConsumeFromWhere.CONSUMEFROMLASTOFFSET; final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere(); final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore(); switch (consumeFromWhere) { case CONSUMEFROMLASTOFFSETANDFROMMINWHENBOOTFIRST: case CONSUMEFROMMINOFFSET: case CONSUMEFROMMAXOFFSET: //默认是从上次消费的地方拉取 case CONSUMEFROMLASTOFFSET: { long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READFROMSTORE); if (lastOffset >= 0) { //返回本地消费的偏移量 result = lastOffset; } // First start,no offset else if (-1 == lastOffset) { if (mq.getTopic().startsWith(MixAll.RETRYGROUPTOPICPREFIX)) { result = 0L; } else { try { //从broker服务器拉取 result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { result = -1; } } } else { result = -1; } break; } case CONSUMEFROMFIRSTOFFSET: { long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READFROMSTORE); if (lastOffset >= 0) { result = lastOffset; } else if (-1 == lastOffset) { result = 0L; } else { result = -1; } break; } case CONSUMEFROMTIMESTAMP: { long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READFROMSTORE); if (lastOffset >= 0) { result = lastOffset; } else if (-1 == lastOffset) { if (mq.getTopic().startsWith(MixAll.RETRYGROUPTOPICPREFIX)) { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { result = -1; } } else { try { long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(), UtilAll.YYYYMMDDHHMMSS).getTime(); result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); } catch (MQClientException e) { result = -1; } } } else { result = -1; } break; }

default:
            break;
    }

    return result;
}

``` 默认是CONSUMEFROMLAST_OFFSET,从上次消费的地方拉取。

先从offsetStore获取拉取消息的偏移量,如果从offsetStore获取不到。

那么从broker端获取队列拉取消息的偏移量。

RebalancePushImpl#dispatchPullRequest

java @Override public void dispatchPullRequest(List<PullRequest> pullRequestList) { //遍历所有的pullRequest for (PullRequest pullRequest : pullRequestList) { this.defaultMQPushConsumerImpl .executePullRequestImmediately(pullRequest); } }

分发PullRequest,此时会立刻执行1个拉取消息的动作,至于消息是怎么拉取的,下篇文章见。

我们继续看DefaultMQPullConsumerImpl负载均衡。

DefaultMQPullConsumerImpl负载均衡

MQClientInstance#start

``` public void start() throws MQClientException {

synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // 启动负载均衡服务 这里是一个Thread 所以看run方法
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object has been created before, and failed.", null);
            default:
                break;
        }
    }
}

```

ServiceThread#start

public void start() { log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread); if (!started.compareAndSet(false, true)) { return; } stopped = false; //this 指的是rebalanceService this.thread = new Thread(this, getServiceName()); this.thread.setDaemon(isDaemon); this.thread.start(); }

RebalanceService#run

``` @Override public void run() { log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
        this.waitForRunning(waitInterval);
        //做负载均衡操作
        this.mqClientFactory.doRebalance();
    }

    log.info(this.getServiceName() + " service end");
}

```

MQClientInstance#doRebalance

每个消费者都需要做负载均衡

 public void doRebalance() {     //MQClientInstance遍历已注册的消费者,对消费者执行doRebalance()方法        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {            MQConsumerInner impl = entry.getValue();            if (impl != null) {                try {                    //MQConsumerInner impl有2个实现                                        //DefaultMQPullConsumerImpl                    //pull this.rebalanceImpl.doRebalance(false);                                        //DefaultMQPushConsumerImpl                      //push this.rebalanceImpl.doRebalance(this.isConsumeOrderly());                    impl.doRebalance();               } catch (Throwable e) {                    log.error("doRebalance exception", e);               }           }       }   }

DefaultMQPullConsumerImpl#doRebalance

@Override    public void doRebalance() {        if (this.rebalanceImpl != null) {            //注意这里传的一直是false            this.rebalanceImpl.doRebalance(false);       }   }

DefaultMQPushConsumerImpl做负载均衡是否是顺序消费是根据消费者关联的监听器确定。

DefaultMQPullConsumerImpl做负载均衡是否是顺序消费传的是false。

DefaultMQPushConsumerImpl剩下的流程和DefaultMQPushConsumerImpl一模一样文章来源地址https://www.toymoban.com/news/detail-529849.html

到了这里,关于28.RocketMQ之消费者的负载均衡源码的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RocketMQ 消费者Rebalance算法 解析——图解、源码级解析

    🍊 Java学习:Java从入门到精通总结 🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想 🍊 绝对不一样的职场干货:大厂最佳实践经验指南 📆 最近更新:2022年10月15日 🍊 个人简介:通信工程本硕💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是

    2024年01月19日
    浏览(43)
  • RocketMQ (九) 消费者分组-ConsumerGroup

    消费者分组是 Apache RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组。 和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。在 Apache RocketMQ 中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。 在消费者分组中,统一定

    2024年02月16日
    浏览(39)
  • RocketMQ教程-(5)-功能特性-消费者分类

    Apache RocketMQ 支持 PushConsumer 、 SimpleConsumer 以及 PullConsumer 这三种类型的消费者,本文分别从使用方式、实现原理、可靠性重试和适用场景等方面为您介绍这三种类型的消费者。 Apache RocketMQ 面向不同的业务场景提供了不同消费者类型,每种消费者类型的集成方式和控制方式都

    2024年02月16日
    浏览(46)
  • python rocketmq生产者消费者

    安装依赖包 生产者 需要注意的是假如你用的java SDK 需要只是UNinname 我们可以看到下列代码设置了tag以及key,在页面可以根据key查找消息 消费方式PullConsumer(全部消费)(可重复消费) 消费方式PushConsumer(即时消费)(不可重复消费) 生产者发送消息选择队列,以及设置顺

    2024年02月14日
    浏览(40)
  • RocketMQ 的消费者类型详解与最佳实践

    作者:凌楚 在 RocketMQ 5.0 中,更加强调了客户端类型的概念,尤其是消费者类型。为了满足多样的 RocketMQ 中一共有三种不同的消费者类型,分别是 PushConsumer、SimpleConsumer 和 PullConsumer。不同的消费者类型对应着不同的业务场景。 本篇文章也会根据不同的消费者类型来进行讲述

    2024年02月02日
    浏览(38)
  • RocketMQ消费者可以手动消费但无法主动消费问题,或生成者发送超时

    修改rocketmq文件夹broker.conf 在RocketMQ独享实例中支持IPv4和IPv6双栈,主要是通过在网络层面上同时支持IPv4和IPv6协议栈来实现的。RocketMQ的Broker端、Namesrv端和客户端都需要支持IPv4和IPv6协议,以便能够同时监听IPv4和IPv6地址,并使用相应的协议栈进行通信。在Broker端,我们需要在

    2024年02月13日
    浏览(39)
  • RocketMQ教程-(4)-领域模型-消费者分组ConsumerGroup

    消费者分组是 Apache RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组。 和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。在 Apache RocketMQ 中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。 在消费者分组中,统一定

    2024年02月16日
    浏览(40)
  • springboot整合rocketmq:一个消费者组怎么订阅多个topic

            一个消费者组中的所有消费者订阅关系,可以多个topic,多个tag,但是必须一致,否则就倒沫子了,如下图:  下面贴了如下结构的代码  一个消费组(消费者)订阅多个topic的代码(只写了一个消费组的,其他类似): 结果:

    2024年02月15日
    浏览(54)
  • RocketMQ生产者和消费者都开启Message Trace后,Consume Message Trace没有消费轨迹

    1、生产者和消费者所属同一个程序 2、生产者开启消息轨迹 3、消费者开启消息轨迹 4、生产者和消费者一起开启后,在RocketMQ可视化界面,无法查看到消息的消费轨迹 注:如果只开启生产者或消费者其中之一的消息轨迹,则消息的消费轨迹是正常的 无法展示消费轨迹 具体原

    2024年02月14日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包