2.rabbitMQ之交换机

这篇具有很好参考价值的文章主要介绍了2.rabbitMQ之交换机。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.交换机的作用

1.默认交换机会自动指定队列
2.之前一个信息必须被消费1次,现在的是一个消息可以被消费多次(发送到不同队列的前提下,正常情况下一个队列只能消费一次)
3.消息先发给交换机,然后交换机发给多个队列,可以达到多次消费的效果

如图mq3
2.rabbitMQ之交换机

2.交换机的类型

  1. 默认交换机 无名 ""指定
    1.直接类型 direct(一个routingKey绑定一个队列,一个交换机绑定一个队列)
    2.主题 topic(一个交换机绑定多个队列,主要是通过表达式来实现)
    3.标题 headers(不常用)
    4.扇出 fanout(一个信息被交换机全部队列接收,相当于QQ聊天)

3.临时队列 没有D的队列 一旦断开连接,队列会被自动删除

   //获得临时队列,features有 AD和 Excl
    String QName=channel.quequeDeclare().getQueue();

4.交换机和队列绑定binding

   //在界面, 添加queue然后添加exchange,然后在交换机 添加队列,
       rountingkey代表想要发给哪个队列,后面可以指定哪个可以接收特定的信息

5.fanout(相当于qq群的广播,一条消息被多台计算机 接收) ,队列名可以写为空
1. 2个消费者 声明交换机的名字和类型 主要代码如下

  channel.exchangeDeclare(name,"fanout");
     String QName=channel.queueDeclare().getQueue();
      //队列名,交换机名,routingKey
      channel.queueBind(QName,exName,"");
             -------完整代码-------
public class exchangeConsumer1 {
    public static final String EXCHANGE_NAME="log";
    public static void main(String[] args) throws Exception {
        Channel channel = MQRabbitUtil.getChannel();
        //得到临时队列
        String QName=channel.queueDeclare().getQueue();
        //交换机的名字和类型
//        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        channel.queueBind(QName,EXCHANGE_NAME,"");
        DeliverCallback deliverCallback=(tag, delivery)->{

            System.out.println("consumer1"+new String(delivery.getBody(),"UTF-8"));
        };
        CancelCallback nCallback=(tag)->{

            System.out.println("失败应答");
        };

        boolean IsAck=false;
        channel.basicConsume(QName,IsAck,deliverCallback,(tag)->{});



    }
}
   2.生产者 也要不用再次声明交换机,不用队列名了,用交换机名就可以接收消息
 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
           -------完整代码-------
public class exchangeProducer {
    public static final String EXCHANGE_NAME="log";
    public static void main(String[] args) throws Exception {
        Channel channel = MQRabbitUtil.getChannel();
        //交换机的名字和类型
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        Scanner scanner = new Scanner(System.in);
		//模拟生产者不停发消息
        while (scanner.hasNext()){
            String next = scanner.next();
            //交换机
            //队列名
            //设置消息持久化
            //二进制
            channel.basicPublish(EXCHANGE_NAME,"", MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes());
//            boolean flag=channel.waitForConfirms();
//            if(flag){
//                System.out.println("消息已经写入磁盘的确认");
//
//            }

        }

    }
}

6.直接交换机 direct(rountingKey相等就是fanout交换机,不相等就是direct)(路由)
//可以绑定队列
1.提供者修改交换机类型和routingKey

      channel.exchangeDeclare(EXCHANGE_NAME,"direct");
      channel.basicPublish(EXCHANGE_NAME,"wrong", MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes());
2.消费者修改交换机类型和routingKey
 channel.exchangeDeclare(EXCHANGE_NAME,"direct");
channel.queueBind(QName,EXCHANGE_NAME,"error");

7.topic主题交换机,direct只能一个交换机绑定一个队列,这个可以路由多个队列(生产者不声明队列,只声明交换机,消费者声明队列和交换机)
1.routingKey的写法单词不能全部是字母(就成direct交换机了) 如aa.bb.mq *代替1个单词 #代替0个或多个单词

如 lazy.#可以匹配 lazy.ngs.me
*.*.rabbit 匹配 ngs.me.rabbit

2.注意 绑定是 #相当于fanout交换机   #和*都没有出现就是direct交换机
3.代码 交换机为topic

#消费者.queueBind()可以写多次

     -------代码------
       public class TopicProducer {
    public static final String EXCHANGE_NAME="log2";
    public static void main(String[] args) throws Exception {
        Channel channel = MQRabbitUtil.getChannel();
        channel.confirmSelect();
        //交换机的名字和类型
//        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//        channel.queueDeclare("Q1",false,false,false,null);
        Scanner scanner = new Scanner(System.in);



        while (scanner.hasNext()){
            String next = scanner.next();
            //交换机
            //队列名
            //设置消息持久化
            //二进制
            //fanout交换机模式
//            channel.basicPublish(EXCHANGE_NAME,"", MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes());
            //direct交换机,这里发送的routingKey可以自行修改测试
			            channel.basicPublish(EXCHANGE_NAME,"quick.orange.rabbit", MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes());
//            boolean flag=channel.waitForConfirms();
//            if(flag){
//                System.out.println("消息已经写入磁盘的确认");
//
//            }

        }

    }
} 
       ----消费者-----
  public class TopicConsumer1 {
    public static final String EXCHANGE_NAME="log2";
    public static void main(String[] args) throws Exception {
        Channel channel = MQRabbitUtil.getChannel();
        //得到临时队列

        //交换机的名字和类型
//        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        String queueName="Q1";
        channel.queueDeclare(queueName, false, false, false, null);
        //!!!核心代码,就是发送到的rountingKey和队列绑定
        channel.queueBind("Q1",EXCHANGE_NAME,"*.orange.*");
        channel.queueBind("Q1",EXCHANGE_NAME,"*.orange.aa");
        DeliverCallback deliverCallback=(tag, delivery)->{
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
            System.out.println("consumer1"+new String(delivery.getBody(),"UTF-8"));
        };
        CancelCallback nCallback=(tag)->{

            System.out.println("失败应答");
        };

        boolean IsAck=false;
        //消费信息
        channel.basicConsume("Q1",IsAck,deliverCallback,(tag)->{});



    }
}

8.死信队列(不能被消费的信息放到死信队列,防止消息丢失) 管理界面队列有DLK代表开启了死信

1.来源
—1.消息TTL过期(信息指定时间会过期)
—2.队列达到最大长度(队列满了)
—3.信息被拒绝(basic.reject或basic.nack不应答) requeue=false(信息不放回队列,丢失?)

------2.实现(绑定普通队列和死信队列) 普通和死信交换机都为 dirrect
如图mq4
2.rabbitMQ之交换机

   //普通队列要声明的时候加入arg才会转发到死信队列 !!!注意设置的普通队列的args,不是死信的
    -----消费者-----
  Map<String,Object> args=new HashMap();
  args.put("x-dead-letter-exchange","DEAD_EXCHANGE_NAME");
  //设置死信 routingKey
args.put("x-dead-letter-routing-key","lisi");
     channe.queueDeclare(...,args);
//过期时间这里可以设置,也可producer声明
//声明队列
--------生产者--------  
//设置ttl time to live过期时间
   channel.basicPublish(...,prop,...);             

9.死信队列之队列达到最大长度

   //消费者设置最大正常队列的长度

10.死信队列之信息被拒绝

 //消费者
     channel.basicReject(msg.getEnvelope().getDeliveryTag(),false);//不放回队列
//一定要开启手动应答

11.延迟队列.是死信队列的过期时间(企业上班案例)
//整合springboot 选2.3.1,复制依赖,复制配置
//整合后不用自己声明队列和交换机,由专门的配置类写
1.配置文件类


  @Configuration //配置类上面写
     //声明交换机
     @Bean("xExchange")
     public DirectExchange xExchange(){
return new DirectExchange("name");
    }
    //声明队列
    @Bean("queueA")
     public Queue xExchange(){
             //指定死信 xxx参数
return QueueBuilder.durable(Queue_A).withArguments(xxx).build();
    }
   //死信队列
    @Bean("queueA")
     public Queue xExchange(){
             //没有参数
return QueueBuilder.durable(DEAD_QUEUE).build();
    }
   //绑定,名字必须要有语义化 semantic
      @Bean
     public Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){
             //,队列和交换机绑定,并指定routingKey
return BindingBuilder.bind(queueA).to(xExchange).with("XB");
    }    
    2.写controller接收消息
@Autowired
 private RabbitTemplate rabbitTemplate;
log.info("xxxx{},xxx{}",new Date().toString(),msg);
//发送消息来自spring公司的工具,转发到交换机和发送
  rabbitTemplate.convertAndSend("X","routingKey","xxmsg")
   3.死信消费者接收消息 需要监听器
    @Slf4j
     @Component
           xx class
                    @RabbitListener(queues="QD") 
                //注意!!!msg是spring的类,Channel是mq的,刚开始导错包
           public void xx(Message msg,Channel channel){
sout(new String(msg.getBody()));
       }

12.延迟优化 不在队列声明写延迟时间,而在生产者的声明,就不用一直更新队列代码
图mq5

2.rabbitMQ之交换机
----1.增加一个不设置时间的队列

//controller当生产者
rabbitTemplate.convertAndSend("exchange_x", "XB", "消息来自 ttl 为 xS 的队列: "+message, correlationData ->{
        correlationData.getMessageProperties().setExpiration(ttl);
        return correlationData;
    });

2.死信做延迟的缺陷,因为是排队的,所以发送多条消息不同延迟时间,按第一条的时间延迟(mq只会检查第一条消息是否过期)(导致先发送时间长的数据一直等待,其他后发送的数据在等待完成用同一时间送达)

3.解决方法(使用插件) 将我们的插件 复制到 mq的plugin的文件夹下(到exchange页面会多一个x-delayed-messeage选项)由交换机延迟了文章来源地址https://www.toymoban.com/news/detail-431540.html

//文件夹的路径
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
//插件放到文件夹后,enable开启插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
 //只能配置类写自定义交换机
 @Bean
 public CustomExchange delayedExchange(){

 
  }   
 //绑定交换机和队列
//生产者设置时间,注意之前是 setExpiration现在是setDelay交换机延迟  
    //代码如下
    -----配置类-------
@Configuration
public class TtlQueueConfig{
    private  static String Exchange_X= "exchange_x";
    private  static String Exchange_Y_DEAD= "exchange_y_dead";
    private  static String QUEUE_A= "queue_a";
    private  static String QUEUE_B= "queue_b";
    private  static String QUEUE_D= "queue_dead";
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";


    @Bean("xExchange")
    public DirectExchange xExchange(){
        return new DirectExchange(Exchange_X);
    }
    @Bean("yExchange")
    public DirectExchange yExchange(){
        return new DirectExchange(Exchange_Y_DEAD);
    }
    //自定义交换机 我们在这里定义的是一个延迟交换机
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> args = new HashMap<>();
        //自定义交换机的类型 !!!
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false,
                args);
    }


    @Bean("queueA")
    public Queue queueA(){
//        Map<String, Object> args = new HashMap<>();
//        //声明当前队列绑定的死信交换机
//        args.put("x-dead-letter-exchange",Exchange_Y_DEAD);
//        //声明当前队列的死信路由 key
//        args.put("x-dead-letter-routing-key", "YD");
//        //声明队列的 TTL
//        args.put("x-message-ttl", 10000);
//        return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
        return QueueBuilder.durable(QUEUE_A).build();
    }
    @Bean("queueB")
    public Queue queueB(){
//        Map<String, Object> args = new HashMap<>();
//        //声明当前队列绑定的死信交换机
//        args.put("x-dead-letter-exchange",Exchange_Y_DEAD);
//        //声明当前队列的死信路由 key
//        args.put("x-dead-letter-routing-key", "YD");
//        //声明队列的 TTL
//        args.put("x-message-ttl", 40000);
//        return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
        return QueueBuilder.durable(QUEUE_B).build();
    }
    @Bean("queueDead")
    public Queue queueDead(){
        return QueueBuilder.durable(QUEUE_D).build();
    }

    @Bean("xBindingQueueA")
    public Binding xBindingQueueA(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }
    @Bean("xBindingQueueB")
    public Binding xBindingQueueB(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }
    @Bean("yBindingQueueDead")
    public Binding yBindingQueueDead(@Qualifier("queueDead") Queue queueDead,@Qualifier("yExchange") DirectExchange yExchange){
        return BindingBuilder.bind(queueDead).to(yExchange).with("YD");
    }



    @Bean
    public Binding bindingDelayedQueue(@Qualifier("queueDead") Queue queue,
                                       @Qualifier("delayedExchange") CustomExchange
                                               delayedExchange) {
        return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }





} 
    ---------死信消费者-------
@Slf4j
@Component
public class DeadLetterQueueConsumer {
    @RabbitListener(queues="queue_dead")
    public void deadMeg(Message msg, Channel channel){
        String s = new String(msg.getBody());

        log.info("时间,{}消息{}",new Date(),s);
    }


}

到了这里,关于2.rabbitMQ之交换机的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【RabbitMQ】RabbitMQ的交换机

    在上文中, 都没有交换机,生产者直接发送消息到队列。 而一旦引入交换机,消息发送的模式会有很大变化:可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化: Publisher:生产者,不再发送消息到队列中,而是发给交换机 Exchange:交换机,一方面,接收生

    2024年03月12日
    浏览(54)
  • 【RabbitMQ(day3)】扇形交换机和主题交换机的应用

    扇型交换机将消息路由给绑定到它身上的所有队列,而不会理会绑定的路由键。如果 N 个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的 N 个队列。扇型用来交换机处理消息的广播路由。 因为扇型交换机投递信息的

    2024年02月14日
    浏览(48)
  • 02、RabbitMQ交换机

    目录 1.、Exchange(交换机)的作用  2、Exchange(交换机)的类型 2.1.直连交换机:Direct Exchange 2.2.主题交换机:Topic Exchange 2.3.扇形交换机:Fanout Exchange 2.4.首部交换机:Headers exchange 2.5.默认交换机 2.6.Dead Letter Exchange(死信交换机) 3、交换机的属性 4、综合案例:交换机的使用 给子模

    2024年02月04日
    浏览(57)
  • RabbitMQ交换机(1)

    RabbitMQ消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列 。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。 相反,生产者只能将消息发送到交换机(exchange), 交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将

    2024年01月17日
    浏览(43)
  • RabbitMQ-交换机类型

    RabbitMQ的交换机类型总共有四种 1-直连交换机(Direct exchange) 消息发送到此交换机上时,交换机会将此消息发送到RoutingKey和消息中RoutingKey完全匹配的的队列(如果匹配了多个队列,则每个队列都会收到相同的消息)。 2-扇形交换机(Fanout exchange) 这个交换机机,会将收到的

    2023年04月08日
    浏览(39)
  • RabbitMQ交换机

    (1)接收publisher发送消息 (2)将消息按照规则路由到与之绑定的队列 (1)Fanout(广播) Fanout Exchange会将接受到的消息广播到每一个跟其绑定的queue,所以也叫广播模式 (2)Direct(定向) Direct Exchange会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由 每一个

    2024年01月19日
    浏览(44)
  • RabbitMQ交换机类型

    先附加下官网文档。RabbitMQ的交换机类型共有四种,是根据其路由过程的不同而划分成的: 直连型交换机背后的路由算法很简单——消息会传送给绑定键与消息的路由键完全匹配的那个队列。 我们用直连交换机取代了只会无脑广播的扇形交换机,并且具备了选择性接收消息的

    2024年02月06日
    浏览(44)
  • RabbitMQ之交换机

    目录 前言 一.关于交换机 1.交换机工作原理 2.交换机类型及路由规则 二.交换机实战讲解 1.直连交换机 2.主题交换机 3.扇形交换机 总结 RabbitMQ 中的交换机(Exchange)是消息的分发中心,负责将消息发送到一个或多个队列。它接收生产者发送的消息并将这些消息路由到消息队列

    2024年01月25日
    浏览(49)
  • RabbitMq交换机类型介绍

    在RabbitMq中,生产者的消息都是通过交换器来接收,然后再从交换器分发到不同的队列,再由消费者从队列获取消息。这种模式也被成为“发布/订阅”。 分发的过程中交换器类型会影响分发的逻辑。 直连交换机:Direct exchange 扇形交换机:Fanout exchange 主题交换机:Topic excha

    2024年02月12日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包