RabbitMQ学习笔记(尚硅谷)

这篇具有很好参考价值的文章主要介绍了RabbitMQ学习笔记(尚硅谷)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一. 消息队列

1. 定义

RabbitMQ学习笔记(尚硅谷)

2. 作用

2.1 流量消峰

RabbitMQ学习笔记(尚硅谷)

2.2 应用解耦

RabbitMQ学习笔记(尚硅谷)
RabbitMQ学习笔记(尚硅谷)

2.3 异步处理

RabbitMQ学习笔记(尚硅谷)

3. 分类

RabbitMQ学习笔记(尚硅谷)
RabbitMQ学习笔记(尚硅谷)
RabbitMQ学习笔记(尚硅谷)
RabbitMQ学习笔记(尚硅谷)

4. MQ的选择

RabbitMQ学习笔记(尚硅谷)
大量数据:Kafaka;高并发:RocketMQ; 中小型数据量少:RabbitMQ

5. RabbitMQ

5.1 概念

快递站
RabbitMQ学习笔记(尚硅谷)

5.2 四大概念

生产者,消费者,交换机,队列
交换机可以对应多个队列

RabbitMQ学习笔记(尚硅谷)

5.3 六大模式

简单模式
工作模式
发布/订阅模式
路由模式
主题模式
发布确定模式
RabbitMQ学习笔记(尚硅谷)

5.4 RabbitMQ 工作原理

RabbitMQ学习笔记(尚硅谷)

5.5 安装

密码:123456
用户名:admin

6. 代码实现

有基于SpringBoot的代码 实现起来比较简单,参考资源下载。

二. Hello World (简单模式)

RabbitMQ学习笔记(尚硅谷)

1. 生产者代码

  • 导入依赖
    RabbitMQ学习笔记(尚硅谷)
  • 代码
    RabbitMQ学习笔记(尚硅谷)
    密码改成自己的

2. 消费者代码

package com.chent;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static final String QUEUE_NAME = "HELLO";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory  = new ConnectionFactory();
        factory.setHost("192.168.86.130");
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        System.out.println("等待消息被消费..");
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String message = new String(delivery.getBody());
            System.out.println(message);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println("消费被中断");
        };
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

    }
}

三. Work Queues (工作队列模式)

RabbitMQ学习笔记(尚硅谷)

1. 轮询分发消息

  • 直接用idea启动两个线程的技巧
    右上角run旁边edict configuration 然后Build and Run最后 modify option选项 选择 allow mutiple instances

RabbitMQ学习笔记(尚硅谷)
RabbitMQ学习笔记(尚硅谷)

2. 消息应答

2.1 概念

RabbitMQ学习笔记(尚硅谷)

2.2 自动应答

RabbitMQ学习笔记(尚硅谷)
不靠谱

2.3 手动应答

RabbitMQ学习笔记(尚硅谷)

  • 参数Multiple的解释
    RabbitMQ学习笔记(尚硅谷)
  • 代码实现
    RabbitMQ学习笔记(尚硅谷)

2.3 消息应答重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认, RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
RabbitMQ学习笔记(尚硅谷)

默认的自动应答会使消息丢失(自动应答就是发送消息后即认为成功发射),要想实现消息不丢失,必须采用手动应答

3. RabbitMQ持久化

3.1 队列持久化

  • 注意事项
    已有消息队列改成持久化,必须先删除

3.2 消息持久化

RabbitMQ学习笔记(尚硅谷)

3.3 不公平分发

轮训分发-公平分发,但是不合理
RabbitMQ学习笔记(尚硅谷)
RabbitMQ学习笔记(尚硅谷)

3.4 预取值

限制缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。
通过使用 basic.qos 方法设置“预取计数” 值来完成的。 该值定义通道上允许的未确认消息的最大数量
RabbitMQ学习笔记(尚硅谷)

四. 发布确认

1. 原理

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID( 从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了**,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。**
confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。

2. 发布确认的策略

2.1 开启发布确认的方法

RabbitMQ学习笔记(尚硅谷)
**confirmSelect()**开启发布

2.2 单个确认发布

发布速度特别的慢
channel.waitForConfirms(): 确认订阅成功
RabbitMQ学习笔记(尚硅谷)

2.3 批量确认发布

先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。
RabbitMQ学习笔记(尚硅谷)

2.4 异步确认发布 (内置了一个监听器)

RabbitMQ学习笔记(尚硅谷)
一个监听器+两个回调函数(一个成功,一个失败返回)
RabbitMQ学习笔记(尚硅谷)
完整代码:channel.addConfirmListener(成功返回函数,失败返回函数)
RabbitMQ学习笔记(尚硅谷)

2.5 确认未处理的消息

最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

package com.chent;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/**
 * 发布订阅模式:异步发布确认
 */
public class Publish {
    public static void main(String[] args) throws Exception {
        Publish.publishAsync();
    }
    public static void publishAsync() throws Exception{
        ConnectionFactory factory  = new ConnectionFactory();
        factory.setHost("192.168.86.130");
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        String queueName = "publishTest";
        channel.queueDeclare(queueName,true,false,false,null);//申明队列
        channel.confirmSelect();//开启发布确认
        //异步发布的逻辑
        //线程安全有序hashmap 适用于高并发情况
        ConcurrentSkipListMap<Long,String> map = new ConcurrentSkipListMap<>();
        //消息成功确认回调函数
        ConfirmCallback ackCallback = (var1,var3)->{
            //2.删除确认的消息
            if(var3){//VAR3表示处理批量情况
            ConcurrentNavigableMap<Long, String> confirmMessage = map.headMap(var1);
            confirmMessage.clear();}
            else{
                map.remove(var1);
            }
            System.out.println("成功确认:"+ var1);
        };
        //消息失败确认回调函数
        ConfirmCallback nackCallback = (var1,var3)->{
            //3.打印未确认的消息
            String message = map.get(var1);
            System.out.println("未确认的消息是:"+message);
        };
        channel.addConfirmListener(ackCallback,nackCallback);
        long begin = System.currentTimeMillis();
        for(int i = 0;i<1000;i++){
            String message = "消息" + i;
            channel.basicPublish("",queueName,null,message.getBytes());
            //1.记录所有要发送的消息
            map.put(channel.getNextPublishSeqNo(),message);
        }

        long end = System.currentTimeMillis();
        System.out.println("===========================================异步耗时:" + (end-begin)+"============================");
    }
}

2.6 对比

RabbitMQ学习笔记(尚硅谷)

五. 交换机

在上一节中,我们创建了一个工作队列。我们假设的是工作队列背后,每个任务都恰好交付给一个消费者(工作进程)。在这一部分中,我们将做一些完全不同的事情-我们将消息传达给多个消费者。这种模式称为 ”发布/订阅
RabbitMQ学习笔记(尚硅谷)
RabbitMQ学习笔记(尚硅谷)

1. 概述

  • 作用
    RabbitMQ 消息传递模型的核心思想是**: 生产者生产的消息从不会直接发送到队列**。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。
    相反, 生产者只能将消息发送到交换机(exchange),**交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。**交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定
  • 交换机类型
    直接(direct), 主题(topic) ,标题(headers) , 扇出(fanout)
  • 无名Exchange
    RabbitMQ学习笔记(尚硅谷)
  • 临时队列
    RabbitMQ学习笔记(尚硅谷)
  • 绑定Bind
    RabbitMQ学习笔记(尚硅谷)

2. fanout扇形交换机

将接收到的所有消息广播到它知道的所有队列中

  • 消费者代码
public class ReceiveLogs01 {
	private static final String EXCHANGE_NAME = "logs";
	public static void main(String[] argv) throws Exception {
		Channel channel = RabbitUtils.getChannel();
		// 1. 声明fanout交换机
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		/**
		* 生成一个临时的队列 队列的名称是随机的
		* 当消费者断开和该队列的连接时 队列自动删除
		*/
		//2. 声明一个临时队列
		String queueName = channel.queueDeclare().getQueue();
		//把该临时队列绑定我们的 exchange 其中 routingkey(也称之为 binding key)为空字符串
		//3. 绑定交换机和队列的关系
		channel.queueBind(queueName, EXCHANGE_NAME, "");
		System.out.println("等待接收消息,把接收到的消息打印在屏幕........... ");
		DeliverCallback deliverCallback = (consumerTag, delivery) ->
		{String message = new String(delivery.getBody(), "UTF-8");
		System.out.println("控制台打印接收到的消息"+message);
		};
		channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
		}
	}
  • 生产者代码
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
	try (Channel channel = RabbitUtils.getChannel()) {
	/**
	* 声明一个 exchange
	* 1.exchange 的名称
	* 2.exchange 的类型
	*/
	channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
	Scanner sc = new Scanner(System.in);
	System.out.println("请输入信息");
	while (sc.hasNext()) {
	String message = sc.nextLine();
	channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
	System.out.println("生产者发出消息" + message);
	}
	}
	}
}

3. direct直接交换机

在上一节中,我们构建了一个简单的日志记录系统。我们能够向许多接收者广播日志消息。在本节我们将向其中添加一些特别的功能-比方说我们只让某个消费者订阅发布的部分消息。例如我们只把严重错误消息定向存储到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
队列只对它绑定的交换机的消息感兴趣。绑定用参数: routingKey 来表示也可称该参数为 binding key,创建绑定我们用代码:channel.queueBind(queueName, EXCHANGE_NAME, “routingKey”);绑定之后的意义由其交换类型决定。
RabbitMQ学习笔记(尚硅谷)

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);

4. Topic主题交换机

  • 引入
    在上一个小节中,我们改进了日志记录系统。我们没有使用只能进行随意广播的 fanout 交换机,而是使用了 direct 交换机,从而有能实现有选择性地接收日志。
    尽管使用direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候direct 就办不到了。这个时候就只能使用 topic 类型
  • 命名规范
    RabbitMQ学习笔记(尚硅谷)
    RabbitMQ学习笔记(尚硅谷)

5. 自我总结

交换机和队列绑定后,可以指定消息发给对应的队列
扇形交换机会给全部绑定的队列发消息,routingKey为空
直接交换机会给对应RoutingKey的队列发消息
主题交换机会给对应类型的routingKey(在直接交换机基础上改进)的队列发消息
说白了三种交换机其实就是在改变routingKey参数

channel.exchangeDeclare(交换机名称,类型);
channel.queueBind(队列名称,交换机名称, routingKey);

六. 死信队列

1. 死信的概念

  • 概念
    死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了, consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

  • 应用场景:
    为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

2. 死信的来源

  • 消息 TTL 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

3. 实战

  • 代码架构图

RabbitMQ学习笔记(尚硅谷)

  • 模拟TTL过期
  • 生产者代码
    在生产者中设置过期时间
public class Producer {
	private static final String NORMAL_EXCHANGE = "normal_exchange";
		public static void main(String[] argv) throws Exception {
			try (Channel channel = RabbitMqUtils.getChannel())
			{ channel.exchangeDeclare(NORMAL_EXCHANGE,
			BuiltinExchangeType.DIRECT);
			//设置消息的 TTL 时间
			AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
			//该信息是用作演示队列个数限制
			for (int i = 1; i <11 ; i++) {
				String message="info"+i;
				channel.basicPublish(NORMAL_EXCHANGE,
				message.getBytes());
				System.out.println("生产者发送消息:"+message);
			}
		}
	}
}
  • 消费者C1代码
    声明普通/死信队列和交换机,绑定routingKey关系;
    核心部分:普通队列通过声明队列的参数 param绑定死信交换机的关系
public class Consumer01 {
	//普通交换机名称
	private static final String NORMAL_EXCHANGE = "normal_exchange";
	//死信交换机名称
	private static final String DEAD_EXCHANGE = "dead_exchange";
	public static void main(String[] argv) throws Exception {
		Channel channel = RabbitUtils.getChannel()//声明死信和普通交换机 类型为 direct
		channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
		channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
		//声明死信队列
		String deadQueue = "dead-queue";
		channel.queueDeclare(deadQueue, false, false, false, null);
		//死信队列绑定死信交换机与 routingkey
		channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
		//正常队列绑定死信队列信息
		Map<String, Object> params = new HashMap<>();
		//正常队列设置死信交换机 参数 key 是固定值
		params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
		//正常队列设置死信 routing-key 参数 key 是固定值
		params.put("x-dead-letter-routing-key", "lisi");
		String normalQueue = "normal-queue";
		channel.queueDeclare(normalQueue, false, false, false, params);
		channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
		System.out.println("等待接收消息........... ");
		DeliverCallback deliverCallback = (consumerTag, delivery) ->
		{String message = new String(delivery.getBody(), "UTF-8");
		System.out.println("Consumer01 接收到消息"+message);
		};
		channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});
	}
}
  • 消费者C2代码
    只是简单的让C2消费死信
public class Consumer02 {
	private static final String DEAD_EXCHANGE = "dead_exchange";
	public static void main(String[] argv) throws Exception {
		Channel channel = RabbitUtils.getChannel();
		channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
		String deadQueue = "dead-queue";
		channel.queueDeclare(deadQueue, false, false, false, null);
		channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
		System.out.println("等待接收死信队列消息........... ");
		DeliverCallback deliverCallback = (consumerTag, delivery) ->
		{String message = new String(delivery.getBody(), "UTF-8");
		System.out.println("Consumer02 接收死信队列的消息" + message);
		};
		channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
		});
	}
}
  • 队列达到最长长度
    如何设置队列长度:在队列声明中添加参数
    RabbitMQ学习笔记(尚硅谷)

七. 延迟队列

1. 概念及应用 ***

RabbitMQ学习笔记(尚硅谷)
RabbitMQ学习笔记(尚硅谷)
RabbitMQ学习笔记(尚硅谷)
RabbitMQ学习笔记(尚硅谷)

2. RabbitMQ中的两种TTL

RabbitMQ学习笔记(尚硅谷)
RabbitMQ学习笔记(尚硅谷)

3. 延时队列基础模型(基于死信队列)

  • 定义
    RabbitMQ学习笔记(尚硅谷)

  • 代码见资源下载(基于SpringBoot来实现)

  • 代码架构
    RabbitMQ学习笔记(尚硅谷)

  • 效果展示
    RabbitMQ学习笔记(尚硅谷)

  • 存在的问题
    如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S两个时间选项,如果需要一个小时后处理,那么就需要增加TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

4. 优化(基于死信队列)

  • 架构
    RabbitMQ学习笔记(尚硅谷)
    新建一个队列,但是不设置TTL;在生产者端设置TTL即可

  • 效果展示
    RabbitMQ学习笔记(尚硅谷)

  • 缺点
    看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。

5. 插件实现延迟队列

  • 效果展示
    RabbitMQ学习笔记(尚硅谷)

  • 实现步骤

  • 安装插件
    RabbitMQ学习笔记(尚硅谷)

  • 架构图及代码
    RabbitMQ学习笔记(尚硅谷)
    RabbitMQ学习笔记(尚硅谷)

  • 延时队列的其他选择
    当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景

八. 发布确认高级

1. 基于SpringBoot的基本代码及存在问题

RabbitMQ学习笔记(尚硅谷)

RabbitMQ学习笔记(尚硅谷)
交换机发出了确认回调,但实际上队列没有收到消息

2. 回退消息

  • 概念:
    在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。
  • 核心代码
    RabbitMQ学习笔记(尚硅谷)
  • 效果展示
    RabbitMQ学习笔记(尚硅谷)

3. 备份交换机

  • 概念
    备份交换机可以理解为 RabbitMQ 中交换机的“备胎” ,当我们为某一个交换机声明一个对应的备份交换机时, 就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。
  • 代码框架
    RabbitMQ学习笔记(尚硅谷)
  • 代码实现
@Configuration
public class ConfirmConfig {
	public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
	public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
	public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
	public static final String BACKUP_QUEUE_NAME = "backup.queue";
	public static final String WARNING_QUEUE_NAME = "warning.queue";
	// 声明确认队列
	@Bean("confirmQueue")
	public Queue confirmQueue(){
	return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
	}
	//声明确认队列绑定关系
	@Bean
	public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
	@Qualifier("confirmExchange") DirectExchange exchange){
	return BindingBuilder.bind(queue).to(exchange).with("key1");
	}
	//声明备份 Exchange
	@Bean("backupExchange")
	public FanoutExchange backupExchange(){
	return new FanoutExchange(BACKUP_EXCHANGE_NAME);
	}
	//声明确认 Exchange 交换机的备份交换机
	@Bean("confirmExchange")
	public DirectExchange
	confirmExchange(){ExchangeBuilder
	exchangeBuilder =
	ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
	.durable(true)
	//设置该交换机的备份交换机
	.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
	return (DirectExchange)exchangeBuilder.build();
	}
	// 声明警告队列
	@Bean("warningQueue")
	public Queue warningQueue(){
	return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
	}
	// 声明报警队列绑定关系
	@Bean
	public Binding warningBinding(@Qualifier("warningQueue") Queue queue,
	@Qualifier("backupExchange") FanoutExchange
	backupExchange){
	return BindingBuilder.bind(queue).to(backupExchange);
	}
	// 声明备份队列
	@Bean("backQueue")
	public Queue backQueue(){
	return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
	}
	// 声明备份队列绑定关系
	@Bean
	public Binding backupBinding(@Qualifier("backQueue") Queue queue,
	@Qualifier("backupExchange") FanoutExchange backupExchange){
	return BindingBuilder.bind(queue).to(backupExchange);
	}
}

九. RabbitMQ补充知识 *****

1. 幂等性

1.1 概念

用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。

1.2 消息重复消费

  • 问题描述
    消费者在消费 MQ 中的消息时, MQ 已把消息发送给消费者,消费者在给MQ 返回 ack 时网络中断,故 MQ 未收到确认信息该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。
  • 解决方案
    MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳 或者 UUID 或者订单消费者消费 MQ 中的消息也可利用 MQ 的该 id 来判断,或者可按自己的规则生成一个全局唯一 id,每次消费消息时用该 id 先判断该消息是否已消费过。

1.3 消费端的幂等性保障

在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性。

  • 方案一:唯一ID+指纹码机制
    指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。
  • 方案二:Redis原子性
    利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费

2. 优先队列

  • 应用场景
    RabbitMQ学习笔记(尚硅谷)
  • 实现原理
    RabbitMQ学习笔记(尚硅谷)
  • 代码实现
public class Producer {
	private static final String QUEUE_NAME="hello";
	public static void main(String[] args) throws Exception {
		try (Channel channel = RabbitMqUtils.getChannel();) {
		//给消息赋予一个 priority 属性
		AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();		
		for (int i = 1; i <11; i++){
			String message = "info"+i;
			if(i==5){
			channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
			}else{
			channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
			}
			System.out.println("发送消息完成:" + message);
			}
		}
	}
}
public class Consumer {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception
{Channel channel = RabbitMqUtils.getChannel();
//设置队列的最大优先级 最大可以设置到 255 官网推荐 1-10 如果设置太高比较吃内存和 CPU
Map<String, Object> params = new HashMap();
params.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, true, false, false, params);
System.out.println("消费者启动等待消费..............");
DeliverCallback deliverCallback=(consumerTag, delivery)-
>{ String receivedMessage = new
String(delivery.getBody());System.out.println("接收到消
息:"+receivedMessage);
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,(consumerTag)-
>{System.out.println("消费者无法消费消息时调用,如队列被删除");
});
}

3.惰性队列

  • 定义
    惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。
  • 应用场景
    当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。
    RabbitMQ学习笔记(尚硅谷)
  • 两种设置模式

RabbitMQ学习笔记(尚硅谷)

  • 内存开销对比
    RabbitMQ学习笔记(尚硅谷)

十. RabbitMQ集群

1. 搭建

2. 镜像队列

RabbitMQ学习笔记(尚硅谷)

引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他 Broker 节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。

3. Haproxy + Keepalive实现高可用负载均衡

RabbitMQ学习笔记(尚硅谷)
Haproxy 实现负载均衡
Keepalived 实现双机(主备)热备

4. Federation Exchange/Queue

RabbitMQ学习笔记(尚硅谷)

RabbitMQ学习笔记(尚硅谷)

5. Shovel

RabbitMQ学习笔记(尚硅谷)文章来源地址https://www.toymoban.com/news/detail-500664.html

到了这里,关于RabbitMQ学习笔记(尚硅谷)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【SpringBoot笔记29】SpringBoot集成RabbitMQ消息队列

    这篇文章,主要介绍SpringBoot如何集成RabbitMQ消息队列。 目录 一、集成RabbitMQ 1.1、引入amqp依赖 1.2、添加连接信息 1.3、添加RabbitMQ配置类

    2023年04月08日
    浏览(43)
  • RabbitMQ消息队列__学习报告

    一、Linux部署RabbitMQ (一)环境 1.rabbitmq1:192.168.163.128 [root@rabbitmq1 ~]# hostnamectl set-hostname client 2.rabbitmq2:192.168.163.132 [root@rabbitmq2 ~]# hostnamectl set-hostname haproxy 3.rabbitmq3:192.168.163.135 [root@rabbitmq3~]#  hostnamectl set-hostname rabbitmq3 4.域名解析 [root@localhost ~]# vim /etc/hosts 192.168.163.128

    2023年04月25日
    浏览(24)
  • 【RabbitMQ笔记08】消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认、消费者消息确认、消息持久化)

    这篇文章,主要介绍消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认、消费者消息确认、消息持久化)。 目录 一、防止消息丢失 1.1、消息确认机制(生产者) (1)生产者丢失消息 (2)生产者消息确认机制 1.2、消息确认机制(消费者) (1)消费者丢失消息

    2024年02月02日
    浏览(44)
  • GoLong的学习之路,进阶,RabbitMQ (消息队列)

    快有一周没有写博客了。前面几天正在做项目。正好,项目中需要MQ(消息队列),这里我就补充一下我对mq的理解。其实在学习java中的时候,自己也仿照RabbitMQ自己实现了一个单机的mq,但是mq其中一个特点也就是,分布式我在项目中没有涉及。这里我用go语言将RabbitMQ的操作

    2024年02月03日
    浏览(40)
  • 消息队列学习笔记

    异步处理:处理完关键步骤后直接返回结果,后续放入队列慢慢处理 流量控制: 使用消息队列隔离网关和后端服务,以达到流量控制和保护后端服务的目的。能根据下游的处理能力自动调节流量,达到“削峰填谷”的作用 网关在收到请求后,将请求放入请求消息队列; 后端

    2024年02月12日
    浏览(32)
  • 【学习笔记】Java——消息队列kafka

    1、Kafka combines three key capabilities: To publish (write) and subscribe to (read) streams of events, including continuous import/export of your data from other systems. To store streams of events durably and reliably for as long as you want. To process streams of events as they occur or retrospectively. And all this functionality is provided in a distr

    2024年02月09日
    浏览(33)
  • RabbitMQ实现延迟消息,RabbitMQ使用死信队列实现延迟消息,RabbitMQ延时队列插件

    假设有一个业务场景:超过30分钟未付款的订单自动关闭,这个功能应该怎么实现? RabbitMQ使用死信队列,可以实现消息的延迟接收。 队列有一个消息过期属性。就像丰巢超过24小时就收费一样,通过设置这个属性,超过了指定事件的消息将会被丢弃。 这个属性交:x-message

    2024年02月13日
    浏览(49)
  • 【学习日记2023.6.19】 之 RabbitMQ服务异步通信_消息可靠性_死信交换机_惰性队列_MQ集群

    消息队列在使用过程中,面临着很多实际问题需要思考: 消息从发送,到消费者接收,会经历多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收

    2024年02月11日
    浏览(35)
  • RabbitMq消息模型-队列消息

    基本模型(SimpleQueue)、工作模型(WorkQueue) 队列消息特点: 消息不会丢失 并且 有先进先出的顺序。 消息接收是有顺序的,不是随机的,仅有一个消费者能拿到数据,而且不同消费者拿不到同一份数据。 基本模型: SimpleQueue 在上图的模型中,有以下几个概念: P:为生产

    2024年02月09日
    浏览(32)
  • 【RabbitMQ】消息队列-RabbitMQ篇章

    RabbitMQ是一个开源的 遵循AMQP协议 实现的基于Erlang语言编写,支持多种客户端(语言)。用于在分布式系统中 存储消息,转发消息 ,具有 高可用 , 高可扩性 , 易用性 等特征。 1.1、RabbitMQ—使用场景 一般场景 像一般的下订单业务如下图: 将订单信息写入数据库成功后,发

    2024年02月12日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包