引出
1.rabbitmq队列方式的梳理,点对点,一对多;
2.发布订阅模式,交换机到消费者,以邮箱和手机验证码为例;
3.topic模式,根据规则决定发送给哪个队列;
4.rabbitmq回调确认,setConfirmCallback和setReturnsCallback;
5.死信队列,延迟队列,创建方法,正常—死信,设置延迟时间;文章来源地址https://www.toymoban.com/news/detail-686853.html
点对点(simple)
点对对方式传输
Work queues 一对多
1个生产者多个消费者
发布订阅/fanout模式
生产者通过fanout扇出交换机群发消息给消费者,同一条消息每一个消费者都可以收到。
以登陆验证码为例
pom文件导包
<!-- qq邮箱-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
<!-- 阿里云短信验证码相关包-->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.5.3</version>
</dependency>
<!-- queue的包-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml文件
server:
port: 9099
spring:
# 模块的名字
application:
name: user-auth
# 邮箱的配置
mail:
host: smtp.qq.com
port: 587
username: xxxx
password: xxxxx
# rabbitmq的配置
rabbitmq:
host: 192.168.111.130
port: 5672
username: admin
password: 123
logging:
level:
com.tianju.auth: debug
rabbitmq的配置
需要用到的常量
package com.tianju.auth.util;
/**
* rabbitmq的常量
*/
public interface RabbitMqConstants {
String MQ_MAIL_QUEUE="mq_email_queue";
String MQ_PHONE_QUEUE="mq_phone_queue";
String MQ_FANOUT_EXCHANGE="mq_fanout_exchange";
// 参数 String name, boolean durable, boolean exclusive, boolean autoDelete
boolean durable = true;
boolean exclusive = false;
boolean autoDelete = false;
}
RabbitMqConfig.java配置
邮箱队列,电话队列,交换机;
邮箱绑定交换机,电话绑定交换机;
创建队列参数说明:
参数 | 说明 |
---|---|
name | 字符串值,queue的名称。 |
durable | 布尔值,表示该 queue 是否持久化。 持久化意味着当 RabbitMQ 重启后,该 queue 是否会恢复/仍存在。 另外,需要注意的是,queue 的持久化不等于其中的消息也会被持久化。 |
exclusive | 布尔值,表示该 queue 是否排它式使用。排它式使用意味着仅声明他的连接可见/可用,其它连接不可见/不可用。 |
autoDelete | 布尔值,表示当该 queue 没“人”(connection)用时,是否会被自动删除。 |
不指定 durable、exclusive 和 autoDelete 时,默认为 true 、 false 和 false 。表示持久化、非排它、不用自动删除。
创建交换机参数说明
参数 | 说明 |
---|---|
name | 字符串值,exchange 的名称。 |
durable | 布尔值,表示该 exchage 是否持久化。 持久化意味着当 RabbitMQ 重启后,该 exchange 是否会恢复/仍存在。 |
autoDelete | 布尔值,表示当该 exchange 没“人”(queue)用时,是否会被自动删除。 |
不指定 durable 和 autoDelete 时,默认为
true
和false
。表示持久化、不用自动删除
package com.tianju.auth.config;
import com.tianju.auth.util.RabbitMqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfig {
@Bean // 邮箱的队列
public Queue mailQueue(){
return new Queue(RabbitMqConstants.MQ_MAIL_QUEUE,
RabbitMqConstants.durable,
RabbitMqConstants.exclusive,
RabbitMqConstants.autoDelete);
}
@Bean // 电话的队列
public Queue phoneQueue(){
return new Queue(RabbitMqConstants.MQ_PHONE_QUEUE,
RabbitMqConstants.durable,
RabbitMqConstants.exclusive,
RabbitMqConstants.autoDelete);
}
@Bean // 交换机
public FanoutExchange fanoutExchange(){
return new FanoutExchange(RabbitMqConstants.MQ_FANOUT_EXCHANGE,
RabbitMqConstants.durable,
RabbitMqConstants.autoDelete);
}
@Bean
public Binding mailBinding(){
return BindingBuilder.bind(mailQueue())
.to(fanoutExchange());
}
@Bean
public Binding phoneBinding(){
return BindingBuilder.bind(phoneQueue())
.to(fanoutExchange());
}
}
生产者生成验证码,发送给交换机
接口
package com.tianju.auth.service;
public interface IUserService {
/**
* 生产者生成信息发送给交换机
* @param msg 信息,这里是验证码
*/
void sendCode(String msg);
}
实现
package com.tianju.auth.service.impl;
import com.tianju.auth.service.IUserService;
import com.tianju.auth.util.RabbitMqConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
@Service
@Slf4j
public class UserServiceImpl implements IUserService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void sendCode(String msg) {
rabbitTemplate.convertAndSend(
RabbitMqConstants.MQ_FANOUT_EXCHANGE,
"routingkey.fanout",
msg);
log.debug("[生产者向交换机:] 发送一条信息:{}",msg);
}
}
测试类生成验证码,发给交换机
package com.tianju.auth.service.impl;
import cn.hutool.core.lang.Snowflake;
import com.tianju.auth.service.IUserService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class UserServiceImplTest {
@Autowired
private IUserService userService;
@Test
public void sendCode() {
String code = new Snowflake().nextIdStr().substring(0, 6);
System.out.println(code);
userService.sendCode(code);
}
}
消费者消费验证码
package com.tianju.auth.consumer;
import com.tianju.auth.service.IEmailService;
import com.tianju.auth.util.RabbitMqConstants;
import com.tianju.auth.util.SMSUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class UserConsumer {
@Autowired
private IEmailService emailService;
@RabbitListener(queues = RabbitMqConstants.MQ_MAIL_QUEUE)
public void emailConsumer(String msg){
log.debug("[email消费者:]消费{}",msg);
emailService.sendEmail("xxxx@qq.com", "登陆验证码", msg);
}
@RabbitListener(queues = RabbitMqConstants.MQ_PHONE_QUEUE)
public void phoneConsumer(String msg){
log.debug("[phone消费者:]消费{}",msg);
SMSUtil.send("xxxx", msg);
}
}
topic模式
例如: routingkey: my.orange.rabbit —-> Q1,Q2
配置类增加配置
package com.tianju.auth.util;
/**
* rabbitmq的常量
*/
public interface RabbitMqConstants {
String MQ_MAIL_QUEUE="mq_email_queue";
String MQ_PHONE_QUEUE="mq_phone_queue";
String MQ_FANOUT_EXCHANGE="mq_fanout_exchange";
String MQ_TOPIC_EXCHANGE="mq_topic_exchange";
String MQ_TOPIC_QUEUE_A = "mq_topic_queue_a";
String MQ_TOPIC_QUEUE_B = "mq_topic_queue_b";
// 参数 String name, boolean durable, boolean exclusive, boolean autoDelete
boolean durable = true;
boolean exclusive = false;
boolean autoDelete = false;
}
package com.tianju.auth.config;
import com.tianju.auth.util.RabbitMqConstants;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfig {
@Bean // 邮箱的队列
public Queue mailQueue(){
return new Queue(RabbitMqConstants.MQ_MAIL_QUEUE,
RabbitMqConstants.durable,
RabbitMqConstants.exclusive,
RabbitMqConstants.autoDelete);
}
@Bean // 电话的队列
public Queue phoneQueue(){
return new Queue(RabbitMqConstants.MQ_PHONE_QUEUE,
RabbitMqConstants.durable,
RabbitMqConstants.exclusive,
RabbitMqConstants.autoDelete);
}
@Bean // 交换机
public FanoutExchange fanoutExchange(){
return new FanoutExchange(RabbitMqConstants.MQ_FANOUT_EXCHANGE,
RabbitMqConstants.durable,
RabbitMqConstants.autoDelete);
}
@Bean
public Binding mailBinding(){
return BindingBuilder.bind(mailQueue())
.to(fanoutExchange());
}
@Bean
public Binding phoneBinding(){
return BindingBuilder.bind(phoneQueue())
.to(fanoutExchange());
}
@Bean // A队列
public Queue topicAQueue(){
return new Queue(RabbitMqConstants.MQ_TOPIC_QUEUE_A,
RabbitMqConstants.durable,
RabbitMqConstants.exclusive,
RabbitMqConstants.autoDelete);
}
/**
* topic模式相关配置
*/
@Bean // B队列
public Queue topicBQueue(){
return new Queue(RabbitMqConstants.MQ_TOPIC_QUEUE_B,
RabbitMqConstants.durable,
RabbitMqConstants.exclusive,
RabbitMqConstants.autoDelete);
}
@Bean // topic的交换机
public TopicExchange topicMyExchange(){
return new TopicExchange(RabbitMqConstants.MQ_TOPIC_EXCHANGE,
RabbitMqConstants.durable,
RabbitMqConstants.autoDelete);
}
@Bean
public Binding topicAQueueBinding(){
return BindingBuilder
.bind(topicAQueue())
.to(topicMyExchange())
.with("topic.xxx"); // 规则 topic.xxx
}
@Bean
public Binding topicBQueueBinding(){
return BindingBuilder
.bind(topicBQueue())
.to(topicMyExchange())
.with("topic.*"); // 规则 topic.xxx
}
}
生产者发送信息
/**
* topic模式下,生产者发送信息给交换机,可以决定给哪个队列发信息
* @param msg 发送的信息
* @param routingKey 类似正则表达式,决定给谁发
* .with("topic.xxx"); // 规则 topic.xxx ---- A队列
* .with("topic.*"); // 规则 topic.xxx ---- B队列
* 在配置类中,如上所述配置,则如果输入的routingKey为 topic.xxx则给A和B发;
* 如果输入的routingKey为 topic.yyy 则 只给B队列发;
*/
void sendMsg(String msg,String routingKey);
实现
package com.tianju.auth.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.tianju.auth.entity.UserPrivs;
import com.tianju.auth.mapper.UserMapper;
import com.tianju.auth.service.IUserService;
import com.tianju.auth.util.RabbitMqConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
@Service
@Slf4j
public class UserServiceImpl implements IUserService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void sendCode(String msg) {
rabbitTemplate.convertAndSend(
RabbitMqConstants.MQ_FANOUT_EXCHANGE,
"routingkey.fanout",
msg);
log.debug("[生产者向交换机:] 发送一条信息:{}",msg);
}
@Override
public void sendMsg(String msg,String routingKey) {
rabbitTemplate.convertAndSend(
RabbitMqConstants.MQ_TOPIC_EXCHANGE,
routingKey, // "topic.yyy",此时只有B队列有信息
msg);
log.debug("[生产者向交换机:] 发送一条信息:{}",msg);
}
}
进行发送
package com.tianju.auth.service.impl;
import cn.hutool.core.lang.Snowflake;
import com.tianju.auth.service.IUserService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class UserServiceImplTest {
@Autowired
private IUserService userService;
@Test
public void sendCode() {
String code = new Snowflake().nextIdStr().substring(0, 6);
System.out.println(code);
userService.sendCode(code);
}
@Test
public void sendTopic() {
String code = new Snowflake().nextIdStr().substring(0, 6);
System.out.println(code);
userService.sendMsg(code,"topic.yyy");
}
}
控制台查看
rabbitmq回调确认
配置类
spring:
# rabbitmq的配置
rabbitmq:
host: 192.168.111.130
port: 5672
username: admin
password: 123
# 确认收到
publisher-confirm-type: correlated
publisher-returns: true
验证生产者发送是否成功
使用RabbitTemplate的回调方法。
先设置
- setConfirmCallback
- setReturnsCallback
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void sendCode(String msg) {
rabbitTemplate.convertAndSend(
RabbitMqConstants.MQ_FANOUT_EXCHANGE,
"routingkey.fanout",
msg);
log.debug("[生产者向交换机:] 发送一条信息:{}",msg);
}
@Override
public void sendMsg(String msg,String routingKey) {
// 如果发到交换机,看一下有没有反馈
rabbitTemplate.setConfirmCallback((c,ack,message)->{
log.debug("***** setConfirmCallback:ack--{}", ack); // 是否发送到交换机
log.debug("***** setConfirmCallback:c-->{}",c);
// channel error; protocol method: #method<channel.close>(reply-code=404,
// reply-text=NOT_FOUND - no exchange 'aaaa' in vhost '/', class-id=60, method-id=40)
log.debug("***** setConfirmCallback:m-->{}",message);
if (ack){
log.debug("[生产者:] 发送信息到交换机{}","RabbitMqConstants.MQ_TOPIC_EXCHANGE");
}else {
log.debug(message);
}
});
rabbitTemplate.setReturnsCallback(r->{
log.debug("返回文字{}", r.getReplyText());
log.debug("返回code{}", r.getReplyCode());
log.debug("返回Exchange{}", r.getExchange());
log.debug("返回RoutingKey{}", r.getRoutingKey());
});
rabbitTemplate.convertAndSend(
RabbitMqConstants.MQ_TOPIC_EXCHANGE,
// "aaaa",// 失败的情况
routingKey, // "topic.yyy",此时只有B队列有信息
msg);
log.debug("[生产者向交换机:] 发送一条信息:{}",msg);
}
rabbitTemplate.setConfirmCallback((c,ack,message)->{
log.debug("******* setConfirmCallback:ack->{}",ack);
log.debug("******* setConfirmCallback:c->{}",c);
log.debug("******* setConfirmCallback:chanel->{}",message);
if(ack){
log.debug("[生产者]发送信息到达交换机{}","RabbitMqConstants.MQ_TOPIC_EXCHANGE");
}else {
log.debug(message);
}
});
rabbitTemplate.setReturnsCallback(r->{
log.debug("返回文字:{}",r.getReplyText());
log.debug("返回code:{}",r.getReplyCode());
log.debug("返回Exchange:{}",r.getExchange());
log.debug("返回RoutingKey:{}",r.getRoutingKey());
});
rabbitTemplate.convertAndSend(
RabbitMqConstants.MQ_TOPIC_EXCHANGE,
"abc.xxx",
msg
);
@Test
public void sendTopic() {
String code = new Snowflake().nextIdStr().substring(0, 6);
System.out.println(code);
userService.sendMsg(code,"topic.rrr");
}
延迟队列(死信)设计
Documentation: Table of Contents — RabbitMQ
java代码步骤
创建正常+死信队列
package com.tianju.mq.config;
import com.tianju.mq.constants.RabbitMqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMqConfig {
@Bean
public DirectExchange normalExchange(){
return new DirectExchange(RabbitMqConstants.MQ_NORMAL_EXCHANGE,
RabbitMqConstants.durable,
RabbitMqConstants.autoDelete);
}
@Bean
public Queue normalQueue(){
Map<String, Object> map = new HashMap<>(2);
map.put("x-dead-letter-exchange",RabbitMqConstants.MQ_DELAY_EXCHANGE);
map.put("x-dead-letter-routing-key",RabbitMqConstants.MQ_DELAY_ROUTING_KEY);
return new Queue(
RabbitMqConstants.MQ_NORMAL_QUEUE,
RabbitMqConstants.durable,
RabbitMqConstants.exclusive,
RabbitMqConstants.autoDelete,
map);
}
@Bean
public Binding normalBinding(){
return BindingBuilder.bind(normalQueue())
.to(normalExchange())
.with(RabbitMqConstants.MQ_NORMAL_ROUTING_KEY);
}
//------------------死信队列设计--------------------------
/**
* 死信(延迟)队列
* @return
*/
@Bean
public Queue delayQueue(){
return new Queue(RabbitMqConstants.MQ_DELAY_QUEUE,
RabbitMqConstants.durable,
RabbitMqConstants.exclusive,
RabbitMqConstants.autoDelete);
}
/**
* 死信交换机
* @return
*/
@Bean
public DirectExchange delayExchange(){
return new DirectExchange(RabbitMqConstants.MQ_DELAY_EXCHANGE,
RabbitMqConstants.durable,
RabbitMqConstants.autoDelete);
}
/**
* 死信交换机队列绑定
* @return
*/
@Bean
public Binding delayBinding(){
return BindingBuilder.bind(delayQueue())
.to(delayExchange())
.with(RabbitMqConstants.MQ_DELAY_ROUTING_KEY);
}
}
配置类+常量
package com.tianju.mq.constants;
public interface RabbitMqConstants {
String MQ_DELAY_QUEUE = "mq_delay_queue"; // 延迟队列,死信队列
String MQ_DELAY_EXCHANGE = "mq_delay_exchange"; // 死信交换机
String MQ_DELAY_ROUTING_KEY = "mq_delay_routing_key"; // 死信路由
// 正常的队列,交换机,路由
String MQ_NORMAL_QUEUE = "mq_normal_queue";
String MQ_NORMAL_EXCHANGE = "mq_normal_exchange";
String MQ_NORMAL_ROUTING_KEY = "mq_normal_routing_key";
// 参数
boolean durable = true;
boolean exclusive = false;
boolean autoDelete = false;
}
server:
port: 9099
spring:
# 邮箱的配置
mail:
host: smtp.qq.com
port: 587
username: xxxxx.com
password: xxxxx
# rabbitmq的配置
rabbitmq:
host: 192.168.111.130
port: 5672
username: admin
password: 123
# 确认收到
publisher-confirm-type: correlated
publisher-returns: true
logging:
level:
com.tianju.mq: debug
生产者到正常队列
package com.tianju.mq.service;
public interface IUserService {
/**
* 延迟队列的生产者
* @param msg 发送的信息
* @param delayTime 延迟的时间,毫秒
*/
void sendDelay(String msg,int delayTime);
}
package com.tianju.mq.service.impl;
import com.tianju.mq.constants.RabbitMqConstants;
import com.tianju.mq.service.IUserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
@Service
@Slf4j
public class UserServiceImpl implements IUserService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void sendDelay(String msg, int delayTime) {
rabbitTemplate.convertAndSend(
RabbitMqConstants.MQ_NORMAL_EXCHANGE,
RabbitMqConstants.MQ_NORMAL_ROUTING_KEY,
msg,
process->{
process.getMessageProperties().setExpiration(String.valueOf(delayTime));
return process;
}
);
log.debug("[生产者:]发送消息:{},时间{},延迟{}秒",msg,new Date(),delayTime/1000);
}
}
消费者进行延迟消费
package com.tianju.mq.consumer;
import com.tianju.mq.constants.RabbitMqConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.util.Date;
@Service
@Slf4j
public class UserConsumer {
@RabbitListener(queues = RabbitMqConstants.MQ_DELAY_QUEUE)
public void delayConsume(String msg){
log.debug("[消费者消费信息:{},时间:{}",msg,new Date());
}
}
延迟队列插件安装
访问官网
Community Plugins — RabbitMQ
进入rabbitmq docker容器
[root@localhost ~]# docker exec -it rabbitmq bash
查询插件列表是否存在延迟插件
root@6d2342d51b11:/plugins# rabbitmq-plugins list
Listing plugins with pattern ".*" ...
Configured: E = explicitly enabled; e = implicitly enabled
| Status: * = running on rabbit@6d2342d51b11
|/
[ ] rabbitmq_amqp1_0 3.9.11
[ ] rabbitmq_auth_backend_cache 3.9.11
[ ] rabbitmq_auth_backend_http 3.9.11
[ ] rabbitmq_auth_backend_ldap 3.9.11
[ ] rabbitmq_auth_backend_oauth2 3.9.11
[ ] rabbitmq_auth_mechanism_ssl 3.9.11
[ ] rabbitmq_consistent_hash_exchange 3.9.11
[ ] rabbitmq_event_exchange 3.9.11
[ ] rabbitmq_federation 3.9.11
[ ] rabbitmq_federation_management 3.9.11
[ ] rabbitmq_jms_topic_exchange 3.9.11
[E*] rabbitmq_management 3.9.11
[e*] rabbitmq_management_agent 3.9.11
[ ] rabbitmq_mqtt 3.9.11
[ ] rabbitmq_peer_discovery_aws 3.9.11
[ ] rabbitmq_peer_discovery_common 3.9.11
[ ] rabbitmq_peer_discovery_consul 3.9.11
[ ] rabbitmq_peer_discovery_etcd 3.9.11
[ ] rabbitmq_peer_discovery_k8s 3.9.11
[E*] rabbitmq_prometheus 3.9.11
[ ] rabbitmq_random_exchange 3.9.11
[ ] rabbitmq_recent_history_exchange 3.9.11
[ ] rabbitmq_sharding 3.9.11
[ ] rabbitmq_shovel 3.9.11
[ ] rabbitmq_shovel_management 3.9.11
[ ] rabbitmq_stomp 3.9.11
[ ] rabbitmq_stream 3.9.11
[ ] rabbitmq_stream_management 3.9.11
[ ] rabbitmq_top 3.9.11
[ ] rabbitmq_tracing 3.9.11
[ ] rabbitmq_trust_store 3.9.11
[e*] rabbitmq_web_dispatch 3.9.11
[ ] rabbitmq_web_mqtt 3.9.11
[ ] rabbitmq_web_mqtt_examples 3.9.11
[ ] rabbitmq_web_stomp 3.9.11
[ ] rabbitmq_web_stomp_examples 3.9.11
下载支持3.9.x的插件
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases?after=rabbitmq_v3_6_12
退出容器:
root@6d2342d51b11:/plugins# exit
exit
上传到linux服务器
在/usr/local/software/下创建文件夹rabbitmq/plugins
[root@localhost software]# mkdir -p rabbitmq/plugins
拷贝插件到容器中
[root@localhost plugins]# docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins
进入容器安装插件
[root@localhost plugins]# docker exec -it rabbitmq bash
root@6d2342d51b11:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
打开管理页面
进入Exchange页面,下拉Type看是否已经安装成功。
文章来源:https://www.toymoban.com/news/detail-686853.html
总结
1.rabbitmq队列方式的梳理,点对点,一对多;
2.发布订阅模式,交换机到消费者,以邮箱和手机验证码为例;
3.topic模式,根据规则决定发送给哪个队列;
4.rabbitmq回调确认,setConfirmCallback和setReturnsCallback;
5.死信队列,延迟队列,创建方法,正常—死信,设置延迟时间;
到了这里,关于RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!