【RabbitMQ(day2)】默认(直连)交换机的应用

这篇具有很好参考价值的文章主要介绍了【RabbitMQ(day2)】默认(直连)交换机的应用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

这篇博客是以下资料学后的总结:
不良人的RabbitMQ的教学视频
官方启动教程
RabbitMQ中文文档

一、第一种模型(Hello World)

【RabbitMQ(day2)】默认(直连)交换机的应用,RabbitMQ,rabbitmq,ruby,分布式

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序;
  • C:消费者:消息的接受者,会一直等待消息的到来。
  • queue:消息队列,图中红色部分。类似于一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
  1. 开发生产者
		// 创建连接mq的连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置连接Rabbitmq的主机
        connectionFactory.setHost("192.168.248.135");
        // 设置端口号
        connectionFactory.setPort(5672);
        // 设置连接那个虚拟主机
        connectionFactory.setVirtualHost("/ems");
        // 设置用户名和密码
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("123456");

        // 获取连接对象
        Connection connection = connectionFactory.newConnection();

        // 获取连接中的通道对象
        Channel channel = connection.createChannel();

        // 通过通道绑定对应的消息队列
        // 参数1:队列的名称  如果队列不存在会自动创建
        // 参数2:用来定义队列特性是否需要持久化,true:持久化队列,false即不持久化
        // 参数3:exclusive 是否独占队列
        // 参数4:是否在消费完成后自动删除队列
        // 参数5:额外参数
       // 这个不加是没关系的,只是表示我的Rabbitmq中是有hello消息队列的,消费者产生的
channel.queueDeclare("hello",false,false,false,null);

        // 发布消息
        // 参数1:交换机名称;参数2:路由键名称;参数3:传递消息额外设置;参数4:消息的具体内容
        channel.basicPublish("","hello",null,"hello rabbitmq".getBytes());

        channel.close();
        connection.close();
  1. 消费者开始消费
 		// 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置主机和端口
        connectionFactory.setHost("192.168.248.135");
        connectionFactory.setPort(5672);
        // 设置虚拟主机
        connectionFactory.setVirtualHost("/ems");
        // 设置用户名和密码
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("123456");

        // 创建连接对象
        Connection connection = connectionFactory.newConnection();

        // 创建通道
        Channel channel = connection.createChannel();
        // 通道绑定对象
        channel.queueDeclare("hello",false,false,false,null);

        // 消费消息
        // 参数1:消费哪个队列的消息  队列名称
        // 参数2:开始消息的自动确认机制
        // 参数3:消费消息时的回调接口
        String hello = channel.basicConsume("hello", true, new DefaultConsumer(channel){
            // 最后一个参数:消息队列中取出的消息
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println("转化成对应的字符串: " + new String(body));
                System.out.println("============");
            }
        });
        /*channel.close();
        connection.close();*/

【RabbitMQ(day2)】默认(直连)交换机的应用,RabbitMQ,rabbitmq,ruby,分布式

这里需要注意,由于这里是多线程下生产、消费消息,所以在消费时不应该提前关闭通道,不然无法监听到队列中的数据。

下面是证明,看看各线程的名称就知道了。

【RabbitMQ(day2)】默认(直连)交换机的应用,RabbitMQ,rabbitmq,ruby,分布式

需注意:Junit5之前是不支持多线程的。

  1. 参数说明
channel.queueDeclare("hello",true,false,true,null);
"参数1":用来声明通道对应的队列;
"参数2":用来指定是否持久化队列
"参数3":用来指定是否独占队列,一般为false
"参数4":用来指定是否自动删除队列
"参数5":对队列的额外配置
    
	参数1:交换机名称;
    参数2:路由键名称;
    参数3:传递消息额外设置;
    参数4:消息的具体内容
channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"I love you~".getBytes());

二、第二种模型(work queue)

Work Queues,也被称为(Task Queues 任务模型)。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用 work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

【RabbitMQ(day2)】默认(直连)交换机的应用,RabbitMQ,rabbitmq,ruby,分布式

角色:

  • Sender:生产者:任务的发布者
  • Consumer:消费者,领取任务并且完成任务

生产者代码

        // 获取连接对象
        Connection conn = RabbitMQUtils.getConnection();
        // 获取通道对象
        Channel channel = conn.createChannel();

        // 通过通道声明队列
        channel.queueDeclare("work", true, false, false, null);

        for (int i = 0; i < 10; ++i) {
            // 生产消息
            channel.basicPublish("", "work", null, ("(" + i + ")Hello worke queue~").getBytes());
        }
        // 关闭资源
        RabbitMQUtils.closeConnectionAndChanel(conn, channel);

消费者代码

Runnable myRunnable = new Runnable() {
            @Override
            public void run() {
                Connection conn = RabbitMQUtils.getConnection();
                try {
                    final int[] cnt = new int[1];
                    cnt[0] = 0;
                    Channel channel = conn.createChannel();
                    channel.queueDeclare("work", true, false, false, null);
                    System.out.println("当前线程:" + Thread.currentThread().getName());
                    channel.basicConsume("work", true, new DefaultConsumer(channel) {
                        @Override
                        public void handleDelivery(String consumerTag,
                                                   Envelope envelope,
                                                   AMQP.BasicProperties properties,
                                                   byte[] body) throws IOException {
                            System.out.println("消费者-" + consumerTag + ":" + new String(body));
                            System.out.println("================================================");
                            cnt[0]++;
                        }
                    });
                    System.out.println(Thread.currentThread().getName() + ":" + cnt[0]);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        Thread work1 = new Thread(myRunnable, "work-001");
        Thread work2 = new Thread(myRunnable, "work-002");
        Thread work3 = new Thread(myRunnable, "work-003");
        work1.start();
        work2.start();
        work3.start();

测试结果

【RabbitMQ(day2)】默认(直连)交换机的应用,RabbitMQ,rabbitmq,ruby,分布式

总结:默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。如下图所示:

【RabbitMQ(day2)】默认(直连)交换机的应用,RabbitMQ,rabbitmq,ruby,分布式

下面是官方给的

By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin. Try this out with three or more workers.

自动确认机制的后果和公平分配

channel.basicConsume("work", true, new DefaultConsumer(channel)
// 这里的第二个参数是指是否默认提交

上面 work queue 实现会出现两个问题:

问题一:这里将 autoAcked 参数值设置为了 true,即消费者收到消息队列调度的消息后不管有没有消费成功都立即返回 ACK 确认,消息队列只顾着轮询分配去了。这个时候的话会引发一个问题:当消费者突然宕机了,那还没处理的消息就不会被处理,即消失了。比如一个消费者被分配到了五个消息,但是只处理了三个就嘎了,那剩下的俩个就处理不了了。

问题二:在两个worker的情况下,当所有奇数消息都很重,偶数消息都很轻时,一个worker将一直很忙,而另一个几乎不做任何工作。但是RabbitMQ对此一无所知,仍然会均匀地分发消息。我们应该遵循能者多劳,充分利用资源,但轮询方式总是这么的不合我们的胃口。

解决方案

  • 首先得将自动提交设置为 false,手动提交就好了;

  • 每一次给空闲发消费者一个消息,即设置 prefetchCount = 1,这样的话不会让能者出现不工作,懒者一堆事没做的情况。当消费者死亡(即通道关闭、连接被关闭、或者TCP连接丢失等情况)还没有发送ACK,那有其他消费者在线的话,消息队列会将消息迅速交付给另一个消费者,从而确保消息没有丢失。

    具体解决方案的伪代码如下:

// 配置每一次只能执行一个小希
channel.basicQos(1);
// 关闭手动提交
channel.basicConsume("work", false, new DefaultConsumer(channel) {
       @Override
       public void handleDelivery(String consumerTag,
                                                   Envelope envelope,
                                                   AMQP.BasicProperties properties,
                                                   byte[] body) throws IOException {
        System.out.println("消费者-" + consumerTag + ":" + new String(body));

         // 参数1:确认队列中哪些具体消息 参数2:是否开启多个消息同时确认
                     channel.basicAck(envelope.getDeliveryTag(), false);
                        }
});

三、阐述默认交换机

可以在 RabbitMQ中文文档-默认交换机 去了解更多AMQP协议的一些内容。

默认交换机的本质是直连交换机,当你添加一个队列的时候,这个队列第一反应就是绑定默认交换机,而绑定(binding)的路由键名称和队列名称是一致的。

上面两种模型(RabbitMQ官方教程阐述的)Hello World模型和Work Queues模型,在官方教程中没有指出使用了交换机,但是本质都是绑定了默认交换机的,也就是直连交换机,它也是支持多消费者的负载均衡的。

首先必须知道的是:使用默认交换机时,队列是在消费者端创建的(可以说是用户本身吧),而不是生产者去创建的。当生产者发送一条消息到 RabbitMQ 时,RabbitMQ 会根据消息的路由键(在使用默认交换机的情况下,路由键即为队列名称)来查找是否已经存在该队列,如果队列不存在,则会丢弃该消息。

basicPublish 方法的第二个参数为路由键名称和 basicConsume 方法的第一个参数为队列名称也是可以看出来的。

总的来说就是生产者不需要关心队列的创建,这是消费者需要声明指定的,默认交换机会绑定声明的消息队列的,所以生产者该发发,创建的任务就不用管了。文章来源地址https://www.toymoban.com/news/detail-617151.html

到了这里,关于【RabbitMQ(day2)】默认(直连)交换机的应用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ 备份交换机和死信交换机

      为处理生产者将消息推送到交换机中,交换机按照消息中的路由键及自身策略无法将消息投递到指定队列中造成消息丢失的问题,可以使用备份交换机。   为处理在消息队列中到达TTL的过期消息,可采用死信交换机进行消息转存。可以通过死信交换机的方式实现延迟队

    2024年02月14日
    浏览(49)
  • 【北京迅为】《iTOP-3588开发板网络环境配置手册》第2章 电脑、开发板直连交换机或路由器

    RK3588是一款低功耗、高性能的处理器,适用于基于arm的PC和Edge计算设备、个人移动互联网设备等数字多媒体应用,RK3588支持8K视频编解码,内置GPU可以完全兼容OpenGLES 1.1、2.0和3.2。RK3588引入了新一代完全基于硬件的最大4800万像素ISP,内置NPU,支持INT4/INT8/INT16/FP16混合运算能力

    2024年03月26日
    浏览(57)
  • RabbitMQ之Exchange(交换机)属性及备用交换机解读

    目录 基本介绍 主要结论 备用交换机  springboot代码实战(备用交换机) 实战架构 工程概述 RabbitConfigDeal 配置类:创建队列及交换机并进行绑定  MessageService业务类:发送消息及接收消息 主启动类RabbitMq01Application:实现ApplicationRunner接口 在 RabbitMQ 中,交换机主要用来将生产

    2024年02月02日
    浏览(49)
  • 【RabbitMQ】RabbitMQ的交换机

    在上文中, 都没有交换机,生产者直接发送消息到队列。 而一旦引入交换机,消息发送的模式会有很大变化:可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化: Publisher:生产者,不再发送消息到队列中,而是发给交换机 Exchange:交换机,一方面,接收生

    2024年03月12日
    浏览(54)
  • 02、RabbitMQ交换机

    目录 1.、Exchange(交换机)的作用  2、Exchange(交换机)的类型 2.1.直连交换机:Direct Exchange 2.2.主题交换机:Topic Exchange 2.3.扇形交换机:Fanout Exchange 2.4.首部交换机:Headers exchange 2.5.默认交换机 2.6.Dead Letter Exchange(死信交换机) 3、交换机的属性 4、综合案例:交换机的使用 给子模

    2024年02月04日
    浏览(57)
  • RabbitMQ交换机(1)

    RabbitMQ消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列 。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。 相反,生产者只能将消息发送到交换机(exchange), 交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将

    2024年01月17日
    浏览(43)
  • RabbitMQ-交换机类型

    RabbitMQ的交换机类型总共有四种 1-直连交换机(Direct exchange) 消息发送到此交换机上时,交换机会将此消息发送到RoutingKey和消息中RoutingKey完全匹配的的队列(如果匹配了多个队列,则每个队列都会收到相同的消息)。 2-扇形交换机(Fanout exchange) 这个交换机机,会将收到的

    2023年04月08日
    浏览(39)
  • RabbitMQ交换机

    (1)接收publisher发送消息 (2)将消息按照规则路由到与之绑定的队列 (1)Fanout(广播) Fanout Exchange会将接受到的消息广播到每一个跟其绑定的queue,所以也叫广播模式 (2)Direct(定向) Direct Exchange会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由 每一个

    2024年01月19日
    浏览(44)
  • RabbitMQ交换机类型

    先附加下官网文档。RabbitMQ的交换机类型共有四种,是根据其路由过程的不同而划分成的: 直连型交换机背后的路由算法很简单——消息会传送给绑定键与消息的路由键完全匹配的那个队列。 我们用直连交换机取代了只会无脑广播的扇形交换机,并且具备了选择性接收消息的

    2024年02月06日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包