RabbitMQ工作模式-路由模式

这篇具有很好参考价值的文章主要介绍了RabbitMQ工作模式-路由模式。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

官方文档参考:https://www.rabbitmq.com/tutorials/tutorial-four-python.html
RabbitMQ工作模式-路由模式,rabbitmq,学习笔记,JAVA,rabbitmq,java,java-rabbitmq

使用direct类型的Exchange,发N条消息并使用不同的routingKey,消费者定义队列并将队列routingKey、Exchange绑定。此时使用direct模式Exchange必须要routingKey完成匹配的情况下消息才会转发到对应的队列中被消费。

样例使用日志分发为样例。即按日志不同的级别,分发到不同的队列。每个队列只处理自己的对应的级别日志。

创建生产者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;

public class Product {

  private static final String[] LOG_LEVEL = {"ERROR", "WARN", "INFO"};

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://root:123456@node1:5672/%2f");

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 声明交换机,交换器和消息队列的绑定不需要在这里处理。
    channel.exchangeDeclare(
        "ex.routing",
        BuiltinExchangeType.DIRECT,
        // 持久的标识
        false,
        // 自动删除的标识
        false,
        // 属性
        null);

    for (int i = 0; i < 30; i++) {
      String level = LOG_LEVEL[ThreadLocalRandom.current().nextInt(0, LOG_LEVEL.length)];
      String dataMsg = "[" + level + "] 消息发送 :" + i;
      // 发送消息
      channel.basicPublish("ex.routing", level, null, dataMsg.getBytes(StandardCharsets.UTF_8));
    }
  }
}

创建ERROR的消费者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class ErrorConsumer {

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://root:123456@node1:5672/%2f");

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 声明队列并绑定
    channel.exchangeDeclare(
        "ex.routing",
        BuiltinExchangeType.DIRECT,
        // 持久的标识
        false,
        // 自动删除的标识
        false,
        // 属性
        null);

    // 此也可以声明为临时队列,但是如果消息很重要,不要声明临时队列。
    channel.queueDeclare(
        "log.error",
        // 永久
        false,
        // 排他
        false,
        // 自动删除
        true,
        // 属性
        null);

    //消费者享有绑定到交换器的权力。
    channel.queueBind("log.error", "ex.routing", "ERROR");

    // 通过chanel消费消息
    channel.basicConsume(
        "log.error",
        (consumerTag, message) -> {
          System.out.println("ERROR收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
        },
        consumerTag -> {});
  }
}

创建INFO级的消费者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;

public class InfoConsumer {

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://root:123456@node1:5672/%2f");

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 声明队列并绑定
    channel.exchangeDeclare(
        "ex.routing",
        BuiltinExchangeType.DIRECT,
        // 持久的标识
        false,
        // 自动删除的标识
        true,
        // 属性
        null);

    // 此也可以声明为临时队列,但是如果消息很重要,不要声明临时队列。
    channel.queueDeclare(
        "log.info",
        // 永久
        false,
        // 排他
        false,
        // 自动删除
        false,
        // 属性
        null);

    //消费者享有绑定到交换器的权力。
    channel.queueBind("log.info", "ex.routing", "INFO");

    // 通过chanel消费消息
    channel.basicConsume(
        "log.info",
        (consumerTag, message) -> {
          System.out.println("INFO收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
        },
        consumerTag -> {});
  }
}

创建WARN级别的消息者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;

public class WarnConsumer {

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://root:123456@node1:5672/%2f");

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 声明队列并绑定
    channel.exchangeDeclare(
        "ex.routing",
        BuiltinExchangeType.DIRECT,
        // 持久的标识
        false,
        // 自动删除的标识
        false,
        // 属性
        null);

    // 此也可以声明为临时队列,但是如果消息很重要,不要声明临时队列。
    channel.queueDeclare(
        "log.warn",
        // 永久
        false,
        // 排他
        false,
        // 自动删除
        true,
        // 属性
        null);

    //消费者享有绑定到交换器的权力。
    channel.queueBind("log.warn", "ex.routing", "WARN");

    // 通过chanel消费消息
    channel.basicConsume(
        "log.warn",
        (consumerTag, message) -> {
          System.out.println("warn收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
        },
        consumerTag -> {});
  }
}

首先启动三个消费者:

查看队列及交换机情况

[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name               │ type    │
├────────────────────┼─────────┤
│ amq.fanout         │ fanout  │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic   │
├────────────────────┼─────────┤
│ amq.headers        │ headers │
├────────────────────┼─────────┤
│ amq.topic          │ topic   │
├────────────────────┼─────────┤
│ amq.direct         │ direct  │
├────────────────────┼─────────┤
│                    │ direct  │
├────────────────────┼─────────┤
│ ex.routing         │ direct  │
├────────────────────┼─────────┤
│ amq.match          │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
┌─────────────┬─────────────┬──────────────────┬──────────────────┬─────────────┬───────────┐
│ source_name │ source_kind │ destination_name │ destination_kind │ routing_key │ arguments │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│             │ exchange    │ log.info         │ queue            │ log.info    │           │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│             │ exchange    │ log.warn         │ queue            │ log.warn    │           │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│             │ exchange    │ log.error        │ queue            │ log.error   │           │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│ ex.routing  │ exchange    │ log.error        │ queue            │ ERROR       │           │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│ ex.routing  │ exchange    │ log.info         │ queue            │ INFO        │           │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│ ex.routing  │ exchange    │ log.warn         │ queue            │ WARN        │           │
└─────────────┴─────────────┴──────────────────┴──────────────────┴─────────────┴───────────┘
[root@nullnull-os ~]# 

可以发现,交换器ex.routing 绑定了三个队列log.errorlog.info log.warn并指定了路由键。

启动消费者,查看消息通否被正常消费。

ERROR的消费者控制台输出

ERROR收到的消息:[ERROR] 消息发送 :1
ERROR收到的消息:[ERROR] 消息发送 :2
ERROR收到的消息:[ERROR] 消息发送 :6
ERROR收到的消息:[ERROR] 消息发送 :8
ERROR收到的消息:[ERROR] 消息发送 :9
ERROR收到的消息:[ERROR] 消息发送 :11
ERROR收到的消息:[ERROR] 消息发送 :15
ERROR收到的消息:[ERROR] 消息发送 :16
ERROR收到的消息:[ERROR] 消息发送 :19
ERROR收到的消息:[ERROR] 消息发送 :20
ERROR收到的消息:[ERROR] 消息发送 :21
ERROR收到的消息:[ERROR] 消息发送 :23
ERROR收到的消息:[ERROR] 消息发送 :24
ERROR收到的消息:[ERROR] 消息发送 :27
ERROR收到的消息:[ERROR] 消息发送 :28

INFO的消费者控制台输出:

INFO收到的消息:[INFO] 消息发送 :0
INFO收到的消息:[INFO] 消息发送 :3
INFO收到的消息:[INFO] 消息发送 :4
INFO收到的消息:[INFO] 消息发送 :13
INFO收到的消息:[INFO] 消息发送 :14
INFO收到的消息:[INFO] 消息发送 :22
INFO收到的消息:[INFO] 消息发送 :25

WARN的消费都控制台输出:

warn收到的消息:[WARN] 消息发送 :5
warn收到的消息:[WARN] 消息发送 :7
warn收到的消息:[WARN] 消息发送 :10
warn收到的消息:[WARN] 消息发送 :12
warn收到的消息:[WARN] 消息发送 :17
warn收到的消息:[WARN] 消息发送 :18
warn收到的消息:[WARN] 消息发送 :26
warn收到的消息:[WARN] 消息发送 :29

至此,验证已经完成。文章来源地址https://www.toymoban.com/news/detail-691625.html

到了这里,关于RabbitMQ工作模式-路由模式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ:工作队列模式

    📃个人主页:不断前进的皮卡丘 🌞博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记 🔥个人专栏:消息中间件 工作队列(又名: 任务队列 )背后的主要思想是避免立即执行资源密集型任务并等待其完成。相反

    2024年01月23日
    浏览(39)
  • RabbitMQ 工作模式介绍

    RabbitMQ 是一个消息代理:它接受并转发消息。您可以将其视为邮局:当您将要邮寄的邮件放入邮箱时,您可以确定信使最终会将邮件交付给您的收件人。在这个类比中,RabbitMQ是一个邮政信箱,一个邮局和一个信件载体。 RabbitMQ 和邮局之间的主要区别在于它不处理纸张,而是

    2024年02月06日
    浏览(40)
  • RabbitMQ五大常用工作模式

    消息生产者 消息消费者(会一直监听队列) 工作队列 消息生产能力大于消费能力,增加多个消费节点 和简单队列类似,增加多个消费节点,处于竞争关系 默认策略:round robin轮训 生产者 消费者1 消费者2 轮训策略验证 先启动两个消费者,再启动生产者 缺点:存在部分节点

    2024年02月19日
    浏览(43)
  • RabbitMQ的工作模式

    RabbitMQ 的工作模式 一 .simple 模式(即最简单的收发模式) 二 .work 工作模式 ( 资源的竞争 ) publish_subscribe 发布订阅 (../../../../../0 马士兵 / 新建文件夹 /BAT 面试突击资料 (1)/ 整理 /BAT 面试突击资料 /15- 消息中间件 MQ 面试题( 2020 最新 版) .assets/publish_subscribe 发布订阅 ( 共享资

    2024年02月06日
    浏览(70)
  • 【RabbitMQ】Spring整合RabbitMQ、Spring实现RabbitMQ五大工作模式(万字长文)

    目录 一、准备 1、创建maven项目​编辑 2、引入依赖 3、创建配置文件 1.RabbitMQ配置文件 2.生产者项目配置文件 3.消费者项目配置文件 二、生产者xml中文件创建队列 三、生产者xml文件中创建交换机以及绑定队列 1、创建交换机 2、绑定队列  四、消费者xml文件中创建队列消息监

    2024年01月21日
    浏览(41)
  • 消息队列之RabbitMQ工作模式

    提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加 消息队列之RabbitMQ工作模式 提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档 提示:这里可以添加本文要记录的大概内容: 在这篇博客中,我将深入探讨 RabbitMQ 的工作模式,带你

    2024年01月18日
    浏览(51)
  • RabbitMQ消息队列的工作模式

    官方文档地址:https://www.rabbitmq.com/getstarted.html 工作模式其实就是消息队列分发消息的路由方式。 RabbitMQ常用的几种工作模式: 简单模式 WorkQueues工作队列模式 PubSub生产者/PubSub消费者模式 Routing路由模式 Topics通配符模式 发布/订阅模式(Publish/Subscribe):该模式用于一对多的

    2024年02月15日
    浏览(44)
  • RabbitMQ的6种工作模式

    官方文档: http://www.rabbitmq.com/ https://www.rabbitmq.com/getstarted.html RabbitMQ 常见的 6 种工作模式: 1)、消息产生后将消息放入队列。 2)、消息的消费者监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除。 3)、存在的问题:消息可能没有被消费者正确处

    2024年02月14日
    浏览(36)
  • 消息队列——rabbitmq的不同工作模式

    目录 Work queues 工作队列模式  Pub/Sub 订阅模式 Routing路由模式 Topics通配符模式   工作模式总结 C1和C2属于竞争关系,一个消息只有一个消费者可以取到。  代码部分只需要用两个消费者进程监听同一个队里即可。 两个消费者呈现竞争关系。 用一个生产者推送10条消息 两个监

    2024年02月16日
    浏览(43)
  • RabbitMQ常用工作模式+整合springboot

    目录 1.MQ的相关概念 1.1 什么是MQ消息中间件 1.2 为什么使用MQ (1) 应用解耦 (2) 异步提速   (3)削峰填谷 1.3 使用MQ的劣势 1.4 常见的MQ组件​​​​​​​ 2. RabbitMQ的概述 2.1 RabbitMQ的概念 2.2 RabbitMQ的原理 2.3 安装RabbitMQ 3. RabbitMQ 的工作模式 3.1 simple (简单模式) 3.2 Work queues(工作模

    2024年02月15日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包