RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析

这篇具有很好参考价值的文章主要介绍了RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

前言

RabbitMQ作为一款常用的消息中间件,在微服务项目中得到大量应用,其本身是微服务中的重点和难点,有不少概念我自己的也是一知半解,本系列博客尝试结合实际应用场景阐述RabbitMQ的应用,分析其为什么使用,并给出怎么用的案例。

本篇博客结合场景来阐述RabbitMQ的几种模式,描述了不同模式的应用场景,并给出相应的代码。(文末有惊喜~)

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式
其他相关的rabbitmq博客文章列表如下:

RabbitMQ基础(1)——生产者消费者模型 & RabbitMQ简介 & Docker版本的安装配置 & RabbitMQ的helloworld + 分模块构建 & 解决大量注册案例

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计

RabbitMQ的Docker版本安装 + 延迟插件安装 & QQ邮箱和阿里云短信验证码的主题模式发送

引出


1.MQ,消息队列的应用场景,几种MQ简单对比;
2.分析RabbitMQ的浏览器控制台页面;
3.结合场景来阐述RabbitMQ的几种模式,描述了不同模式的应用场景,并给出相应的代码;

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

MQ场景:

Message Queue 消息队列。

有比较大的负载,而且这些负载不用立刻马上给程序返回结构,可以有一些等待时间。可以用消息队列。

1异步处理

过去的项目,大部分都是同步解决问题。

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

UserinfoService{
    register(){
        //典型的同步
        //插入数据库
        //发短信
        //发邮件
    }
}
//如果你的操作基本没有任何延时操作,或者瓶颈,没有压力。那么你没有必要用MQ
UserinfoService{
    register(){
        //发起的是一个异步请求
        都不等待对方给我返回的结果
        //异步插入数据库
        //异步发短信
        //异步发邮件
    }
}

2解耦

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

库存有没有可能挂了。或者访问量巨大。因为库存慢,导致订单也慢。

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

解耦。

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

3削峰填谷

双12的时候,叫好早上的8点。秒杀的时候。

有些瞬间服务器压力是超大的,过了这个瞬间,几乎没有消耗量。

等服务器能够正常的时候,我慢慢执行就行了。

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

常见的MQ

MQ的前身,就是一个发布者订阅模式。

Kafka RabbitMQ(1W/s) RocketMQ ActiveMQ

Kafka : 10w/S 主要用于日志

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

拆解控制台页面

总览页面

包括刷新时间间隔,rabbitmq节点,连接端口的信息等

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

配置可以导出和导入

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

连接connection

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

队列页面

包括队列的名字,状态,准备好的消息数量,未确认的消息数量,总计消息数量

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

简单模式

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

使用的依赖

	<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
	<dependency>
	    <groupId>com.rabbitmq</groupId>
	    <artifactId>amqp-client</artifactId>
	    <version>5.7.3</version>
	</dependency>

工具类:建立连接

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

package com.tianju.config;


import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 建立连接的工具类
 */
public class ConnectionFactory {

    public static Connection createConnection() throws IOException, TimeoutException {
        com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();
        connectionFactory.setHost("192.168.111.130"); // http://192.168.111.130/
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123");
        connectionFactory.setVirtualHost("/demo");

        // amqp://admin@192.168.111.130:5672/
        return connectionFactory.newConnection();
    }
}

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

生产者:生产消息

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

package com.tianju.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.tianju.config.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生产者发送消息
 * 建立连接--> 创建频道 --> 创建队列 --> 发送消息
 */
public class Provider {
    private static String QUEUE_ORDER = "queue_order";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 建立连接
        Connection connection = ConnectionFactory.createConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        // 创建队列
        channel.queueDeclare(QUEUE_ORDER,false,false,false,null);

        // 发送消息,指定给哪个队列上发消息
        for (int i = 0; i < 100; i++) {
            String msg = "hello rabbitmq--"+i;
            channel.basicPublish("", QUEUE_ORDER, null, msg.getBytes());

            System.out.println("消息发布成功");

        }
        connection.close();
    }
}

消费者:消费消息

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

package com.tianju.simple;

import com.rabbitmq.client.*;
import com.tianju.config.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    private static String QUEUE_ORDER = "queue_order";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 建立连接
        Connection connection = ConnectionFactory.createConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        // 创建队列
        channel.queueDeclare(QUEUE_ORDER,false,false,false,null);
        // 队列必须声明,如果不存在,则自动创建

        // 声明一个消费者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             * 回调函数,用来接收消息
             * @param consumerTag the <i>consumer tag</i> associated with the consumer
             * @param envelope packaging data for the message
             * @param properties content header data for the message
             * @param body the message body (opaque, client-specific byte array),字节数组
             * @throws IOException
             */
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException
            {
                // no work to do

                String msg = new String(body); // 收到的信息
                System.out.println("消费者接收到:"+msg);
                try {
                    Thread.sleep(3000);// 模拟一个耗时操作
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        };
        // 表明自己是消费者,接收消息
        /**
         * autoAck:自动确认设置成 true
         */
        channel.basicConsume(QUEUE_ORDER, true, defaultConsumer);
    }
}

进行测试

生产者发送100条消息
RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

消费者进行消费,假设突然之间服务宕机了,此时消费了4条消息,理论上还应该有96条消息

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

打开控制台页面查看,消息全部消失了,出现了数据丢失的情况

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

全部用默认值的情况下,如果发生异常,则消息全部丢失。

消费者一次性拿到了所有的消息。

Ack:项目中必须false

生产者 ==投递消息=》队列

消费者=接受==》队列

ack: false 必须要手工确认。

消费者接到这个消息的时候, 这个消息进入 unack状态。

1:手工确认。消息删除。完成

2: 没手工确认,断开连接或者超时。

3:这个消息重新进入ready状态。等待其他消费者进行消费。

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

先设置自动ack为false,表示需要手工确认,然后在消费消息的方法中,进行消息的确认。

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

package com.tianju.simple;

import com.rabbitmq.client.*;
import com.tianju.config.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    private static String QUEUE_ORDER = "queue_order";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 建立连接
        Connection connection = ConnectionFactory.createConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        // 创建队列
        channel.queueDeclare(QUEUE_ORDER,false,false,false,null);
        // 队列必须声明,如果不存在,则自动创建

        // 声明一个消费者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             * 回调函数,用来接收消息
             * @param consumerTag the <i>consumer tag</i> associated with the consumer
             * @param envelope packaging data for the message
             * @param properties content header data for the message
             * @param body the message body (opaque, client-specific byte array),字节数组
             * @throws IOException
             */
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                // no work to do

                String msg = new String(body); // 收到的信息
                System.out.println("消费者接收到:"+msg);
                try {
                    Thread.sleep(3000);// 模拟一个耗时操作
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                // 消费者代码可能有失败,消息拿到之后,可能还没有处理,就宕机了
                // 消息确认的代码一定在最后一行
                // long deliveryTag 消息的下标;
                // boolean multiple 是否批量确认;
                channel.basicAck(envelope.getDeliveryTag(),true); // 确认消息,批量确认
            }
        };
        // 表明自己是消费者,接收消息
        /**
         * autoAck:自动确认设置成 true
         * 是否自动确认,
         * false:不进行自动确认;true:自动确认
         * 消费过程中可能产生异常
         * 如果产生异常,则必须进行消费补偿
         */
        channel.basicConsume(QUEUE_ORDER, false, defaultConsumer); // 接收消息
    }
}

再次模拟宕机的情况,一开始消息全部被放到Unack中,当宕机时,又把消息吐了出来,至少消息没有出现丢失的情况。

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

工作模式

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

一个生产者,两个消费者

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

两个消费者消费的是同一个队列中的消息。

两个消费者上来后,默认是按照平均分配。结果: 有人瞬间干完。有人很久都没干完。

QOS: 限流

可以限制你一次拉取几个。

这样两个消费者,也能够节省服务器CPU了。

        // 创建频道
        Channel channel = connection.createChannel();
        channel.basicQos(1); /** 1次只能拿1个 **/

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

当消费的速度太慢,觉得不够,加入其他的消费者。

核心,只有一个消息队列。

消息是轮询的方式,发送给不同的消费者。

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

Tips:队列参数怎么变?

队列的参数发生变化后,要删除再添加。

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

        // 创建频道
        Channel channel = connection.createChannel();

        channel.queueDelete(QUEUE_ORDER); // 先删除旧的队列

        boolean durable = true; // 当前队列中的消息持久化操作,重启之后,消息还在
        // 创建队列
        channel.queueDeclare(QUEUE_ORDER,durable,false,false,null);

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

队列相关参数

x-max-length

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

package com.tianju.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.tianju.config.ConnectionFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * 生产者发送消息
 * 建立连接--> 创建频道 --> 创建队列 --> 发送消息
 */
public class Provider {
    private static String QUEUE_ORDER = "queue_order";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 建立连接
        Connection connection = ConnectionFactory.createConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        channel.queueDelete(QUEUE_ORDER); // 先删除旧的队列
        boolean durable = true; // 当前队列中的消息持久化操作,重启之后,消息还在

        Map map = new HashMap();
        map.put("x-max-length", 10); // 设置最大的长度,只接受10个
        // 创建队列
        channel.queueDeclare(QUEUE_ORDER,durable,false,false,map);

        // 发送消息,指定给哪个队列上发消息
        for (int i = 0; i < 1000; i++) {
            String msg = "hello rabbitmq--"+i;
            channel.basicPublish("", QUEUE_ORDER, null, msg.getBytes());

            System.out.println("消息发布成功");
        }
        connection.close();
    }
}

How many (ready) messages a queue can contain before it starts to drop them from its head.

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

x-overflow

Sets the queue overflow behaviour. This determines what happens to messages when the maximum length of a queue is reached. Valid values are drop-head, reject-publish or reject-publish-dlx. The quorum queue type only supports drop-head and reject-publish.

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式
此时就是前10个了

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

x-max-length-bytes

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

package com.tianju.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.tianju.config.ConnectionFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * 生产者发送消息
 * 建立连接--> 创建频道 --> 创建队列 --> 发送消息
 */
public class Provider {
    private static String QUEUE_ORDER = "queue_order";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 建立连接
        Connection connection = ConnectionFactory.createConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        channel.queueDelete(QUEUE_ORDER); // 先删除旧的队列
        boolean durable = true; // 当前队列中的消息持久化操作,重启之后,消息还在

        Map map = new HashMap();
        map.put("x-max-length", 10); // 设置最大的长度,只接受10个
        map.put("x-overflow", "reject-publish"); // 拒绝发布,变成前10ge
        map.put("x-max-length-bytes", 4); // 消息的最大字节长度
        // 创建队列
        channel.queueDeclare(QUEUE_ORDER,durable,false,false,map);

        // 发送消息,指定给哪个队列上发消息
        for (int i = 0; i < 20; i++) {
            String msg = "53"+i;
            channel.basicPublish("", QUEUE_ORDER, null, msg.getBytes());
            msg = "hello rabbitmq--"+i;
            channel.basicPublish("", QUEUE_ORDER, null, msg.getBytes());

            System.out.println("消息发布成功");
        }
        connection.close();
    }
}

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

参数

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

 		//创建队列
        boolean durable = true;//当前队列中的消息进行持久化操作  重启之后,消息还在
        Map map = new HashMap();
        map.put("x-max-length",10);//设置队列的最大长度
        map.put("x-overflow","reject-publish");//设置队列的最大长度  drop head
        map.put("x-max-length-bytes",4);//设置消息的最大字节长度
//        map.put("x-expires",10000);//超时后,直接删除队列
        channel.queueDeclare(QUQUENAME,durable,false,false,map);

发布者订阅模式

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

工作模式中,只有一个队列。两个消费者轮询从同一个队列中取数据。

发布者订阅模式: 交换机后面会绑定多个消息队列。每个消息队列都有完整的信息。每个队列后的消费者都会有完整的消息。

交换机:就是一个无意识的广播。行为就是扇出.

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

场景:

下订单:

1:订单的数据入数据库
2:发送订单的短信
3: 物流

要每个队列都有完整的信息。

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

生产者:给fanout交换机发消息

package com.tianju.publish;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import com.tianju.config.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生产者发送消息
 * 建立连接--> 创建频道 --> 创建队列 --> 发送消息
 */
public class Provider {
    private static String QUEUE_ORDER = "queue_order";
    private static  String EXCHANGE = "pet_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 建立连接
        Connection connection = ConnectionFactory.createConnection();
        // 创建频道
        Channel channel = connection.createChannel();

        // 创建交换机
        channel.exchangeDeclare(EXCHANGE, "fanout"); // 扇出,类型只能用这个

        for (int i = 0; i < 100; i++) {
            channel.basicPublish(EXCHANGE, "", MessageProperties.TEXT_PLAIN, ("hello fanout"+i).getBytes());
        }
    }
}

消费者1:队列q32

package com.tianju.publish;

import com.rabbitmq.client.*;
import com.tianju.config.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    private static  String EXCHANGE = "pet_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 建立连接
        Connection connection = ConnectionFactory.createConnection();
        // 创建频道
        Channel channel = connection.createChannel();

        // 创建交换机
        channel.exchangeDeclare(EXCHANGE, "fanout"); // 扇出,类型只能用这个

        channel.queueDeclare("q32", false, false, false, null);
        channel.queueBind("q32", EXCHANGE, ""); // 路由键fanout模式,必须为空,即使写了是无效的

        channel.basicConsume("q32", new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException
            {
                // no work to do
                String msg = new String(body);
                System.out.println("消费者1:"+msg);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

消费者2:队列q321

package com.tianju.publish;

import com.rabbitmq.client.*;
import com.tianju.config.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer2 {
    private static  String EXCHANGE = "pet_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 建立连接
        Connection connection = ConnectionFactory.createConnection();
        // 创建频道
        Channel channel = connection.createChannel();

        // 创建交换机
        channel.exchangeDeclare(EXCHANGE, "fanout"); // 扇出,类型只能用这个

        channel.queueDeclare("q321", false, false, false, null);
        channel.queueBind("q321", EXCHANGE, ""); // 路由键fanout模式,必须为空,即使写了是无效的

        channel.basicConsume("q321", new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException
            {
                // no work to do
                String msg = new String(body);
                System.out.println("消费者2:"+msg);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

路由模式

允许用不同的路由键来接受不同的信息。

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

队列会接到完整的全部的信息。

日志系统:

日志级别:

OFF INFO DEBUG ERROR FATAL ALL

如果是致命错误: 立刻给运维发短信。

全部信息: 放到专门的日志系统
Error: 进行发邮件

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

生产者:发给交换机,告诉路由键

package com.tianju.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import com.tianju.config.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生产者发送消息
 * 建立连接--> 创建频道 --> 创建队列 --> 发送消息
 */
public class Provider {
    private static String EXCHANGE = "exchange_58";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 建立连接
        Connection connection = ConnectionFactory.createConnection();
        // 创建频道
        Channel channel = connection.createChannel();

        // 创建交换机
        channel.exchangeDeclare(EXCHANGE, "direct"); // 直接交换机


        String msg  = "this is fatal message";
        channel.basicPublish(EXCHANGE,"fatal", MessageProperties.TEXT_PLAIN,msg.getBytes());
        msg  = "this is error message";
        channel.basicPublish(EXCHANGE,"error", MessageProperties.TEXT_PLAIN,msg.getBytes());
        msg  = "this is debug message";
        channel.basicPublish(EXCHANGE,"debug", MessageProperties.TEXT_PLAIN,msg.getBytes());

    }
}

消费者1:根据路由键,接收3种消息

package com.tianju.routing;

import com.rabbitmq.client.*;
import com.tianju.config.ConnectionFactory;

import java.io.IOException;
import java.util.Calendar;
import java.util.concurrent.TimeoutException;

public class Consumer {
    private static String EXCHANGE = "exchange_58";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 建立连接
        Connection connection = ConnectionFactory.createConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        // 创建队列
        // 创建交换机
        channel.exchangeDeclare(EXCHANGE, "direct"); // 直接交换机
        // 队列必须声明,如果不存在,则自动创建

        channel.queueDeclare("q58", false, false, false, null);

        channel.queueBind("q58", EXCHANGE, "fatal");
        channel.queueBind("q58", EXCHANGE, "error");
        channel.queueBind("q58", EXCHANGE, "debug");

        channel.basicConsume("q58", new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException
            {
                // no work to do
                String msg = new String(body);
                System.out.println("消费者1:"+msg);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

    }
}

消费者2:根据路由键,接收1种消息

package com.tianju.routing;

import com.rabbitmq.client.*;
import com.tianju.config.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer2 {
    private static String EXCHANGE = "exchange_58";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 建立连接
        Connection connection = ConnectionFactory.createConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        // 创建队列
        // 创建交换机
        channel.exchangeDeclare(EXCHANGE, "direct"); // 直接交换机
        // 队列必须声明,如果不存在,则自动创建

        channel.queueDeclare("q581", false, false, false, null);

        channel.queueBind("q581", EXCHANGE, "fatal");

        channel.basicConsume("q581", new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException
            {
                // no work to do
                String msg = new String(body);
                System.out.println("消费者1:"+msg);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

    }
}

主题模式(Topic)

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

可以采取通配符模式:

* (star) can substitute for exactly one word.
# (hash) can substitute for zero or more words.

#`:匹配0个或多个词

*:匹配不多不少恰好1个词

举例:

item.#:能够匹配item.insert.abc 或者 item.insert

item.*:只能匹配item.insert

场景:

京东: 无锡,河南下订单。退货

==========总部 收到这个订单 .

无锡分销商也有这个消息 *.wuxi

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式

生产者:主题交换机,带路由键

package com.tianju.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import com.tianju.config.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生产者发送消息
 * 建立连接--> 创建频道 --> 创建队列 --> 发送消息
 */
public class Provider {
    private static String EXCHANGE = "exchange_58";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 建立连接
        Connection connection = ConnectionFactory.createConnection();
        // 创建频道
        Channel channel = connection.createChannel();

        // 创建交换机
        channel.exchangeDeclare(EXCHANGE, "topic"); // 主题模式交换机


        String msg  = "this is wuxi order";
        channel.basicPublish(EXCHANGE,"order.wuxi", MessageProperties.TEXT_PLAIN,msg.getBytes());
        msg  = "this is henan order";
        channel.basicPublish(EXCHANGE,"order.henan", MessageProperties.TEXT_PLAIN,msg.getBytes());
        msg  = "this is wuxi back";
        channel.basicPublish(EXCHANGE,"back.wuxi", MessageProperties.TEXT_PLAIN,msg.getBytes());
        msg  = "this is henan back";
        channel.basicPublish(EXCHANGE,"back.henan", MessageProperties.TEXT_PLAIN,msg.getBytes());

    }
}

消费者1:通配符的路由键

package com.tianju.topic;

import com.rabbitmq.client.*;
import com.tianju.config.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    private static String EXCHANGE = "exchange_58";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 建立连接
        Connection connection = ConnectionFactory.createConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        // 创建队列
        // 创建交换机
        channel.exchangeDeclare(EXCHANGE, "topic"); // 主题交换机
        // 队列必须声明,如果不存在,则自动创建

        channel.queueDeclare("q58", false, false, false, null);

        channel.queueBind("q58", EXCHANGE, "*.wuxi"); // 意味着无锡的订单和退货都能收到


        channel.basicConsume("q58", new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException
            {
                // no work to do
                String msg = new String(body);
                System.out.println("无锡仓库:"+msg);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

    }
}

消费者2:统配符

package com.tianju.topic;

import com.rabbitmq.client.*;
import com.tianju.config.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer2 {
    private static String EXCHANGE = "exchange_58";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 建立连接
        Connection connection = ConnectionFactory.createConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        // 创建队列
        // 创建交换机
        channel.exchangeDeclare(EXCHANGE, "topic"); // 主题交换机
        // 队列必须声明,如果不存在,则自动创建

        channel.queueDeclare("q581", false, false, false, null);

        channel.queueBind("q581", EXCHANGE, "*.*"); // 意味着所有的订单和退货都能收到


        channel.basicConsume("q581", new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException
            {
                // no work to do
                String msg = new String(body);
                System.out.println("总部仓库:"+msg);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

    }
}


总结

1.MQ,消息队列的应用场景,几种MQ简单对比;
2.分析RabbitMQ的浏览器控制台页面;
3.结合场景来阐述RabbitMQ的几种模式,描述了不同模式的应用场景,并给出相应的代码;

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析,# Message Queue,rabbitmq,ruby,分布式文章来源地址https://www.toymoban.com/news/detail-731153.html

到了这里,关于RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ(一) - 基本结构、SpringBoot整合RabbitMQ、工作队列、发布订阅、直接、主题交换机模式

    Publisher : 生产者 Queue: 存储消息的容器队列; Consumer:消费者 Connection:消费者与消息服务的TCP连接 Channel:信道,是TCP里面的虚拟连接。例如:电缆相当于TCP,信道是一条独立光纤束,一条TCP连接上创建多少条信道是没有限制的。TCP一旦打开,就会出AMQP信道。无论是发布消息

    2024年02月14日
    浏览(39)
  • RabbitMQ:概念和安装,简单模式,工作,发布确认,交换机,死信队列,延迟队列,发布确认高级,其它知识,集群

    1.1.1.什么是MQ MQ(message queue:消息队列) ,从字面意思上看,本质是个 队列 , FIFO 先入先出 ,只不过队列中存放的 内容是message 而已 ,还是一种 跨进程的通信机制 , 用于上下游传递消息 。在互联网架构中,MQ 是一种非常常见的上下游 “逻辑解耦+物理解耦” 的消息通信服

    2024年01月20日
    浏览(47)
  • RabbitMQ发布与订阅模式类型

    🍁博客主页:👉不会压弯的小飞侠 ✨欢迎关注:👉点赞👍收藏⭐留言✒ ✨系列专栏:👉Linux专栏 🔥欢迎大佬指正,一起学习!一起加油! 工作队列背后的假设是每个任务都是 只交付给一名工人。在这一部分中,我们将做一些事情 完全不同的 - 我们将向多个传递消息 消

    2024年02月02日
    浏览(31)
  • RabbitMQ入门案例之发布订阅模式

    本文章主要介绍RabbitMQ的发布订阅模式,该模式下,消息为广播形式,一经发布则会进入交换机绑定的队列中,详细介绍可以阅读官方文档。 官网文档地址:https://rabbitmq.com/getstarted.html RabbitMQ中的发布与订阅模式是一种消息传递的方式,用于在分布式系统中传递消息。 在该模

    2024年02月09日
    浏览(47)
  • 【RabbitMQ四】——RabbitMQ发布订阅模式(Publish/Subscribe)

    通过本篇博客能够简单使用RabbitMQ的发布订阅模式。 本篇博客主要是博主通过官网以及学习他人的博客总结出的RabbitMQ发布订阅模式。其中如果有误欢迎大家及时指正。 发布订阅模式的核心是生产者生产的消息,其他消费者都可以收到该生产者生产的消息。 由于发布订阅模式

    2024年02月02日
    浏览(27)
  • RabbitMQ--04--发布订阅模式 (fanout)-案例

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 @RabbitListener和@RabbitHandler的使用 OrderService OrderServiceImpl 在项目的test中发送请求 访问网址: http://localhost:15672/#/queues yml配置 SmsConsumerService、SmsConsumerServiceImpl EmailConsumerService、EmailConsumerServiceImpl DuanxinCo

    2024年04月14日
    浏览(34)
  • RabbitMQ的Publish/Subscribe发布订阅模式详解

    各位小伙伴很久不见了,今儿又要给大家分享干货了。我们知道RabbitMQ有简单模式、工作队列模式、发布订阅模式、路由模式、主题模式、远程过程调用模式、发布者确认模式等。这么多模式,你可能一下子很难全部吸收,今天袁老师主要给大家介绍发布订阅模式Publish/Subsc

    2024年02月10日
    浏览(36)
  • RabbitMQ学习——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计

    1.rabbitmq队列方式的梳理,点对点,一对多; 2.发布订阅模式,交换机到消费者,以邮箱和手机验证码为例; 3.topic模式,根据规则决定发送给哪个队列; 4.rabbitmq回调确认,setConfirmCallback和setReturnsCallback; 5.死信队列,延迟队列,创建方法,正常—死信,设置延迟时间; 点对

    2024年02月13日
    浏览(53)
  • Spring Boot整合RabbitMQ之发布与订阅模式

    RabbitMQ的模式中,常用的模式有:简单模式,发布与订阅模式,工作模式,路由模式,主题模式。简单模式不太会运用到工作中,我们可以使用 RabbitMQ 的发布订阅模式,实现: 用户发布动态,其“粉丝”收到其发布动态的消息 用户下订单,库存模块、支付模块等收到消息并

    2024年02月12日
    浏览(27)
  • RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计

    1.rabbitmq队列方式的梳理,点对点,一对多; 2.发布订阅模式,交换机到消费者,以邮箱和手机验证码为例; 3.topic模式,根据规则决定发送给哪个队列; 4.rabbitmq回调确认,setConfirmCallback和setReturnsCallback; 5.死信队列,延迟队列,创建方法,正常—死信,设置延迟时间; 点对

    2024年02月10日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包