29.RocketMQ之消费者负载均衡策略

这篇具有很好参考价值的文章主要介绍了29.RocketMQ之消费者负载均衡策略。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


highlight: arduino-light

负载均衡策略概述

消费者在消费消息的时候,需要知道从Broker的哪一个消息队列中去获取消息。所以,在消费者端必须要做负载均衡,即Broker端中多个消费队列分配给同一个消费者组中的哪些消费者消费。

上篇文章讲解了负载均衡过程,这篇文章讲解消费者负载均衡的策略。

AllocateMessageQueueStrategy是负载均衡策略接口,有两个方法,分别是allocate和getName,allocate方法是为消费者分配消息队列,getName是获取负载均衡策略的名字。AllocateMessageQueueStrategy接口有六个子类实现。

负载均衡策略说明

它们具体说明如下:

  • AllocateMessageQueueAveragely:平均负载策略,将消息队列平均分配给每一个消息者。假设一个topic有8个消息队列,有3个消息者A、B、C,那么采用该负载策略,那么A首先分配3个消息队列,然后B也分配3个消息队列,最后C分配2个消息队列。

  • AllocateMessageQueueAveragelyByCircle:循环平均负载策略,跟平均负载策略不同的是。是将消息队列一个一个的分配给消息者。假设一个topic有8个消息队列,有3个消息者A、B、C。采用循环平均负载策略分配的方法是首先给A、B、C分别分配一个,然后再进行第二轮分配,也是给A、B、C分别再分配一个,这样子还剩下两个就分别分配给A、B。

  • AllocateMessageQueueByConfig:配置负载策略,用户直接给消费者分配配置消息队列。

  • AllocateMessageQueueConsistentHash:一致性哈希负载策略,为每一个消费者创建多个虚拟的节点,将虚拟节点连成一个环,这个环就是一致性哈希环,然后将消息队列进行哈希计算得到哈希值,通过哈希值找到距离一致性哈希环顺时针方向最近的那个虚拟节点,此时就可以通过虚拟节点获取到真实的消费者了,就将这个消息队列分配给这个消息者。

  • AllocateMessageQueueByMachineRoom:同机房分配策略,将Broker的消息队列分配给同机房的消费者。

  • AllocateMachineRoomNearby:AllocateMessageQueueByMachineRoom策略的升级版本,不仅将Broker的消息队列分配给同机房的消费者,还会将剩下的消息队列根据给定的分配策略进行分配给消费者。

接下来,将从源码的层面分析上述的负载均衡策略。

AllocateMessageQueueAveragely

```java //代码位置:org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely#allocate public List allocate(String consumerGroup, String currentCID, List mqAll, List cidAll) {

//省略校验合法性的代码

    //获取当前客户端id在所有消费者里面的位置
    int index = cidAll.indexOf(currentCID);
    //看是否能够均分
    int mod = mqAll.size() % cidAll.size();
    //如果消息队列的数量小于等于消费者的数量。一个消费者最多只能分到一个消息消息队列
    //否则,如果不能均分且index小于mode,均分以后再加上一个
    int averageSize =
        mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
            + 1 : mqAll.size() / cidAll.size());
    //开始的位置
    int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
    //被分配的消息队列的范围上限
    int range = Math.min(averageSize, mqAll.size() - startIndex);
    for (int i = 0; i < range; i++) {
        result.add(mqAll.get((startIndex + i) % mqAll.size()));
    }
    return result;

} ```

AllocateMessageQueueAveragely的allocate方法的逻辑:

  • 获取当前消费者在所有消费者里面的位置(索引)
  • 计算当前消费者能分配的消息队列的数量,以及分配的消息队列的范围,即从哪里开始分配到哪里结束分配。
  • 根据分配消息队列的范围给消费者进行分配。

AllocateMessageQueueAveragelyByCircle

```java //代码位置: org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle#allocate public List allocate(String consumerGroup, String currentCID, List mqAll, List cidAll) {

//省略校验合法性的代码

    //当前consumer排序后的索引
    int index = cidAll.indexOf(currentCID);
    for (int i = index; i < mqAll.size(); i++) {
        //取模
        if (i % cidAll.size() == index) {
            result.add(mqAll.get(i));
        }
    }
    return result;

} ```

AllocateMessageQueueAveragelyByCircle的allocate方法的逻辑:

  • 获取当前消费者在所有消费者里面的位置index(索引)
  • 遍历所有的消息队列,当前消息队列的索引对消费者数量进行取模的值等于index,将此此时的消息队列分配给消费者。

AllocateMessageQueueByConfig

```java /代码位置:org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueByConfig#allocate private List messageQueueList;

public void setMessageQueueList(List messageQueueList) { this.messageQueueList = messageQueueList; }

@Override

public List allocate(String consumerGroup, String currentCID, List mqAll, List cidAll) { return this.messageQueueList; } ```

AllocateMessageQueueByConfig类有一个messageQueueList属性,这个就是用户配置的消息队列。AllocateMessageQueueByConfig的allocate方法就是将用户配置的这个消息队列返回给消费者。

☆AllocateMessageQueueConsistentHash

```java //代码位置:org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueConsistentHash#allocate public List allocate(String consumerGroup, String currentCID, List mqAll, List cidAll) {

//省略校验合法性的代码

    //构建消费者节点
    Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
    for (String cid : cidAll) {
        cidNodes.add(new ClientNode(cid));
    }

    //用消费者节点生产多个虚拟的节点,构建哈希环
    final ConsistentHashRouter<ClientNode> router; //for building hash ring
    if (customHashFunction != null) {
        router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
    } else {
        router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
    }


    List<MessageQueue> results = new ArrayList<MessageQueue>();
    //遍历消息队列
    for (MessageQueue mq : mqAll) {
        //对messageQueue进行hash计算,找到顺时针最近的consumer节点
        ClientNode clientNode = router.routeNode(mq.toString());
        //判断是否是当前consumer
        if (clientNode != null && currentCID.equals(clientNode.getKey())) {
            results.add(mq);
        }
    }

    return results;

} ```

AllocateMessageQueueConsistentHash的allocate方法逻辑:

  • 构建消费者节点
  • 用消费者节点生成多个虚拟的节点,构建哈希环
  • 遍历消息队列,对消息队列进行hash计算,找到距离最近的消费者节点,将此时的消息队列分配给消费者。

AllocateMessageQueueConsistentHash的allocate方法实现了一致性哈希算法,如果后续有使用一致性哈希算法的场景可拿来参考。

AllocateMessageQueueConsistentHash的allocate方法实现了一致性哈希算法,如果后续有使用一致性哈希算法的场景可拿来参考。

AllocateMessageQueueConsistentHash的allocate方法实现了一致性哈希算法,如果后续有使用一致性哈希算法的场景可拿来参考。

AllocateMessageQueueByMachineRoom

```java //代码位置:org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueByMachineRoom#allocate public List allocate(String consumerGroup, String currentCID, List mqAll, //省略校验合法性的代码 List premqAll = new ArrayList (); for (MessageQueue mq : mqAll) { //brokerName命名规则 machine_room1@broker-a String[] temp = mq.getBrokerName().split("@"); //判断是否符合指定的机房条件 if (temp.length == 2 && consumeridcs.contains(temp[0])) { premqAll.add(mq); } }

//平均分配到的队列数
    int mod = premqAll.size() / cidAll.size();
    //取模
    int rem = premqAll.size() % cidAll.size();
    //当前分配到的第一个队列索引
    int startIndex = mod * currentIndex;
    //当前分配到的最后一个队列索引
    int endIndex = startIndex + mod;
    //取startIndex到endIndex的队列
    for (int i = startIndex; i < endIndex; i++) {
        result.add(mqAll.get(i));
    }
    //如果不能平均分配,并且模大于当前的索引,再分配一个
    if (rem > currentIndex) {
        result.add(premqAll.get(currentIndex + mod * cidAll.size()));
    }
    return result;

} ```

AllocateMessageQueueByMachineRoom的allocate方法逻辑:

  • 遍历所有的消息队列,将所有的指定机房的消息队列收集起来。
  • 将上述获取得到的消息队列平均分配给消费者,如果不能分配,再次分配一个消息队列。

AllocateMachineRoomNearby

```java //代码位置:org.apache.rocketmq.client.consumer.rebalance.AllocateMachineRoomNearby#allocate public List allocate(String consumerGroup, String currentCID, List mqAll, //省略校验合法性的代码

//group mq by machine room
    //根据机房对队列分组
    Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();
    for (MessageQueue mq : mqAll) {
        //判断当前broker处于哪个机房
        String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq);
        //机房不为空,将broker放到分组中
        if (StringUtils.isNoneEmpty(brokerMachineRoom)) {
            if (mr2Mq.get(brokerMachineRoom) == null) {
                mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>());
            }
            mr2Mq.get(brokerMachineRoom).add(mq);
        } else {
            throw new IllegalArgumentException("Machine room is null for mq " + mq);
        }
    }

    //group consumer by machine room
    //consumer按机房分组
    Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>();
    for (String cid : cidAll) {
        //判断consumer处于哪个机房
        String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid);
        if (StringUtils.isNoneEmpty(consumerMachineRoom)) {
            if (mr2c.get(consumerMachineRoom) == null) {
                mr2c.put(consumerMachineRoom, new ArrayList<String>());
            }
            mr2c.get(consumerMachineRoom).add(cid);
        } else {
            throw new IllegalArgumentException("Machine room is null for consumer id " + cid);
        }
    }

    List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();

    //1.allocate the mq that deploy in the same machine room with the current consumer
    //给当前consumer分当前机房的那些MessageQeueue
    String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID);
    //得到当前机房的MessageQueue
    List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);
    //得到当前机房的Consumer
    List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);
    if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {
        //得到当前机房所有MessageQueue和Consumers后根据指定的策略再负载
        allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));
    }

    //2.allocate the rest mq to each machine room if there are no consumer alive in that machine room
    //如果该MessageQueue的机房 没有同机房的consumer,将这些MessageQueue按配置好的备用策略分配给所有的consumer
    for (String machineRoom : mr2Mq.keySet()) {
        if (!mr2c.containsKey(machineRoom)) { // no alive consumer in the corresponding machine room, so all consumers share these queues
            //添加分配到的游离态MessageQueue
            allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineRoom), cidAll));
        }
    }

    return allocateResults;
}

```

AllocateMachineRoomNearby的allocate方法逻辑:

  • 将消息队列根据机房进行分组
  • 将消费者根据机房进行分组
  • 获取当前机房的所有消息队列,将当前机房的所有消息队列再根据给定的分配策略进行分配
  • 剩余不是该机房的消息队列根据给定的分配策略进行分配给其他的消费者。

以上内容来自:https://zhuanlan.zhihu.com/p/443869119

AllocateMessageQueueAveragely:平均分配

md AllocateMessageQueueAveragely:平均分配 举例:8个队列q1,q2,q3,q4,q5,a6,q7,q8,消费者3个:c1,c2,c3 分配如下: c1:q1,q2,q3 c2:q4,q5,a6 c3:q7,q8

AllocateMessageQueueAveragely:平均轮询分配

md AllocateMessageQueueAveragelyByCircle:平均轮询分配 举例:8个队列q1,q2,q3,q4,q5,a6,q7,q8,消费者3个:c1,c2,c3 分配如下: c1:q1,q4,q7 c2:q2,q5,a8 c3:q3,q6

注意:消息队列的分配遵循一个消费者可以分配到多个队列,但同一个消息队列只会分配给一个消费者,故如果出现消费者个数大于消息队列数量,则有些消费者无法消费消息,且这些消费者会被浪费!文章来源地址https://www.toymoban.com/news/detail-524972.html

到了这里,关于29.RocketMQ之消费者负载均衡策略的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • python rocketmq生产者消费者

    安装依赖包 生产者 需要注意的是假如你用的java SDK 需要只是UNinname 我们可以看到下列代码设置了tag以及key,在页面可以根据key查找消息 消费方式PullConsumer(全部消费)(可重复消费) 消费方式PushConsumer(即时消费)(不可重复消费) 生产者发送消息选择队列,以及设置顺

    2024年02月14日
    浏览(40)
  • RocketMQ 的消费者类型详解与最佳实践

    作者:凌楚 在 RocketMQ 5.0 中,更加强调了客户端类型的概念,尤其是消费者类型。为了满足多样的 RocketMQ 中一共有三种不同的消费者类型,分别是 PushConsumer、SimpleConsumer 和 PullConsumer。不同的消费者类型对应着不同的业务场景。 本篇文章也会根据不同的消费者类型来进行讲述

    2024年02月02日
    浏览(38)
  • RocketMQ消费者可以手动消费但无法主动消费问题,或生成者发送超时

    修改rocketmq文件夹broker.conf 在RocketMQ独享实例中支持IPv4和IPv6双栈,主要是通过在网络层面上同时支持IPv4和IPv6协议栈来实现的。RocketMQ的Broker端、Namesrv端和客户端都需要支持IPv4和IPv6协议,以便能够同时监听IPv4和IPv6地址,并使用相应的协议栈进行通信。在Broker端,我们需要在

    2024年02月13日
    浏览(39)
  • RocketMQ教程-(4)-领域模型-消费者分组ConsumerGroup

    消费者分组是 Apache RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组。 和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。在 Apache RocketMQ 中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。 在消费者分组中,统一定

    2024年02月16日
    浏览(40)
  • [RocketMQ] Consumer消费者启动主要流程源码 (六)

    客户端常用的消费者类是DefaultMQPushConsumer, DefaultMQPushConsumer的构造器以及start方法的源码。 1.创建DefaultMQPushConsumer实例 最终都是调用下面四个参数的构造函数: 指定了命名空间、生产者组、RPC钩子和消费者之间消息分配的策略算法的构造器, 创建了一个DefaultMQPushConsumerImpl实例

    2024年02月16日
    浏览(48)
  • RocketMQ 消费者Rebalance 解析——图解、源码级解析

    🍊 Java学习:Java从入门到精通总结 🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想 🍊 绝对不一样的职场干货:大厂最佳实践经验指南 📆 最近更新:2023年4月15日 🍊 新专栏筹备中,还是熟悉的源码,还是熟悉的感觉! 🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力

    2023年04月18日
    浏览(53)
  • RocketMQ 消费者Rebalance算法 解析——图解、源码级解析

    🍊 Java学习:Java从入门到精通总结 🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想 🍊 绝对不一样的职场干货:大厂最佳实践经验指南 📆 最近更新:2022年10月15日 🍊 个人简介:通信工程本硕💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是

    2024年01月19日
    浏览(43)
  • springboot整合rocketmq:一个消费者组怎么订阅多个topic

            一个消费者组中的所有消费者订阅关系,可以多个topic,多个tag,但是必须一致,否则就倒沫子了,如下图:  下面贴了如下结构的代码  一个消费组(消费者)订阅多个topic的代码(只写了一个消费组的,其他类似): 结果:

    2024年02月15日
    浏览(54)
  • RocketMQ生产者和消费者都开启Message Trace后,Consume Message Trace没有消费轨迹

    1、生产者和消费者所属同一个程序 2、生产者开启消息轨迹 3、消费者开启消息轨迹 4、生产者和消费者一起开启后,在RocketMQ可视化界面,无法查看到消息的消费轨迹 注:如果只开启生产者或消费者其中之一的消息轨迹,则消息的消费轨迹是正常的 无法展示消费轨迹 具体原

    2024年02月14日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包