消息队列之RabbitMQ工作模式

这篇具有很好参考价值的文章主要介绍了消息队列之RabbitMQ工作模式。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

系列文章目录

提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加
消息队列之RabbitMQ工作模式


提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档


前言

提示:这里可以添加本文要记录的大概内容:

在这篇博客中,我将深入探讨 RabbitMQ 的工作模式,带你领略它的强大之处。我们将了解到 RabbitMQ 如何通过队列、交换机、绑定等核心概念实现消息的路由和传递。你将学习到不同类型的交换机,以及它们如何根据消息的路由键将消息路由到相应的队列。
无论你是刚刚接触消息队列,还是已经有一定经验的开发者,这篇博客都将为你提供深入了解 RabbitMQ 工作模式的机会。通过实际的案例和代码示例,我将帮助你更好地理解如何使用 RabbitMQ 来解决实际的业务问题。
让我们一起探索生成消息队列之 RabbitMQ 的世界,掌握其工作模式,提升我们的应用架构和开发能力!


提示:以下是本篇文章正文内容,下面案例可供参考

一、RabbitMQ工作模式介绍

  • 简单模式(Simple):在简单模式下,生产者将消息发送到队列,消费者从队列中获取消息并进行处理。生产者和消费者之间没有直接的通信,而是通过 RabbitMQ 进行消息的传递和处理。这种模式适用于简单的消息传递场景。
  • 工作队列模式(Work Queue):在工作队列模式下,多个消费者可以同时从同一个队列中获取消息并进行处理。这种模式适用于需要并发处理消息的场景,例如后台处理任务或分布式计算。
  • 发布/订阅模式(Publish/Subscribe):在发布/订阅模式下,生产者将消息发布到一个特定的主题(Topic),多个消费者可以订阅该主题并接收所有发布的消息。这种模式适用于消息广播和消息分发,例如通知系统或消息推送。
  • 路由模式(Routing):在路由模式下,生产者将消息发送到一个交换机,交换机根据消息的路由键(Routing Key)将消息转发到多个队列。消费者可以从不同的队列中获取消息,并根据消息的内容进行处理。这种模式适用于复杂的消息路由和处理需求,例如根据不同的业务规则将消息转发到不同的队列。
  • 通配符模式(Topic)是在路由模式的基础上,给队列绑定带通配符的路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消息转发到该队列。通配符模式比路由模式更灵活,使用topic交换机。

二、工作模式的使用

环境准备

1.启动RabbitMQ

# 开启管控台插件
rabbitmq-plugins enable rabbitmq_management
# 启动rabbitmq
rabbitmq-server -detached

2.创建普通maven项目,添加RabbitMQ依赖

<dependencies>
  <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.0</version>
  </dependency>
</dependencies>

简单模式

简单模式是RabbitMQ工作模式中的一种,是指一个生产者发送消息,一个消费者消费消息。消息的消费者监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除。这种模式下不需要将Exchange进行任何绑定操作。
简单模式常用于一个生产者,一个消费者的情况。例如,在电子商务应用中,生产者可以是一个订单处理程序,消费者可以是一个物流跟踪程序,生产者将订单信息发送到队列,消费者从队列中获取订单信息并进行处理。

编写生产者

// 生产者
public class Producer {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.创建队列,如果队列已存在,则使用该队列
    /**
     * 参数1:队列名
     * 参数2:是否持久化,true表示MQ重启后队列还在。
     * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
     * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
     * 参数5:其他额外参数
     */
    channel.queueDeclare("simple_queue",false,false,false,null);
    // 5.发送消息
    String message = "hello!rabbitmq!";
    /**
     * 参数1:交换机名,""表示默认交换机
     * 参数2:路由键,简单模式就是队列名
     * 参数3:其他额外参数
     * 参数4:要传递的消息字节数组
     */
    channel.basicPublish("","simple_queue",null,message.getBytes());
    // 6.关闭信道和连接
    channel.close();
    connection.close();
    System.out.println("===发送成功===");
   }
}

编写消费者

// 消费者
public class Consumer {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.监听队列
    /**
     * 参数1:监听的队列名
     * 参数2:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则MQ会一直发送消息
     * 参数3:Consumer的实现类,重写该类方法表示接受到消息后如何消费
     */
    channel.basicConsume("simple_queue",true,new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("接受消息,消息为:"+message);
       }
     });
   }
}

工作队列模式

工作队列模式是RabbitMQ工作模式的一种,由一个生产者和多个消费者组成。生产者发送消息到消息队列,消费者轮询进行消费。
与简单模式相比,工作队列模式的区别在于可以有多个消费者共同消费一个队列中的消息。工作队列模式使用默认交换机,可以避免消费积压。有两种消息分配策略:循环调度和消息确认。循环调度是默认策略,消息队列会循环依次给每个消费者发送消息,每个消费者都会收到相同数量的消息,但可能导致丢失消息。消息确认策略下,消费者在消费完某个消息,并发送确认消息后,消息队列会重新将新消息推送给该消费者进行消费,同时队列可以自由删除消费过的消息。如果消费者没有发送确认消息的情况下死亡(如通道关闭、链接关闭、TCP链接丢失等),队列了解到消息未被完全处理,会将消息重新排队。如果同时有其它消费者在线消费,那么这条消息会被快速传递给另一个消费者进行消费,这样可以确保不丢失任何消息。消费者消费确认的超时时间默认是30分钟。

编写生产者

public class Producer {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.创建队列,持久化队列
    channel.queueDeclare("work_queue",true,false,false,null);
    // 5.发送大量消息,参数3表示该消息为持久化消息,即除了保存到内存还会保存到磁盘中
    for (int i = 1; i <= 100; i++) {
      channel.basicPublish("","work_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,
           ("你好,这是今天的第"+i+"条消息").getBytes());
     }
    // 6.关闭资源
    channel.close();
    connection.close();
   }
}

编写消费者

// 消费者1
public class Consumer1 {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.监听队列,处理消息
    channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("消费者1消费消息,消息为:"+message);
       }
     });
   }
}


// 消费者2
public class Consumer2 {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.监听队列,处理消息
    channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("消费者2消费消息,消息为:"+message);
       }
     });
   }
}


// 消费者3
public class Consumer3 {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.监听队列,处理消息
    channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("消费者3消费消息,消息为:"+message);
       }
     });
   }
}

发布订阅模式

发布订阅模式是RabbitMQ工作模式的一种,由一个生产者和多个消费者组成。生产者将消息发送到交换机,交换机将消息转发到多个队列,每个队列都有一个相应的消费者进行消费。
发布订阅模式的特点是每个消费者都可以订阅一个或多个主题,并接收所有与该主题相关的消息。消费者可以通过订阅不同的主题来接收不同类型的消息,并根据消息的内容进行处理。
发布订阅模式适用于需要向多个消费者发送相同类型的消息的场景,例如通知系统或消息推送。

编写生产者

// 生产者
public class Producer {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.创建交换机
    /**
     * 参数1:交换机名
     * 参数2:交换机类型
     * 参数3:交换机持久化
     */
    channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT,true);
    // 5.创建队列
    channel.queueDeclare("SEND_MAIL",true,false,false,null);
    channel.queueDeclare("SEND_MESSAGE",true,false,false,null);
    channel.queueDeclare("SEND_STATION",true,false,false,null);
    // 6.交换机绑定队列
    /**
     * 参数1:队列名
     * 参数2:交换机名
     * 参数3:路由关键字,发布订阅模式写""即可
     */
    channel.queueBind("SEND_MAIL","exchange_fanout","");
    channel.queueBind("SEND_MESSAGE","exchange_fanout","");
    channel.queueBind("SEND_STATION","exchange_fanout","");
    // 7.发送消息
    for (int i = 1; i <= 10 ; i++) {
      channel.basicPublish("exchange_fanout","",null,
           ("你好,尊敬的用户,秒杀商品开抢了!"+i).getBytes(StandardCharsets.UTF_8));
     }
    // 8.关闭资源
    channel.close();
    connection.close();
   }
}

编写消费者

// 站内信消费者
public class CustomerStation {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
    //3.建立信道
    Channel channel = conn.createChannel();
    // 4.监听队列
    channel.basicConsume("SEND_STATION", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送站内信:"+message);
       }
     });
   }
}


// 邮件消费者
public class Customer_Mail {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
    //3.建立信道
    Channel channel = conn.createChannel();
    // 4.监听队列
    channel.basicConsume("SEND_MAIL", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送邮件:"+message);
       }
     });
   }
}


// 短信消费者
public class Customer_Message {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
    //3.建立信道
    Channel channel = conn.createChannel();
    // 4.监听队列
    channel.basicConsume("SEND_MESSAGE", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送短信:"+message);
       }
     });
   }
}

路由模式

RabbitMQ的路由模式是指,队列在绑定交换机时,需要指定一个RoutingKey(路由键)。消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey。Exchange不再将消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,才会接收到消息。
这种模式可以实现有选择地接收消息,通过指定不同的RoutingKey,将消息发送到不同的队列,从而实现消息的精准投递。例如,一个队列与一个交换器进行绑定时,可以指定多个不同的RoutingKey,当消息的RoutingKey与队列的RoutingKey相匹配时,消息才会被投递到该队列中。
请注意,使用路由模式时,需要确保RoutingKey的设置与实际需求相符,以确保消息能够准确地投递到目标队列中。

编写生产者

// 生产者
public class Producer {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.创建交换机
    channel.exchangeDeclare("exchange_routing", BuiltinExchangeType.DIRECT,true);
    // 5.创建队列
    channel.queueDeclare("SEND_MAIL2",true,false,false,null);
    channel.queueDeclare("SEND_MESSAGE2",true,false,false,null);
    channel.queueDeclare("SEND_STATION2",true,false,false,null);
    // 6.交换机绑定队列
    channel.queueBind("SEND_MAIL2","exchange_routing","import");
    channel.queueBind("SEND_MESSAGE2","exchange_routing","import");
    channel.queueBind("SEND_STATION2","exchange_routing","import");
    channel.queueBind("SEND_STATION2","exchange_routing","normal");
    // 7.发送消息
    channel.basicPublish("exchange_routing","import",null,
        "双十一大促活动".getBytes());
    channel.basicPublish("exchange_routing","normal",null,
        "小心促销活动".getBytes());
    // 8.关闭资源
    channel.close();
    connection.close();
   }
}

编写消费者

// 站内信消费者
public class Customer_Station {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
    //3.建立信道
    Channel channel = conn.createChannel();
    // 4.监听队列
    channel.basicConsume("SEND_STATION2", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送站内信:"+message);
       }
     });
   }
}


// 邮件消费者
public class Customer_Mail {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
    //3.建立信道
    Channel channel = conn.createChannel();
    // 4.监听队列
    channel.basicConsume("SEND_MAIL2", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送邮件:"+message);
       }
     });
   }
}


// 短信消费者
public class Customer_Message {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
    //3.建立信道
    Channel channel = conn.createChannel();
    // 4.监听队列
    channel.basicConsume("SEND_MESSAGE2", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送短信:"+message);
       }
     });
   }
}

通配符模式

RabbitMQ的通配符模式是指,队列在绑定交换机时,可以使用通配符来匹配消息的路由键。此时,队列需要绑定到一个模式上,通配符的规则为:

  • 消息设置RoutingKey时,RoutingKey由多个单词构成,中间以.分割。
  • 队列设置RoutingKey时,#可以匹配任意多个单词,*可以匹配任意一个单词。

例如,“audit.#”能够匹配到“audit.irs.corporate”。通配符模式可以实现有选择地接收消息,通过指定不同的通配符,将消息发送到不同的队列,从而实现消息的精准投递。

编写生产者

// 生产者
public class Producer {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.创建交换机
    channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC,true);
    // 5.创建队列
    channel.queueDeclare("SEND_MAIL3",true,false,false,null);
    channel.queueDeclare("SEND_MESSAGE3",true,false,false,null);
    channel.queueDeclare("SEND_STATION3",true,false,false,null);
    // 6.交换机绑定队列
    channel.queueBind("SEND_MAIL3","exchange_topic","#.mail.#");
    channel.queueBind("SEND_MESSAGE3","exchange_topic","#.message.#");
    channel.queueBind("SEND_STATION3","exchange_topic","#.station.#");
    // 7.发送消息
    channel.basicPublish("exchange_topic","mail.message.station",null,
        "双十一大促活动".getBytes());
    channel.basicPublish("exchange_topic","station",null,
        "小型促销活动".getBytes());
    // 8.关闭资源
    channel.close();
    connection.close();
   }
}

编写消费者

// 站内信消费者
public class Customer_Station {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机


    //2.创建连接
    Connection conn = connectionFactory.newConnection();
  
    //3.建立信道
    Channel channel = conn.createChannel();
  
    // 4.监听队列
    channel.basicConsume("SEND_STATION3", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送站内信:"+message);
       }
     });
   }
}


// 邮件消费者
public class Customer_Mail {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机


    //2.创建连接
    Connection conn = connectionFactory.newConnection();
  
    //3.建立信道
    Channel channel = conn.createChannel();
  
    // 4.监听队列
    channel.basicConsume("SEND_MAIL3", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送邮件:"+message);
       }
     });
   }


}


// 短信消费者
public class Customer_Message {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("zhangsan");
    connectionFactory.setPassword("zhangsan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机


    //2.创建连接
    Connection conn = connectionFactory.newConnection();
  
    //3.建立信道
    Channel channel = conn.createChannel();
  
    // 4.监听队列
    channel.basicConsume("SEND_MESSAGE3", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送短信:"+message);
       }
     });
   }
}

三、SpringBoot整合RabbitMQ

项目搭建

1.创建SpringBoot项目,引入RabbitMQ起步依赖

<!-- RabbitMQ起步依赖 -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.编写配置文件

spring:
  rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: zhangsan
   password: zhangsan
   virtual-host: /


#日志格式
logging:
  pattern:
   console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

创建对列和交换机

@Configuration
public class RabbitConfig {
  private final String EXCHANGE_NAME = "boot_topic_exchange";
  private final String QUEUE_NAME = "boot_queue";


  // 创建交换机
  @Bean("bootExchange")
  public Exchange getExchange() {
    return ExchangeBuilder
         .topicExchange(EXCHANGE_NAME) // 交换机类型
         .durable(true) // 是否持久化
         .build();
   }


  // 创建队列
  @Bean("bootQueue")
  public Queue getMessageQueue() {
    return new Queue(QUEUE_NAME); // 队列名
   }


  // 交换机绑定队列
  @Bean
  public Binding bindMessageQueue(@Qualifier("bootExchange") Exchange exchange, @Qualifier("bootQueue") Queue queue) {
    return BindingBuilder
         .bind(queue)
         .to(exchange)
         .with("#.message.#")
         .noargs();
   }
}

编写生产者

@SpringBootTest
public class TestProducer {
  // 注入RabbitTemplate工具类
  @Autowired
  private RabbitTemplate rabbitTemplate;


  @Test
  public void testSendMessage(){
    /**
     * 发送消息
     * 参数1:交换机
     * 参数2:路由key
     * 参数3:要发送的消息
     */
    rabbitTemplate.convertAndSend("boot_topic_exchange","message","双十一开始了!");
   }
}

编写消费者

1.创建SpringBoot项目,引入RabbitMQ起步依赖

<!-- rabbitmq起步依赖 -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.编写配置文件

spring:
  rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: zhangsan
   password: zhangsan
   virtual-host: /


#日志格式
logging:
  pattern:
   console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

3.编写消费者,监听队列

@Component
public class Consumer {
  // 监听队列
  @RabbitListener(queues = "boot_queue")
  public void listen_message(String message){
    System.out.println("发送短信:"+message);
   }
}


总结

提示:这里对文章进行总结:

在这篇博客中,我们深入探讨了 RabbitMQ 的几种工作模式,包括简单模式、工作队列模式、发布/订阅模式、路由模式和通配符模式。
简单模式是消息传递的基本模式,其中生产者将消息发送到队列,消费者从队列中接收消息并进行处理。
工作队列模式允许多个消费者同时处理队列中的消息,提高了消息处理的效率。
发布/订阅模式实现了一对多的消息发布,一个生产者可以将消息发送给多个消费者。
路由模式使得消息能够根据路由键被发送到不同的队列,实现了更灵活的消息路由。
通配符模式通过使用通配符,可以有选择地接收符合特定规则的消息。
了解这些工作模式对于有效利用 RabbitMQ 进行消息传递和构建可靠的分布式系统至关重要。每种模式都有其独特的优势和适用场景,根据具体需求选择合适的工作模式可以提高系统的性能、可扩展性和可靠性。
希望这篇博客对你了解 RabbitMQ 的工作模式有所帮助!如果你有任何问题或想进一步讨论,欢迎在评论中留言。文章来源地址https://www.toymoban.com/news/detail-800461.html

到了这里,关于消息队列之RabbitMQ工作模式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • MQ-消息队列-RabbitMQ

    MQ(Message Queue) 消息队列 ,是基础数据结构中“ 先进先出 ”的一种 数据结构 。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由

    2024年02月09日
    浏览(48)
  • MQ消息队列(主要介绍RabbitMQ)

    消息队列概念:是在消息的传输过程中保存消息的容器。 作用:异步处理、应用解耦、流量控制..... RabbitMQ:     SpringBoot继承RabbitMQ步骤:         1.加入依赖          2.配置         3.开启(如果不需要监听消息也就是不消费就不需要该注解开启)         4.创建队列、

    2024年02月11日
    浏览(50)
  • 消息队列-RabbitMQ:MQ作用分类、RabbitMQ核心概念及消息生产消费调试

    1)什么是 MQ MQ (message queue),从字面意思上看, 本质是个队列,FIFO 先入先出 ,只不过队列中存放的内容是 message 而已,还是一种 跨进程的通信机制 , 用于上下游传递消息 。在互联网架构中,MQ 是一种非常常见的上下游 “ 逻辑解耦 + 物理解耦” 的消息通信服务 。 使用了

    2024年02月20日
    浏览(42)
  • MQ消息队列,以及RabbitMQ详细(中1)五种rabbitMQ实用模型

    书接上文,展示一下五种模型我使用的是spring could 微服务的框架 文章说明:         本文章我会分享总结5种实用的rabbitMQ的实用模型 1、hello world简单模型 2、work queues工作队列 3、Publish/Subscribe发布订阅模型 4、Routing路由模型 5、Topics 主题模型 (赠送) 6、消息转换器 Rabbi

    2024年02月05日
    浏览(52)
  • mq 消息队列 mqtt emqx ActiveMQ RabbitMQ RocketMQ

    十几年前,淘宝的notify,借鉴ActiveMQ。京东的ActiveMQ集群几百台,后面改成JMQ。 Linkedin的kafka,因为是scala,国内很多人不熟。淘宝的人把kafka用java写了一遍,取名metaq,后来再改名RocketMQ。 总的来说,三大原因,语言、潮流、生态。 MQ这种东西,当你的消息量不大的时候,用啥

    2024年02月12日
    浏览(49)
  • .NetCore 使用 RabbitMQ (交换机/队列/消息持久化+mq高级特性+死信队列+延迟队列)

    目录 一、安装mq 二、实操 1、简单模式 2、工作模式 3、fanout扇形模式(发布订阅) 4、direct路由模式也叫定向模式 5、topic主题模式也叫通配符模式(路由模式的一种) 6、header 参数匹配模式 7、延时队列(插件方式实现) 参考资料: 1、我的环境是使用VMware安装的Centos7系统。MQ部署

    2023年04月09日
    浏览(99)
  • MQ的快速入门及RabbitMQ的五种工作模式

    目录 MQ基本知识 MQ基本概念 MQ概述 MQ的优势和劣势 优势 劣势 RabbitMQ 简介 基础架构 相关概念 JMS RabbitMQ安装 在线拉取镜像 安装MQ 访问控制台(http://ip地址:15672) 工作模式 简单模式(生产者消费者模式) Work Queues 工作队列模式 Pub/Sub 订阅模式 Routing 路由模式 Topics 通配符模式

    2024年02月13日
    浏览(36)
  • 消息队列-RabbitMQ:workQueues—工作队列、消息应答机制、RabbitMQ 持久化、不公平分发(能者多劳)

    Work Queues— 工作队列 (又称任务队列) 的主要思想是避免立即执行资源密集型任务,而不得不等待它完成 。 我们把任务封装为消息并将其发送到队列,在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务 。 轮训分发消

    2024年02月21日
    浏览(53)
  • RabbitMQ工作模式-工作队列

    官网关于工作模式的解释地址:https://www.rabbitmq.com/getstarted.html Work Queue(工作队列) 生产者发消息,启动多个消费者来消费消息,每个消费者仅消费部分消息,可达到负载均衡的效果。 创建生产者 创建消费者 首先运行消息费,为了测试工作队列模式,消费都需要启动多个,

    2024年02月10日
    浏览(37)
  • RabbitMQ:工作队列模式

    📃个人主页:不断前进的皮卡丘 🌞博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记 🔥个人专栏:消息中间件 工作队列(又名: 任务队列 )背后的主要思想是避免立即执行资源密集型任务并等待其完成。相反

    2024年01月23日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包