根据源码,模拟实现 RabbitMQ - 虚拟主机 + Consume设计 (7)

这篇具有很好参考价值的文章主要介绍了根据源码,模拟实现 RabbitMQ - 虚拟主机 + Consume设计 (7)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、虚拟主机 + Consume设计

1.1、承接问题

1.2、具体实现

1.2.1、消费者订阅消息实现思路

1.2.2、消费者描述自己执行任务方式实现思路

1.2.3、消息推送给消费者实现思路

1.2.4、消息确认


一、虚拟主机 + Consume设计


1.1、承接问题

前面已经实现了虚拟主机大部分功能以及转发规则的判定,也就是说,现在消息已经可以通过 转换机 根据对应的转发规则发送给对应的 队列 了.

那么接下来要解决的问题就是,消费者该如何订阅消息(队列),如何把消息推送给消费者,以及消费者如何描述自己怎么执行任务~

1.2、具体实现

1.2.1、消费者订阅消息实现思路

消费者是以队列为维度订阅消息的,并且一个队列可以被多个消费者订阅,那么一旦队列中有消息,这个消息到底因该给谁呢?此处就约定,消费者之间按照 “轮询” 的方式来进行消费.

这里我们就需要定义一个类(ConsumerEnv),用来描述一个消费者,如下

public class ConsumerEnv {
    private String consumerTag;
    private String queueName;
    private boolean autoAck;
    //通过这个回调来处理收到的消息
    private Consumer consumer;

    public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
        this.consumerTag = consumerTag;
        this.queueName = queueName;
        this.autoAck = autoAck;
        this.consumer = consumer;
    }

    public String getConsumerTag() {
        return consumerTag;
    }

    public void setConsumerTag(String consumerTag) {
        this.consumerTag = consumerTag;
    }

    public String getQueueName() {
        return queueName;
    }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    public boolean isAutoAck() {
        return autoAck;
    }

    public void setAutoAck(boolean autoAck) {
        this.autoAck = autoAck;
    }

    public Consumer getConsumer() {
        return consumer;
    }

    public void setConsumer(Consumer consumer) {
        this.consumer = consumer;
    }
}

 文章来源地址https://www.toymoban.com/news/detail-665903.html

再给每个队列对象(MSGQueue 对象)添加一个属性 List,用来包含若干个上述消费者(有哪些消费者订阅了当前队列),如下图:

根据源码,模拟实现 RabbitMQ - 虚拟主机 + Consume设计 (7),RabbitMQ,rabbitmq,分布式

    //当前队列都有哪些消费者订阅了
    private List<ConsumerEnv> consumerEnvList = new ArrayList<>();
    //记录当取到了第几个消费者(AtomicInteger 是线程安全的)
    private AtomicInteger consumerSeq = new AtomicInteger(0);

    /**
     * 添加一个新的订阅者
     * @param consumerEnv
     */
    public void addConsumerEnv(ConsumerEnv consumerEnv) {
        consumerEnvList.add(consumerEnv);
    }

    /**
     * 删除订阅者暂时先不考虑
     */

    /**
     * 挑选一个订阅者,来处理当前的消息(按照轮询的方式)
     * @return
     */
    public ConsumerEnv chooseConsumer() {
        if(consumerEnvList.size() == 0) {
            //该队列暂时没有人订阅
            return null;
        }
        //计算当前要取的下标
        int index = consumerSeq.get() % consumerEnvList.size();
        consumerSeq.getAndIncrement();// 自增
        return consumerEnvList.get(index);
    }

VirtualHost 中订阅消息实现

    /**
     * 订阅消息
     * 添加一个队列的订阅者,当队列收到消息之后,就要把消息推送给对应的订阅者
     * @param consumerTag 消费者的身份标识
     * @param queueName
     * @param autoAck 消息被消费之后,应答的方式,true 标识自动应答,false 标识手动应答
     * @param consumer 是一个回调函数,此处设定成函数式接口,这样后续调用 basicConsume 并且传实参的时候,就可以写作 lambda 样子了
     * @return
     */
    public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
        //构造一个 ConsumerEnv 对象,把这个对应的队列找到,再把 Consumer 对象添加到队列中
        queueName = virtualHostName + queueName;
        try {
            consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);
            System.out.println("[VirtualHost] basicConsume 成功! queueName=" + queueName);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("[VirtualHost] basicConsume 失败! queueName=" + queueName);
            return false;
        }
    }

1.2.2、消费者描述自己执行任务方式实现思路

当执行订阅消息的时候,我们就让消费者自己去实现处理消息的操作(消息的内容通过参数传递,具体要干啥,取决于消费者自己的业务路基),最后再让线程池来执行回调函数.

这里我们使用函数式接口(回调函数)的方式(lambda 表达式),让消费者在订阅消息的时候,就可以实现未来收到消息后如何去处理消息的操作.

@FunctionalInterface
public interface Consumer {

    /**
     * Delivery 的意思是 ”投递“,这个方法预期是在服务器收到消息之后来调用
     * 通过这个方法,把消息推送给对应的消费者
     * (注意,这里的方法名和参数,也都是参考 RabbitMQ 来展开的)
     * @param consumerTag
     * @param basicProperties
     * @param body
     */
    void handlerDelivery(String consumerTag, BasicProperties basicProperties, byte[] body);


}

为什么要这样实现?

一方面,这种思路也是参考 RabbitMQ。

另一方面,这是由于Java 的函数是不能脱离类存在的,为了实现这种 lambda,java 曲线救国,引入 函数式接口.

对于函数式接口来说:

  1. 首先是 interface 类型
  2. 只能有一个方法
  3. 添加 @FunctionalInterface 注解.

实际上,这也是 lambda 的底层实现(本质)

1.2.3、消息推送给消费者实现思路

这里我们可以添加一个扫描线程,让他来去队列中拿任务.

为什么用了扫描线程还需要用线程池?

如果就一个扫描线程,既要获取消息,又要执行回调,这一个线程可能会忙不过来,因为消费者给出的回调,具体干什么的,咱们是不知道的.

扫描线程怎么知道哪个队列来了新的消息?

  1. 一个简单粗暴的办法,就是直接让扫描线程不停的循环遍历所有队列,发现有元素就立即处理。
  2. 另一个更优雅的办法(我采取的办法),就是用一个阻塞队列,队列中的元素就是接收消息的队列的名字,扫描线程只需要盯住这一个阻塞对垒即可,此时阻塞队列中传递的队列名,就相当于 “令牌”

每次拿到令牌,才能调动一次军队,也就是从对应的队列中取一个消息.

具体的,实现一个 ConsumerManager 类,用来管理消费者的上述行为.

public class ConsumerManager {
    // 持有上层的 VirtualHost 对象的引用,用来操作数据
    private VirtualHost parent;
    // 指定一个线程池,负责取执行具体的回调任务
    private ExecutorService workerPool = Executors.newFixedThreadPool(4);
    //存放令牌的队列
    private BlockingQueue<String> tokenQueue = new LinkedBlockingQueue<>();
    //扫描线程
    private Thread scannerThread = null;


    /**
     * 初始化
     * @param parent
     */
    public ConsumerManager(VirtualHost parent) {
        this.parent = parent;

        //创建扫描线程,取队列中消费消息
        scannerThread = new Thread(() -> {
            while(true) {
                try {
                    //1.拿到令牌
                    String queueName = tokenQueue.take();
                    //2.根据令牌,找到队列
                    MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
                    if(queue == null) {
                        throw new MqException("[ConsumerManager] 取到令牌后发现,该队列名不存在!queueName=" + queueName);
                    }
                    //3.从这个队列中消费一个消息
                    synchronized (queue) {
                        consumeMessage(queue);
                    }
                } catch (InterruptedException | MqException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        //设置为后台线程
        scannerThread.setDaemon(true);
        scannerThread.start();
    }

    public void notifyConsume(String queueName) throws InterruptedException {
        tokenQueue.put(queueName);
    }

    /**
     * 添加消费者
     * 找到对应队列的 List 列表, 把消费者添加进去,最后判断,如果有消息,就立刻消费
     * @param consumerTag 消费者身份标识
     * @param queueName
     * @param autoAck 消息被消费之后,应答的方式,true 标识自动应答,false 标识手动应答
     * @param consumer 是一个回调函数,此处设定成函数式接口,这样后续调用 basicConsume 并且传实参的时候,就可以写作 lambda 样子了
     * @throws MqException
     */
    public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {
        //找到对应的队列
        MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
        if(queue == null) {
            throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);
        }
        ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag, queueName, autoAck, consumer);
        synchronized (queue) {
            queue.addConsumerEnv(consumerEnv);
            //如果当前队列中已经有一些消息了,需要立即消费掉
            int n = parent.getMemoryDataCenter().getMessageCount(queueName);
            for(int i = 0; i < n; i++) {
                //这个方法调用一次就消费一条消息
                consumeMessage(queue);
            }
        }
    }

    /**
     * 扫描线程:找到对应的队列后,消费者从队列中拿出消息并消费
     * @param queue
     */
    private void consumeMessage(MSGQueue queue) {
        //1.按照轮询的方式,找个消费者出来
        ConsumerEnv luckDog = queue.chooseConsumer();
        if(luckDog == null) {
            //当前队列中没有消费者,暂时不用消费,等后面有消费者了再说
            return;
        }
        //2.从队列中取出一个消息
        Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());
        if(message == null) {
            //当前队列中还没有消息,也不需要消费
            return;
        }
        //3.把消息带入到消费者的回调方法中,丢给线程池执行
        workerPool.submit(() -> {
            try {
                //1.把消息放到待确认的集合当中,这个操作一定要在执行回调之前(防止执行回调过程中出现异常,导致消息丢失)
                parent.getMemoryDataCenter().addMessageWaitAck(luckDog.getQueueName(), message);
                //2.真正执行回调操作
                luckDog.getConsumer().handlerDelivery(luckDog.getConsumerTag(), message.getBasicProperties(),
                        message.getBody());
                //3.如果当前是 ”自动应答“ ,就可以直接把消息删除了
                //  如果当前是 ”手动应答“ ,则先不处理,交给后续消费者调用 basicAck 方法来处理
                if(luckDog.isAutoAck()) {
                    //1) 删除硬盘上的消息
                    if(message.getDeliverMode() == 2) {
                        parent.getDiskDataCenter().deleteMessage(queue, message);
                    }
                    //2) 删除上面的待确认集合中的消息
                    parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());
                    //3) 删除内存上的消息中心的消息
                    parent.getMemoryDataCenter().removeMessage(message.getMessageId());
                    System.out.println("[ConsumerManager] 消息被成功消费!queueName=" + queue.getName());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
}

1.2.4、消息确认

消息确认,就是保证消息被正确消费~~ 

正确消费就是指消费者的回调方法顺利执行完了(没有抛异常之类的),这条消息的使命就完成了,此时就可以删除了。

为了达成消息不丢失这样的效果,具体步骤如下:

  1. 在真正执行回调之前,把消息放到 “待确认的集合” 中,避免应为回调失败,导致消息丢失.根据源码,模拟实现 RabbitMQ - 虚拟主机 + Consume设计 (7),RabbitMQ,rabbitmq,分布式
  2. 执行回调
  3. 当去消费者采取的是 autoAck=true ,就认为回调执行完毕不抛异常,就算消费成功,然后就可以删除消息了
    1. 硬盘
    2. 内存中的消息中心
    3. 待确认的消息集合
  4. 当前消费者若采取的是 autoAck=false,手动应答,需要消费者这边,在自己的回调方法内部,显式调用 basicAck 这个核心 API 表示应答.

 basicAck 完成主动应答

    /**
     * 确认消息
     * 各个维度删除消息即可
     * @param queueName
     * @param messageId
     * @return
     */
    public boolean basicAck(String queueName, String messageId) {
        queueName = virtualHostName + queueName;
        try {
            //1.获取消息和队列
            MSGQueue queue = memoryDataCenter.getQueue(queueName);
            if(queue == null) {
                throw new MqException("[VirtualHost] 要确认的队列不存在!queueName=" + queueName);
            }
            Message message = memoryDataCenter.getMessage(messageId);
            if(message == null) {
                throw new MqException("[VirtualHost] 要确认的消息不存在!messageId=" + messageId);
            }
            //2.各个维度删除消息
            if(message.getDeliverMode() == 2) {
                diskDataCenter.deleteMessage(queue, message);
            }
            memoryDataCenter.removeMessage(messageId);
            memoryDataCenter.removeMessageWaitAck(queueName, messageId);
            System.out.println("[VirtualHost] basicAck 成功,消息确认成功!queueName=" + queueName +
                    ", messageId=" + messageId);
            return true;
        } catch (Exception e) {
            System.out.println("[VirtualHost] basicAck 失败,消息确认失败!queueName=" + queueName +
                    ", messageId=" + messageId);
            e.printStackTrace();
            return false;
        }
    }

扫描线程完成自动应答

    /**
     * 扫描线程:找到对应的队列后,消费者从队列中拿出消息并消费
     * @param queue
     */
    private void consumeMessage(MSGQueue queue) {
        //1.按照轮询的方式,找个消费者出来
        ConsumerEnv luckDog = queue.chooseConsumer();
        if(luckDog == null) {
            //当前队列中没有消费者,暂时不用消费,等后面有消费者了再说
            return;
        }
        //2.从队列中取出一个消息
        Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());
        if(message == null) {
            //当前队列中还没有消息,也不需要消费
            return;
        }
        //3.把消息带入到消费者的回调方法中,丢给线程池执行
        workerPool.submit(() -> {
            try {
                //1.把消息放到待确认的集合当中,这个操作一定要在执行回调之前(防止执行回调过程中出现异常,导致消息丢失)
                parent.getMemoryDataCenter().addMessageWaitAck(luckDog.getQueueName(), message);
                //2.真正执行回调操作
                luckDog.getConsumer().handlerDelivery(luckDog.getConsumerTag(), message.getBasicProperties(),
                        message.getBody());
                //3.如果当前是 ”自动应答“ ,就可以直接把消息删除了
                //  如果当前是 ”手动应答“ ,则先不处理,交给后续消费者调用 basicAck 方法来处理
                if(luckDog.isAutoAck()) {
                    //1) 删除硬盘上的消息
                    if(message.getDeliverMode() == 2) {
                        parent.getDiskDataCenter().deleteMessage(queue, message);
                    }
                    //2) 删除上面的待确认集合中的消息
                    parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());
                    //3) 删除内存上的消息中心的消息
                    parent.getMemoryDataCenter().removeMessage(message.getMessageId());
                    System.out.println("[ConsumerManager] 消息被成功消费!queueName=" + queue.getName());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

如果在回调方法中抛异常了?

回调方法中抛异常了,后续逻辑执行不到,这个消息就会始终呆在待确认的集合中, RabbitMQ 的做法是另外搞一个扫描线程(其实 RabbitMQ 中不叫线程,人家是叫进程,但是注意,这个进程不是操作系统中的进程,而是 erlang 中的概念),负责关注这个 待确认集合中,每个消息待了多久了,如果超出了一定的时间范围,就会把这个消息放到一个特定的队列 —— “死信队列”(这里就不展示了,需要的可以私聊我)

如果在执行回调过程中,broker server 崩了,内存数据全没了?

此时硬盘的数据还在,broker server 重启之后,这个消息就又被加载回内存了,就像从来没有被消费过一样,消费者就又机会重新拿到这个消息,重新消费(重复消费的问题,是由消费者的业务代码负责保证的,broker server 管不了).

根据源码,模拟实现 RabbitMQ - 虚拟主机 + Consume设计 (7),RabbitMQ,rabbitmq,分布式

 

到了这里,关于根据源码,模拟实现 RabbitMQ - 虚拟主机 + Consume设计 (7)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring RabbitMQ 配置多个虚拟主机(vhost)

    在日常开发中,同时需要用到RabbitMQ多个虚拟机(vhost)。应用场景:需要接收多个交换机的数据,而交换机都在不同的虚拟机(vhost) Docker安装RabbitMQ 【SpringCloud】整合RabbitMQ六大模式应用(入门到精通) 本地模拟发送、接收RabbitMQ数据

    2024年02月21日
    浏览(55)
  • 3.RabbitMQ UI管理界面使用(用户、虚拟主机)

    RabbitMQ专栏目录(点击进入…) 1.服务方式 (1)打开服务 (2)在RabbitMQ Command Prompt中执行命令(方式一) 管理插件包含在RabbitMQ发行版中。与其他任何插件一样,必须先启用它才能使用它 (3)cmd执行命令(方式二) 因为在上面配置了RabbitMQ的sbin环境变量,所以在cmd中执行

    2024年02月05日
    浏览(35)
  • RabbitMQ虚拟主机无法启动的原因和解决方案

    摘要: RabbitMQ是一个广泛使用的开源消息代理系统,但在使用过程中可能会遇到虚拟主机无法启动的问题。本文将探讨可能导致该问题的原因,并提供相应的解决方案,以帮助读者解决RabbitMQ虚拟主机启动失败的困扰。 在RabbitMQ中,虚拟主机(Virtual Host)是一种逻辑分区,用

    2024年02月12日
    浏览(39)
  • Unity制作虚拟主机装机模拟器(课程设计)

    1.设计阶段 1.1需求分析 本虚拟装机系统是为了帮助用户学习计算机的组装过程,提供动手组装、教学模式和零件介绍三种模式。在零件介绍中,用户可以通过语音和文字介绍了解不同电脑零件的功能和名称。在教学模式中,用户可以观看动画短片,了解计算机的发展和组装计

    2024年01月21日
    浏览(42)
  • 虚拟机中docker安装rabbitmq 宿主机不能访问rabbitmq管理界面问题解决

    1, MacOS的操作系统, 通过Vmware Fusion虚拟机软件, 安装了Centos7操作系统, 里面安装了docker engine(1.13.1); 2, 试验rabbitmq, 下载了rabbit:latest镜像 3, 通过如下命令启动rabbitmq 4, 启动之后, 在宿主机浏览器测试访问虚拟机中的rabbitmq http://192.168.88.100:15672/ 试验结果: 拒绝访问 1, 检查centos-l

    2024年02月10日
    浏览(51)
  • Flutter环境搭建【win10虚拟机】+夜神模拟器【主机】

    为了安装和运行 Flutter,你的开发环境必须至少满足以下要求: 操作系统:Windows 10 或更高的版本(基于 x86-64 的 64 位操作系统)。 磁盘空间:除安装 IDE 和一些工具之外还应有至少 2.5 GB 的空间。 工具:要让 Flutter 在你的开发环境中正常使用,依赖于以下的工具: Windows P

    2024年04月23日
    浏览(59)
  • 根据源码,模拟实现 RabbitMQ - 转发规则实现(6)

    目录 一、转发规则实现 1.1、需求分析 1.2、实现 Router 转发规则 1.2.1、bindingKey 和 routingKey 参数校验 1.2.2、消息匹配规则 1.2.3、主题交换机匹配规则 这里主要实现 routingKey 和 bindingKey 参数的校验,以及 TopicExchange 类型绑定规则的实现. 这里重点来看一下 Topic 交换机的转发规则

    2024年02月12日
    浏览(40)
  • 根据源码,模拟实现 RabbitMQ - 内存数据管理(4)

    目录 一、内存数据管理 1.1、需求分析 1.2、实现 MemoryDataCenter 类 1.2.1、ConcurrentHashMap 数据管理 1.2.2、封装交换机操作 1.2.3、封装队列操作 1.2.4、封装绑定操作 1.2.5、封装消息操作 1.2.6、封装未确认消息操作 1.2.7、封装恢复数据操作 当前已经使用 数据库 管理了 交换机、绑定

    2024年02月11日
    浏览(39)
  • VMware、linux虚拟机设置网络实现虚拟机与主机网络互通

    需要配置三个方面才可以,即:WMware网络配置、windows主机网络配置、虚拟机网络配置。 首先,确认VMware虚拟机设置中配置的网络连接方式为:NAT模式 可以在VMware上方的菜单中找到:虚拟机----设置,然后找到 网络适配器,设置网络连接方式为:NAT模式,如下图: 一、配置

    2024年02月02日
    浏览(43)
  • 根据源码,模拟实现 RabbitMQ - 从需求分析到实现核心类(1)

    目录 一、需求分析 1.1、对 Message Queue 的认识 1.2、消息队列核心概念 1.3、Broker Server 内部关键概念 1.4、Broker Server 核心 API (重点实现) 1.5、交换机类型 Direct 直接交换机 Fanout 扇出交换机 Topic 主题交换机 1.6、持久化 1.7、网络通信 通信流程 远程调用设计思想 1.8、模块设计图

    2024年02月12日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包