消息队列中间件,RabbitMQ的使用,死信队列,延迟队列,利用枚举实现队列,交换机,RountKey的声明

这篇具有很好参考价值的文章主要介绍了消息队列中间件,RabbitMQ的使用,死信队列,延迟队列,利用枚举实现队列,交换机,RountKey的声明。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

0.交换机种类和区别

1.声明队列和交换机以及RountKey

2.初始化循环绑定

3.声明交换机

4.监听队列

4.1 监听普通队列

4.2监听死信队列

 5.削峰填谷的实现


0.交换机种类和区别

  1. Direct Exchange(直连交换机)

    • 直连交换机将消息发送到与消息的路由键完全匹配的队列。它是最简单的交换机类型之一。
    • 当一个队列使用某个直连交换机绑定时,它需要指定一个绑定键(binding key),当消息的路由键与该绑定键完全匹配时,消息会被发送到该队列。
  2. Fanout Exchange(扇出交换机)

    • 扇出交换机会将消息发送到与其绑定的所有队列,忽略消息的路由键。
    • 当一个队列使用扇出交换机绑定时,它会接收到交换机发送的所有消息,无论消息的路由键是什么。
  3. Topic Exchange(主题交换机)

    • 主题交换机根据消息的路由键和绑定键之间的模式匹配来路由消息。
    • 绑定键可以使用通配符进行匹配,支持 '*' 匹配一个单词,'#' 匹配零个或多个单词,从而允许更灵活的路由规则。
  4. Headers Exchange(标头交换机)

    • 标头交换机根据消息的标头(headers)中的键值对来路由消息,而不是使用路由键。
    • 在将队列绑定到标头交换机时,可以指定一组标头键值对,只有当消息的标头中包含与绑定相匹配的所有键值对时,消息才会被路由到该队列。

如果满足key的前提下,绑定同一个交换机的队列都会分配到相同数量的信息

比如此时交换机有20条信息,a,b队列都会分配到20条信息

默认情况下,会轮询分配给消费者,也可以设置最多获取多少条未被消费的信息,根据消费者的消费能力来设置

1.声明队列和交换机以及RountKey

package com.example.config;


import lombok.Getter;

@Getter
public enum RabbitmqBind {


    DATA_CLEAN_PROCESS_DEAD(
            RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
            RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS_DEAD,
            RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS_DEAD,
            false,
            false,
            null,
            null
    ),

    DATA_CLEAN_PROCESS(
            RabbitMqExchangeEnum.E_DIRECT_RCP,
            RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS,
            RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS,
            true,
            true,
            RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
            RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS_DEAD),

    SMS_CLEAN_DEAD(
            RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
            RabbitMqQueueConstants.Q_API_TO_DCN_SMS_DEAD,
            RabbitmqRoutingKey.K_DATA_CLEAN_FINISH_DEAD,
            true,
            false,
            null,
            null
    ),

    SMS_CLEAN(
            RabbitMqExchangeEnum.E_TOPIC_RCP,
            RabbitMqQueueConstants.Q_API_TO_DCN_SMS,
            RabbitmqRoutingKey.K_API_TO_DCN_SMS,
            true,
            true,
            RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
            RabbitmqRoutingKey.K_DATA_CLEAN_FINISH_DEAD
    ),


    ;

    /**
     * 交换机
     */
    private RabbitMqExchangeEnum exchange;

    /**
     * 队列名称
     */
    private String queueName;

    /**
     * 路由Key
     */
    private RabbitmqRoutingKey routingKey;

    /**
     * 绑定标识
     * 是否启用
     */
    private Boolean isBind;

    /**
     * 是否绑定死信
     */
    private Boolean isDeathBelief;

    /**
     * 绑定的死信交换机
     */
    private RabbitMqExchangeEnum boundDeadExchange;

    /**
     * 死信key
     */
    private RabbitmqRoutingKey deadRoutingKey;


    RabbitmqBind(RabbitMqExchangeEnum exchange, String queueName, RabbitmqRoutingKey routingKey, Boolean isBind,
                 Boolean isDeathBelief, RabbitMqExchangeEnum boundDeadExchange, RabbitmqRoutingKey deadRoutingKey
    ) {
        this.exchange = exchange;
        this.queueName = queueName;
        this.routingKey = routingKey;
        this.isBind = isBind;
        this.isDeathBelief = isDeathBelief;
        this.boundDeadExchange = boundDeadExchange;
        this.deadRoutingKey = deadRoutingKey;
    }

    /**
     * 交换机
     */
    @Getter
    public enum RabbitMqExchangeEnum {

        /**
         * 交换机定义,类型 - 名称
         */
        E_DIRECT_RCP("direct", "E_DIRECT_RCP"),
        DEAD_E_DIRECT_RCP("direct", "DEAD_E_DIRECT_RCP"),

        E_TOPIC_RCP("topic", "E_TOPIC_RCP"),

        E_TOPIC_PAY("topic", "E_TOPIC_PAY");

        private String exchangeType;

        private String exchangeName;

        RabbitMqExchangeEnum(String exchangeType, String exchangeName) {
            this.exchangeType = exchangeType;
            this.exchangeName = exchangeName;
        }
    }

    /**
     * 队列名定义
     */
    public interface RabbitMqQueueConstants {

        /**
         * 接收清洗数据
         */
        String Q_DATA_CLEAN_PROCESS = "RMPS_TO_RCP_DATA_CLEAN_PROCESS";

        /**
         * 清洗结束通知
         */
        String Q_API_TO_DCN_SMS = "Q_API_TO_DCN_SMS";

        /**
         * 死信队列
         */
        String Q_DATA_CLEAN_PROCESS_DEAD = "Q_DATA_CLEAN_PROCESS_DEAD";

        /**
         * 清洗结束通知死信队列
         */
        String Q_API_TO_DCN_SMS_DEAD = "Q_API_TO_DCN_SMS_DEAD";
    }

    /**
     * routingKey
     */
    @Getter
    public enum RabbitmqRoutingKey {

        /**
         * 路由
         */
        K_DATA_CLEAN_PROCESS("K_DATA_CLEAN_PROCESS"),
        K_API_TO_DCN_SMS("K_API_TO_DCN_SMS"),

        // 路由绑定死信路由
        DEAD("DEAD"),

        //死信路由
        K_DATA_CLEAN_PROCESS_DEAD("K_DATA_CLEAN_PROCESS_DEAD"),
        K_DATA_CLEAN_FINISH_DEAD("K_DATA_CLEAN_FINISH_DEAD"),
        ;

        private String keyName;

        RabbitmqRoutingKey(String keyName) {
            this.keyName = keyName;
        }
    }

}

2.初始化循环绑定

package com.example.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Arrays;

@Configuration
@ConditionalOnClass(EnableRabbit.class)
public class MqConfig {
    @Resource
    protected RabbitTemplate rabbitTemplate;
    @Resource
    ConnectionFactory connectionFactory;
//
//    @Lazy
//    @Autowired
//    protected RabbitAdmin rabbitAdmin;
//
//
//    public static final int DEFAULT_CONCURRENT = 10;
//
//    @Bean("customContainerFactory")
//    public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
//                                                                 ConnectionFactory connectionFactory) {
//        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//        factory.setConcurrentConsumers(DEFAULT_CONCURRENT);
//        factory.setMaxConcurrentConsumers(DEFAULT_CONCURRENT);
//        configurer.configure(factory, connectionFactory);
//        return factory;
//    }
//
//    @Bean
//    @ConditionalOnMissingBean
//    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
//        return new RabbitTransactionManager(connectionFactory);
//    }
//
//    @Bean
//    @ConditionalOnMissingBean
//    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
//        return new RabbitAdmin(connectionFactory);
//    }

    @PostConstruct
    protected void init() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);


        rabbitTemplate.setChannelTransacted(true);
        //创建exchange
        Arrays.stream(RabbitmqBind.RabbitMqExchangeEnum.values())
                .forEach(rabbitMqExchangeEnum -> {
                            Exchange exchange = RabbitmqExchange
                                    .getInstanceByType(rabbitMqExchangeEnum.getExchangeType())
                                    .createExchange(rabbitMqExchangeEnum.getExchangeName());
                            rabbitAdmin.declareExchange(exchange);
                        }
                );

        //创建队列并绑定exchange
        Arrays.stream(RabbitmqBind.values()).forEach(RabbitmqBind -> {
            if (RabbitmqBind.getIsBind()) {
                if (RabbitmqBind.getIsDeathBelief()) {
                    //需要绑定死信交换机的队列
                    rabbitAdmin.declareQueue(QueueBuilder.durable(RabbitmqBind.getQueueName())
                            .ttl(60000).deadLetterExchange(RabbitmqBind.getBoundDeadExchange().getExchangeName())
                            .deadLetterRoutingKey(RabbitmqBind.getDeadRoutingKey().getKeyName()).build());
                    rabbitAdmin.declareBinding(new Binding(RabbitmqBind.getQueueName(),
                            Binding.DestinationType.QUEUE,
                            RabbitmqBind.getExchange().getExchangeName(),
                            RabbitmqBind.getRoutingKey().getKeyName(), null));
                } else {
                    //不需要绑定死信交换机的队列
                    rabbitAdmin.declareQueue(new Queue(RabbitmqBind.getQueueName(),
                            true, false, false, null));
                    rabbitAdmin.declareBinding(new Binding(RabbitmqBind.getQueueName(),
                            Binding.DestinationType.QUEUE,
                            RabbitmqBind.getExchange().getExchangeName(),
                            RabbitmqBind.getRoutingKey().getKeyName(), null));
                }
            }
        });
    }

}

 绑定的形式由枚举类中定义

3.声明交换机

package com.example.config;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.TopicExchange;

import java.util.Arrays;


@Getter
@Slf4j
public enum RabbitmqExchange {

    DIRECT("direct"){
        @Override
        public Exchange createExchange(String exchangeName) {
            return new DirectExchange(exchangeName, true, false);
        }
    },

    TOPIC("topic"){
        @Override
        public Exchange createExchange(String exchangeName) {
            return new TopicExchange(exchangeName, true, false);
        }
    };

    public static RabbitmqExchange getInstanceByType(String type){

        return Arrays.stream(RabbitmqExchange.values()).filter(e -> e.getType().equals(type))
                .findAny()
                .orElseThrow(() ->
//                        new ProcessException("无效的exchange type")

                        new RuntimeException("无效的exchange type")
                );
    }

    private String type;


    RabbitmqExchange(String type) {
        this.type = type;
    }

    public abstract Exchange createExchange(String exchangeName);

}

4.监听队列

4.1 监听普通队列

package com.example.listener;

import com.example.config.RabbitmqBind;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Slf4j
@Component
@RabbitListener(queues = {
        RabbitmqBind.RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS }, concurrency = "1-5")
//, containerFactory = "customContainerFactory"
public class MqListener {



    @RabbitHandler
    public void processMessage(String message) {
        log.info("DataClean recive message :{} ", message);
        process(message);
    }

    @RabbitHandler
    public void processMessage(byte[] message) {
        String msg = new String(message);
        log.info("DataClean recive message :{} ", msg);
        process(msg);
    }

    /**
     * 处理推送消息
     * @param message
     */
    private void process(String message) {
        log.info("process message :{}" , message);
        if(StringUtils.isBlank(message)) {
            log.error("process message is blank , message:{}" , message);
            return;
        }
    }

}

 监听并处理任务

4.2监听死信队列

package com.example.listener;

import com.example.config.RabbitmqBind;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RabbitListener(queues = {
        RabbitmqBind.RabbitMqQueueConstants.Q_API_TO_DCN_SMS_DEAD }, concurrency = "1-5")
public class DeadListener {

    @RabbitHandler
    public void processMessage(String message) {
        log.info("DataClean recive message :{} ", message);
        process(message);
    }

    @RabbitHandler
    public void processMessage(byte[] message) {
        String msg = new String(message);
        log.info("DataClean recive message :{} ", msg);
        process(msg);
    }


    /**
     * 处理推送消息
     * @param message
     */
    private void process(String message) {
        log.info("Dead process message :{}" , message);
        if(StringUtils.isBlank(message)) {
            log.error("Dead process message is blank , message:{}" , message);
            return;
        }
    }

}

 5.削峰填谷的实现

把高峰期的消息填进低峰期

消息队列中间件,RabbitMQ的使用,死信队列,延迟队列,利用枚举实现队列,交换机,RountKey的声明,中间件,消息队列,开发小技巧,中间件,rabbitmq,分布式

可以用拉取的方式来实现

或者用消费者的最大数量和最小数量来实现文章来源地址https://www.toymoban.com/news/detail-856444.html

channel.basicQos();//设置最大获取未确认消息的数量,实现权重

到了这里,关于消息队列中间件,RabbitMQ的使用,死信队列,延迟队列,利用枚举实现队列,交换机,RountKey的声明的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 深入详解高性能消息队列中间件 RabbitMQ

      目录 1、引言 2、什么是 RabbitMQ ? 3、RabbitMQ 优势 4、RabbitMQ 整体架构剖析 4.1、发送消息流程 4.2、消费消息流程 5、RabbitMQ 应用 5.1、广播 5.2、RPC VC++常用功能开发汇总(专栏文章列表,欢迎订阅,持续更新...) https://blog.csdn.net/chenlycly/article/details/124272585 C++软件异常排查从入

    2024年02月05日
    浏览(72)
  • 消息队列中间件 - Docker安装RabbitMQ、AMQP协议、和主要角色

    不管是微服务还是分布式的系统架构中,消息队列中间件都是不可缺少的一个重要环节,主流的消息队列中间件有RabbitMQ、RocketMQ等等,从这篇开始详细介绍以RabbitMQ为代表的消息队列中间件。 AMQP协议 AMQP协议是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与

    2024年02月03日
    浏览(57)
  • 基于golang多消息队列中间件的封装nsq,rabbitmq,kafka

    场景 在创建个人的公共方法库中有这样一个需求,就是不同的项目会用到不同的消息队列中间件,我的思路把所有的消息队列中间件进行封装一个消息队列接口(MQer)有两个方法一个生产一个消费,那么在实例化对象的时候根据配置文件指定当前项目使用的那个消息队列中

    2024年02月14日
    浏览(58)
  • 利用消息中间件RabbitMQ创建队列以及扇出(Fanout)、订阅(Direct)、主题(Topic)交换机来完成消息的发送和监听接收(完整版)

    目录 一、前期项目环境准备 1.1父项目以及子项目 1.2配置pom.xml 1.3配置application.yml 二、扇出(Fanout) 交换机实现消息的发送和接收 2.1编写子项目consumer(消费者,接收消息)的代码实现扇出(Fanout)交换机接收消息 2.1.1consumer子项目结构 2.1.2FanoutConfig类的实现扇出(Fanout)交

    2024年02月05日
    浏览(56)
  • 消息队列(中间件)

    通信协议: 为了实现客户端和服务器之间的通信来完成的逻辑,基于TCP实现的自定义应用层协议。通过这个协议,完成客户端–服务器远程方法调用。 序列化/反序列化: 通过网络传输对象把对象存储到硬盘上。 序列化:把对象转化为二进制的数据序列,反序列化:把二进制数

    2024年02月07日
    浏览(50)
  • 消息队列中间件(一)

    流量削峰 应用解耦 异步处理 ActiveMQ 优:单机吞吐万级,时效性ms级,可用性高(主从架构),可靠性高(丢失率低) 缺:官方维护少,高吞吐场景较少使用 Kafka 大数据 - 数据采集,传输,存储 优:高吞吐量(百万级),时效性ms级,可用性高,日志成熟 缺:短轮询,失败

    2024年02月11日
    浏览(51)
  • 消息队列中间件介绍

    消息队列介绍   消息队列中间件是大型系统中的重要组件,已经逐渐成为企业系统内部通信的核心手段。它具有松耦合、异步消息、流量削峰、可靠投递、广播、流量控制、最终一致性等一系列功能,已经成为异步RPC的主要手段之一。 目前常见的消息中间件有ActiveMQ、Rabbi

    2024年02月04日
    浏览(49)
  • RabbitMQ消息中间件

    RabbitMQ消息中间件 RabbitMQ简介 windows下安装RabbitMQ RabbitMQ基本概念 RabbitMQ简单模式 RabbitMQ工作队列模式 RabbitMQ发布订阅模式 RabbitMQ路由模式 RabbitMQ主题模式 RabbitMQ RPC模式 RabbitMQ发布确认模式

    2024年02月10日
    浏览(57)
  • 消息中间件RabbitMQ

    1.1.1. 什么是MQ MQ(message queue) ,从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,

    2024年01月17日
    浏览(80)
  • 消息中间件RabbitMQ详解

    消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。 消息中间件适用于需要可靠的数据传送的分布式环境。采用消息中间件机制的系统中

    2024年02月16日
    浏览(76)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包