RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计

这篇具有很好参考价值的文章主要介绍了RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

引出


1.rabbitmq队列方式的梳理,点对点,一对多;
2.发布订阅模式,交换机到消费者,以邮箱和手机验证码为例;
3.topic模式,根据规则决定发送给哪个队列;
4.rabbitmq回调确认,setConfirmCallback和setReturnsCallback;
5.死信队列,延迟队列,创建方法,正常—死信,设置延迟时间;文章来源地址https://www.toymoban.com/news/detail-686853.html

点对点(simple)

点对对方式传输

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,SpringBoot,rabbitmq,学习,ruby,spring boot

Work queues 一对多

1个生产者多个消费者

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,SpringBoot,rabbitmq,学习,ruby,spring boot

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,SpringBoot,rabbitmq,学习,ruby,spring boot

发布订阅/fanout模式

生产者通过fanout扇出交换机群发消息给消费者,同一条消息每一个消费者都可以收到。

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,SpringBoot,rabbitmq,学习,ruby,spring boot

以登陆验证码为例

pom文件导包

<!--        qq邮箱-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-mail</artifactId>
        </dependency>

        <!--        阿里云短信验证码相关包-->
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>4.5.3</version>
        </dependency>

<!--        queue的包-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

application.yml文件

server:
  port: 9099

spring:
  # 模块的名字
  application:
    name: user-auth


  # 邮箱的配置
  mail:
    host: smtp.qq.com
    port: 587
    username: xxxx
    password: xxxxx

  # rabbitmq的配置
  rabbitmq:
    host: 192.168.111.130
    port: 5672
    username: admin
    password: 123

logging:
  level:
    com.tianju.auth: debug

rabbitmq的配置

需要用到的常量

package com.tianju.auth.util;

/**
 * rabbitmq的常量
 */
public interface RabbitMqConstants {
    String MQ_MAIL_QUEUE="mq_email_queue";
    String MQ_PHONE_QUEUE="mq_phone_queue";
    String MQ_FANOUT_EXCHANGE="mq_fanout_exchange";

    // 参数 String name, boolean durable, boolean exclusive, boolean autoDelete
    boolean durable = true;
    boolean exclusive = false;
    boolean autoDelete = false;

}

RabbitMqConfig.java配置

邮箱队列,电话队列,交换机;

邮箱绑定交换机,电话绑定交换机;

创建队列参数说明:

参数 说明
name 字符串值,queue的名称。
durable 布尔值,表示该 queue 是否持久化。 持久化意味着当 RabbitMQ 重启后,该 queue 是否会恢复/仍存在。 另外,需要注意的是,queue 的持久化不等于其中的消息也会被持久化。
exclusive 布尔值,表示该 queue 是否排它式使用。排它式使用意味着仅声明他的连接可见/可用,其它连接不可见/不可用。
autoDelete 布尔值,表示当该 queue 没“人”(connection)用时,是否会被自动删除。

不指定 durable、exclusive 和 autoDelete 时,默认为 truefalsefalse 。表示持久化、非排它、不用自动删除。

创建交换机参数说明

参数 说明
name 字符串值,exchange 的名称。
durable 布尔值,表示该 exchage 是否持久化。 持久化意味着当 RabbitMQ 重启后,该 exchange 是否会恢复/仍存在。
autoDelete 布尔值,表示当该 exchange 没“人”(queue)用时,是否会被自动删除。

不指定 durable 和 autoDelete 时,默认为 truefalse 。表示持久化、不用自动删除

package com.tianju.auth.config;

import com.tianju.auth.util.RabbitMqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {

    @Bean // 邮箱的队列
    public Queue mailQueue(){
        return new Queue(RabbitMqConstants.MQ_MAIL_QUEUE,
                RabbitMqConstants.durable,
                RabbitMqConstants.exclusive,
                RabbitMqConstants.autoDelete);
    }

    @Bean // 电话的队列
    public Queue phoneQueue(){
        return new Queue(RabbitMqConstants.MQ_PHONE_QUEUE,
                RabbitMqConstants.durable,
                RabbitMqConstants.exclusive,
                RabbitMqConstants.autoDelete);
    }
    @Bean // 交换机
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(RabbitMqConstants.MQ_FANOUT_EXCHANGE,
                RabbitMqConstants.durable,
                RabbitMqConstants.autoDelete);
    }

    @Bean
    public Binding mailBinding(){
        return BindingBuilder.bind(mailQueue())
                .to(fanoutExchange());
    }

    @Bean
    public Binding phoneBinding(){
        return BindingBuilder.bind(phoneQueue())
                .to(fanoutExchange());
    }

}

生产者生成验证码,发送给交换机

接口

package com.tianju.auth.service;

public interface IUserService {

    /**
     * 生产者生成信息发送给交换机
     * @param msg 信息,这里是验证码
     */
    void sendCode(String msg);
}

实现

package com.tianju.auth.service.impl;


import com.tianju.auth.service.IUserService;
import com.tianju.auth.util.RabbitMqConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

@Service
@Slf4j
public class UserServiceImpl implements IUserService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendCode(String msg) {
        rabbitTemplate.convertAndSend(
                RabbitMqConstants.MQ_FANOUT_EXCHANGE,
                "routingkey.fanout",
                msg);
        log.debug("[生产者向交换机:] 发送一条信息:{}",msg);
    }

}

测试类生成验证码,发给交换机

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,SpringBoot,rabbitmq,学习,ruby,spring boot

package com.tianju.auth.service.impl;

import cn.hutool.core.lang.Snowflake;
import com.tianju.auth.service.IUserService;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;



@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)


public class UserServiceImplTest {


    @Autowired
    private IUserService userService;

    @Test
    public void sendCode() {
        String code = new Snowflake().nextIdStr().substring(0, 6);
        System.out.println(code);
        userService.sendCode(code);
    }
}

消费者消费验证码

package com.tianju.auth.consumer;


import com.tianju.auth.service.IEmailService;
import com.tianju.auth.util.RabbitMqConstants;
import com.tianju.auth.util.SMSUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class UserConsumer {
    @Autowired
    private IEmailService emailService;

    @RabbitListener(queues = RabbitMqConstants.MQ_MAIL_QUEUE)
    public void emailConsumer(String msg){
        log.debug("[email消费者:]消费{}",msg);
        emailService.sendEmail("xxxx@qq.com", "登陆验证码", msg);
    }

    @RabbitListener(queues = RabbitMqConstants.MQ_PHONE_QUEUE)
    public void phoneConsumer(String msg){
        log.debug("[phone消费者:]消费{}",msg);
        SMSUtil.send("xxxx", msg);
    }

}

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,SpringBoot,rabbitmq,学习,ruby,spring boot

topic模式

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,SpringBoot,rabbitmq,学习,ruby,spring boot

例如: routingkey: my.orange.rabbit —-> Q1,Q2

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,SpringBoot,rabbitmq,学习,ruby,spring boot

配置类增加配置

package com.tianju.auth.util;

/**
 * rabbitmq的常量
 */
public interface RabbitMqConstants {
    String MQ_MAIL_QUEUE="mq_email_queue";
    String MQ_PHONE_QUEUE="mq_phone_queue";
    String MQ_FANOUT_EXCHANGE="mq_fanout_exchange";
    
    String MQ_TOPIC_EXCHANGE="mq_topic_exchange";

    String MQ_TOPIC_QUEUE_A = "mq_topic_queue_a";
    String MQ_TOPIC_QUEUE_B = "mq_topic_queue_b";

    // 参数 String name, boolean durable, boolean exclusive, boolean autoDelete
    boolean durable = true;
    boolean exclusive = false;
    boolean autoDelete = false;

}

package com.tianju.auth.config;

import com.tianju.auth.util.RabbitMqConstants;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {

    @Bean // 邮箱的队列
    public Queue mailQueue(){
        return new Queue(RabbitMqConstants.MQ_MAIL_QUEUE,
                RabbitMqConstants.durable,
                RabbitMqConstants.exclusive,
                RabbitMqConstants.autoDelete);
    }

    @Bean // 电话的队列
    public Queue phoneQueue(){
        return new Queue(RabbitMqConstants.MQ_PHONE_QUEUE,
                RabbitMqConstants.durable,
                RabbitMqConstants.exclusive,
                RabbitMqConstants.autoDelete);
    }
    @Bean // 交换机
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(RabbitMqConstants.MQ_FANOUT_EXCHANGE,
                RabbitMqConstants.durable,
                RabbitMqConstants.autoDelete);
    }

    @Bean
    public Binding mailBinding(){
        return BindingBuilder.bind(mailQueue())
                .to(fanoutExchange());
    }

    @Bean
    public Binding phoneBinding(){
        return BindingBuilder.bind(phoneQueue())
                .to(fanoutExchange());
    }



    @Bean // A队列
    public Queue topicAQueue(){
        return new Queue(RabbitMqConstants.MQ_TOPIC_QUEUE_A,
                RabbitMqConstants.durable,
                RabbitMqConstants.exclusive,
                RabbitMqConstants.autoDelete);
    }

    /**
     * topic模式相关配置
     */

    @Bean // B队列
    public Queue topicBQueue(){
        return new Queue(RabbitMqConstants.MQ_TOPIC_QUEUE_B,
                RabbitMqConstants.durable,
                RabbitMqConstants.exclusive,
                RabbitMqConstants.autoDelete);
    }

    @Bean // topic的交换机
    public TopicExchange topicMyExchange(){
        return new TopicExchange(RabbitMqConstants.MQ_TOPIC_EXCHANGE,
                RabbitMqConstants.durable,
                RabbitMqConstants.autoDelete);
    }


    @Bean
    public Binding topicAQueueBinding(){
        return BindingBuilder
                .bind(topicAQueue())
                .to(topicMyExchange())
                .with("topic.xxx"); // 规则 topic.xxx
    }

    @Bean
    public Binding topicBQueueBinding(){
        return BindingBuilder
                .bind(topicBQueue())
                .to(topicMyExchange())
                .with("topic.*"); // 规则 topic.xxx
    }

}

生产者发送信息

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,SpringBoot,rabbitmq,学习,ruby,spring boot

    /**
     * topic模式下,生产者发送信息给交换机,可以决定给哪个队列发信息
     * @param msg 发送的信息
     * @param routingKey 类似正则表达式,决定给谁发
     *                   .with("topic.xxx"); // 规则 topic.xxx ---- A队列
     *                   .with("topic.*"); // 规则 topic.xxx   ---- B队列
     *                   在配置类中,如上所述配置,则如果输入的routingKey为 topic.xxx则给A和B发;
     *                                      如果输入的routingKey为 topic.yyy 则 只给B队列发;
     */
    void sendMsg(String msg,String routingKey);

实现

package com.tianju.auth.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.tianju.auth.entity.UserPrivs;
import com.tianju.auth.mapper.UserMapper;
import com.tianju.auth.service.IUserService;
import com.tianju.auth.util.RabbitMqConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

@Service
@Slf4j
public class UserServiceImpl implements IUserService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendCode(String msg) {
        rabbitTemplate.convertAndSend(
                RabbitMqConstants.MQ_FANOUT_EXCHANGE,
                "routingkey.fanout",
                msg);
        log.debug("[生产者向交换机:] 发送一条信息:{}",msg);
    }

    @Override
    public void sendMsg(String msg,String routingKey) {
        rabbitTemplate.convertAndSend(
                RabbitMqConstants.MQ_TOPIC_EXCHANGE,
                routingKey, // "topic.yyy",此时只有B队列有信息
                msg);
        log.debug("[生产者向交换机:] 发送一条信息:{}",msg);
    }


}

进行发送

package com.tianju.auth.service.impl;

import cn.hutool.core.lang.Snowflake;
import com.tianju.auth.service.IUserService;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;



@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)


public class UserServiceImplTest {

    @Autowired
    private IUserService userService;

    @Test
    public void sendCode() {
        String code = new Snowflake().nextIdStr().substring(0, 6);
        System.out.println(code);
        userService.sendCode(code);
    }

    @Test
    public void sendTopic() {
        String code = new Snowflake().nextIdStr().substring(0, 6);
        System.out.println(code);
        userService.sendMsg(code,"topic.yyy");
    }
}

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,SpringBoot,rabbitmq,学习,ruby,spring boot

控制台查看

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,SpringBoot,rabbitmq,学习,ruby,spring boot

rabbitmq回调确认

配置类

spring:
  # rabbitmq的配置
  rabbitmq:
    host: 192.168.111.130
    port: 5672
    username: admin
    password: 123
    # 确认收到
    publisher-confirm-type: correlated
    publisher-returns: true

验证生产者发送是否成功

使用RabbitTemplate的回调方法。

先设置

  • setConfirmCallback
  • setReturnsCallback

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,SpringBoot,rabbitmq,学习,ruby,spring boot

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendCode(String msg) {
        rabbitTemplate.convertAndSend(
                RabbitMqConstants.MQ_FANOUT_EXCHANGE,
                "routingkey.fanout",
                msg);
        log.debug("[生产者向交换机:] 发送一条信息:{}",msg);
    }

    @Override
    public void sendMsg(String msg,String routingKey) {

        // 如果发到交换机,看一下有没有反馈
        rabbitTemplate.setConfirmCallback((c,ack,message)->{
            log.debug("***** setConfirmCallback:ack--{}", ack); // 是否发送到交换机
            log.debug("***** setConfirmCallback:c-->{}",c);
            // channel error; protocol method: #method<channel.close>(reply-code=404,
            // reply-text=NOT_FOUND - no exchange 'aaaa' in vhost '/', class-id=60, method-id=40)
            log.debug("***** setConfirmCallback:m-->{}",message);
            if (ack){
                log.debug("[生产者:] 发送信息到交换机{}","RabbitMqConstants.MQ_TOPIC_EXCHANGE");
            }else {
                log.debug(message);
            }
        });

        rabbitTemplate.setReturnsCallback(r->{
            log.debug("返回文字{}", r.getReplyText());
            log.debug("返回code{}", r.getReplyCode());
            log.debug("返回Exchange{}", r.getExchange());
            log.debug("返回RoutingKey{}", r.getRoutingKey());
        });


        rabbitTemplate.convertAndSend(
                RabbitMqConstants.MQ_TOPIC_EXCHANGE,
//                "aaaa",// 失败的情况
                routingKey, // "topic.yyy",此时只有B队列有信息
                msg);

        log.debug("[生产者向交换机:] 发送一条信息:{}",msg);
    }

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,SpringBoot,rabbitmq,学习,ruby,spring boot

rabbitTemplate.setConfirmCallback((c,ack,message)->{
    log.debug("******* setConfirmCallback:ack->{}",ack);
    log.debug("******* setConfirmCallback:c->{}",c);
    log.debug("******* setConfirmCallback:chanel->{}",message);
    if(ack){
        log.debug("[生产者]发送信息到达交换机{}","RabbitMqConstants.MQ_TOPIC_EXCHANGE");
    }else {
        log.debug(message);
    }
});
rabbitTemplate.setReturnsCallback(r->{
    log.debug("返回文字:{}",r.getReplyText());
    log.debug("返回code:{}",r.getReplyCode());
    log.debug("返回Exchange:{}",r.getExchange());
    log.debug("返回RoutingKey:{}",r.getRoutingKey());
});
rabbitTemplate.convertAndSend(
        RabbitMqConstants.MQ_TOPIC_EXCHANGE,
        "abc.xxx",
        msg
);
    @Test
    public void sendTopic() {
        String code = new Snowflake().nextIdStr().substring(0, 6);
        System.out.println(code);
        userService.sendMsg(code,"topic.rrr");
    }

延迟队列(死信)设计

Documentation: Table of Contents — RabbitMQ

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,SpringBoot,rabbitmq,学习,ruby,spring boot

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,SpringBoot,rabbitmq,学习,ruby,spring boot

java代码步骤

创建正常+死信队列

package com.tianju.mq.config;

import com.tianju.mq.constants.RabbitMqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMqConfig {

    @Bean
    public DirectExchange normalExchange(){
        return new DirectExchange(RabbitMqConstants.MQ_NORMAL_EXCHANGE,
                RabbitMqConstants.durable,
                RabbitMqConstants.autoDelete);
    }
    @Bean
    public Queue normalQueue(){
        Map<String, Object> map = new HashMap<>(2);
        map.put("x-dead-letter-exchange",RabbitMqConstants.MQ_DELAY_EXCHANGE);
        map.put("x-dead-letter-routing-key",RabbitMqConstants.MQ_DELAY_ROUTING_KEY);
        return new Queue(
                RabbitMqConstants.MQ_NORMAL_QUEUE,
                RabbitMqConstants.durable,
                RabbitMqConstants.exclusive,
                RabbitMqConstants.autoDelete,
                map);
    }
    @Bean
    public Binding normalBinding(){
        return BindingBuilder.bind(normalQueue())
                .to(normalExchange())
                .with(RabbitMqConstants.MQ_NORMAL_ROUTING_KEY);
    }


    //------------------死信队列设计--------------------------
    /**
     * 死信(延迟)队列
     * @return
     */
    @Bean
    public Queue delayQueue(){
        return new Queue(RabbitMqConstants.MQ_DELAY_QUEUE,
                RabbitMqConstants.durable,
                RabbitMqConstants.exclusive,
                RabbitMqConstants.autoDelete);
    }
    /**
     * 死信交换机
     * @return
     */
    @Bean
    public DirectExchange delayExchange(){
        return new DirectExchange(RabbitMqConstants.MQ_DELAY_EXCHANGE,
                RabbitMqConstants.durable,
                RabbitMqConstants.autoDelete);
    }
    /**
     * 死信交换机队列绑定
     * @return
     */
    @Bean
    public Binding delayBinding(){
        return BindingBuilder.bind(delayQueue())
                .to(delayExchange())
                .with(RabbitMqConstants.MQ_DELAY_ROUTING_KEY);
    }
}

配置类+常量

package com.tianju.mq.constants;

public interface RabbitMqConstants {
    String MQ_DELAY_QUEUE = "mq_delay_queue"; // 延迟队列,死信队列
    String MQ_DELAY_EXCHANGE = "mq_delay_exchange"; // 死信交换机
    String MQ_DELAY_ROUTING_KEY = "mq_delay_routing_key"; // 死信路由

    // 正常的队列,交换机,路由
    String MQ_NORMAL_QUEUE = "mq_normal_queue";
    String MQ_NORMAL_EXCHANGE = "mq_normal_exchange";
    String MQ_NORMAL_ROUTING_KEY = "mq_normal_routing_key";

    // 参数
    boolean durable = true;
    boolean exclusive = false;
    boolean autoDelete = false;
}

server:
  port: 9099

spring:

  # 邮箱的配置
  mail:
    host: smtp.qq.com
    port: 587
    username: xxxxx.com
    password: xxxxx

  # rabbitmq的配置
  rabbitmq:
    host: 192.168.111.130
    port: 5672
    username: admin
    password: 123
    # 确认收到
    publisher-confirm-type: correlated
    publisher-returns: true


logging:
  level:
    com.tianju.mq: debug

生产者到正常队列

package com.tianju.mq.service;

public interface IUserService {
    /**
     * 延迟队列的生产者
     * @param msg 发送的信息
     * @param delayTime 延迟的时间,毫秒
     */
    void sendDelay(String msg,int delayTime);
}

package com.tianju.mq.service.impl;

import com.tianju.mq.constants.RabbitMqConstants;
import com.tianju.mq.service.IUserService;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;


import java.util.Date;

@Service
@Slf4j
public class UserServiceImpl implements IUserService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendDelay(String msg, int delayTime) {
        rabbitTemplate.convertAndSend(
                RabbitMqConstants.MQ_NORMAL_EXCHANGE,
                RabbitMqConstants.MQ_NORMAL_ROUTING_KEY,
                msg,
                process->{
                    process.getMessageProperties().setExpiration(String.valueOf(delayTime));
                    return process;
                }
        );
        log.debug("[生产者:]发送消息:{},时间{},延迟{}秒",msg,new Date(),delayTime/1000);
    }
}

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,SpringBoot,rabbitmq,学习,ruby,spring boot

消费者进行延迟消费

package com.tianju.mq.consumer;

import com.tianju.mq.constants.RabbitMqConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
@Slf4j
public class UserConsumer {

    @RabbitListener(queues = RabbitMqConstants.MQ_DELAY_QUEUE)
    public void delayConsume(String msg){
        log.debug("[消费者消费信息:{},时间:{}",msg,new Date());
    }
}

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,SpringBoot,rabbitmq,学习,ruby,spring boot

延迟队列插件安装

访问官网

Community Plugins — RabbitMQ

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,SpringBoot,rabbitmq,学习,ruby,spring boot

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,SpringBoot,rabbitmq,学习,ruby,spring boot

进入rabbitmq docker容器

[root@localhost ~]# docker exec -it rabbitmq bash

查询插件列表是否存在延迟插件

root@6d2342d51b11:/plugins# rabbitmq-plugins list
Listing plugins with pattern ".*" ...
 Configured: E = explicitly enabled; e = implicitly enabled
 | Status: * = running on rabbit@6d2342d51b11
 |/
[  ] rabbitmq_amqp1_0                  3.9.11
[  ] rabbitmq_auth_backend_cache       3.9.11
[  ] rabbitmq_auth_backend_http        3.9.11
[  ] rabbitmq_auth_backend_ldap        3.9.11
[  ] rabbitmq_auth_backend_oauth2      3.9.11
[  ] rabbitmq_auth_mechanism_ssl       3.9.11
[  ] rabbitmq_consistent_hash_exchange 3.9.11
[  ] rabbitmq_event_exchange           3.9.11
[  ] rabbitmq_federation               3.9.11
[  ] rabbitmq_federation_management    3.9.11
[  ] rabbitmq_jms_topic_exchange       3.9.11
[E*] rabbitmq_management               3.9.11
[e*] rabbitmq_management_agent         3.9.11
[  ] rabbitmq_mqtt                     3.9.11
[  ] rabbitmq_peer_discovery_aws       3.9.11
[  ] rabbitmq_peer_discovery_common    3.9.11
[  ] rabbitmq_peer_discovery_consul    3.9.11
[  ] rabbitmq_peer_discovery_etcd      3.9.11
[  ] rabbitmq_peer_discovery_k8s       3.9.11
[E*] rabbitmq_prometheus               3.9.11
[  ] rabbitmq_random_exchange          3.9.11
[  ] rabbitmq_recent_history_exchange  3.9.11
[  ] rabbitmq_sharding                 3.9.11
[  ] rabbitmq_shovel                   3.9.11
[  ] rabbitmq_shovel_management        3.9.11
[  ] rabbitmq_stomp                    3.9.11
[  ] rabbitmq_stream                   3.9.11
[  ] rabbitmq_stream_management        3.9.11
[  ] rabbitmq_top                      3.9.11
[  ] rabbitmq_tracing                  3.9.11
[  ] rabbitmq_trust_store              3.9.11
[e*] rabbitmq_web_dispatch             3.9.11
[  ] rabbitmq_web_mqtt                 3.9.11
[  ] rabbitmq_web_mqtt_examples        3.9.11
[  ] rabbitmq_web_stomp                3.9.11
[  ] rabbitmq_web_stomp_examples       3.9.11

下载支持3.9.x的插件

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases?after=rabbitmq_v3_6_12

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,SpringBoot,rabbitmq,学习,ruby,spring boot

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,SpringBoot,rabbitmq,学习,ruby,spring boot

退出容器:

root@6d2342d51b11:/plugins# exit
exit

上传到linux服务器

在/usr/local/software/下创建文件夹rabbitmq/plugins

[root@localhost software]# mkdir -p rabbitmq/plugins

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,SpringBoot,rabbitmq,学习,ruby,spring boot

拷贝插件到容器中

[root@localhost plugins]# docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins

进入容器安装插件

[root@localhost plugins]# docker  exec -it rabbitmq bash
root@6d2342d51b11:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,SpringBoot,rabbitmq,学习,ruby,spring boot

打开管理页面

进入Exchange页面,下拉Type看是否已经安装成功。

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计,SpringBoot,rabbitmq,学习,ruby,spring boot


总结

1.rabbitmq队列方式的梳理,点对点,一对多;
2.发布订阅模式,交换机到消费者,以邮箱和手机验证码为例;
3.topic模式,根据规则决定发送给哪个队列;
4.rabbitmq回调确认,setConfirmCallback和setReturnsCallback;
5.死信队列,延迟队列,创建方法,正常—死信,设置延迟时间;

到了这里,关于RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ详解(三):消息模式(fanout、direct、topic、work)

    参考官网:https://www.rabbitmq.com/getstarted.html 简单模式 Simple, 参考RabbitMQ详解(二):消息模式 Simple(简单)模式 简单模式是最简单的消息模式,它包含一个生产者、一个消费者和一个队列。生产者向队列里发送消息,消费者从队列中获取消息并消费。 发布订阅模式 fanout 同时向

    2024年02月10日
    浏览(47)
  • 【RabbitMQ】golang客户端教程3——发布订阅(使用fanout交换器)

    在上一个教程中,我们创建了一个工作队列。工作队列背后的假设是每个任务只传递给一个工人。在这一部分中,我们将做一些完全不同的事情——我们将向多个消费者传递一个消息。这就是所谓的 “订阅/发布模式” 。 为了说明这种模式,我们将构建一个简单的日志系统。

    2024年02月14日
    浏览(41)
  • 利用消息中间件RabbitMQ创建队列以及扇出(Fanout)、订阅(Direct)、主题(Topic)交换机来完成消息的发送和监听接收(完整版)

    目录 一、前期项目环境准备 1.1父项目以及子项目 1.2配置pom.xml 1.3配置application.yml 二、扇出(Fanout) 交换机实现消息的发送和接收 2.1编写子项目consumer(消费者,接收消息)的代码实现扇出(Fanout)交换机接收消息 2.1.1consumer子项目结构 2.1.2FanoutConfig类的实现扇出(Fanout)交

    2024年02月05日
    浏览(61)
  • RabbitMQ---订阅模型-Fanout

    Fanout,也称为广播。 流程图: 在广播模式下,消息发送流程是这样的: 1) 可以有多个消费者 2) 每个消费者有自己的queue(队列) 3) 每个队列都要绑定到Exchange(交换机) 4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。 5)

    2024年02月11日
    浏览(49)
  • 【RabbitMQ四】——RabbitMQ发布订阅模式(Publish/Subscribe)

    通过本篇博客能够简单使用RabbitMQ的发布订阅模式。 本篇博客主要是博主通过官网以及学习他人的博客总结出的RabbitMQ发布订阅模式。其中如果有误欢迎大家及时指正。 发布订阅模式的核心是生产者生产的消息,其他消费者都可以收到该生产者生产的消息。 由于发布订阅模式

    2024年02月02日
    浏览(34)
  • RabbitMQ发布与订阅模式类型

    🍁博客主页:👉不会压弯的小飞侠 ✨欢迎关注:👉点赞👍收藏⭐留言✒ ✨系列专栏:👉Linux专栏 🔥欢迎大佬指正,一起学习!一起加油! 工作队列背后的假设是每个任务都是 只交付给一名工人。在这一部分中,我们将做一些事情 完全不同的 - 我们将向多个传递消息 消

    2024年02月02日
    浏览(42)
  • springboot rabbitmq 发布订阅 广播模式

    根据amqp协议、rabbitmq入门、springboot集成rabbitmq 可知,rabbitmq的广播模式关键是使用fanout类型的exchange,fanout exchange会忽略message中的routing-key、queue中的binding-key,发给绑定exchange的全部queue。 实现发布订阅(广播模式)的关键在于对exchange类型的理解,可参考amqp协议、rabbitmq入门

    2024年02月02日
    浏览(39)
  • RabbitMQ入门案例之发布订阅模式

    本文章主要介绍RabbitMQ的发布订阅模式,该模式下,消息为广播形式,一经发布则会进入交换机绑定的队列中,详细介绍可以阅读官方文档。 官网文档地址:https://rabbitmq.com/getstarted.html RabbitMQ中的发布与订阅模式是一种消息传递的方式,用于在分布式系统中传递消息。 在该模

    2024年02月09日
    浏览(56)
  • RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析

    RabbitMQ作为一款常用的消息中间件,在微服务项目中得到大量应用,其本身是微服务中的重点和难点,有不少概念我自己的也是一知半解,本系列博客尝试结合实际应用场景阐述RabbitMQ的应用,分析其为什么使用,并给出怎么用的案例。 本篇博客结合场景来阐述RabbitMQ的几种模

    2024年02月07日
    浏览(44)
  • RabbitMQ的Publish/Subscribe发布订阅模式详解

    各位小伙伴很久不见了,今儿又要给大家分享干货了。我们知道RabbitMQ有简单模式、工作队列模式、发布订阅模式、路由模式、主题模式、远程过程调用模式、发布者确认模式等。这么多模式,你可能一下子很难全部吸收,今天袁老师主要给大家介绍发布订阅模式Publish/Subsc

    2024年02月10日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包