202、RabbitMQ 之 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型---fanout类型就是广播类型

这篇具有很好参考价值的文章主要介绍了202、RabbitMQ 之 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型---fanout类型就是广播类型。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

★ 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型

就是声明一个 fanout 类型的 Exchange 来分发消息。消费者进行消费
fanout 类型就是广播模式

fanout 类型 的 Exchange 不会判断消息的路由key,直接将消息分发给绑定到该Exchange的所有队列。

生产者发送一条消息到fanout类型的Exchange后,绑定到该Exchange的所有队列都会收到该消息的一条副本,
而消费者也能分别从不同的队列中读取消息,互不干扰。

▲ fanout类型的Exchange可以很好地模拟JMS的Pub-Sub消息模型。

202、RabbitMQ 之 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型---fanout类型就是广播类型,Spring Boot,RabbitMQ,rabbitmq,java,fanout,rabbitmq广播模式

代码演示:

都是在前面一篇的代码基础上修改的。
需求:使用 fanout 类型的Exchange ,实行发布-订阅的功能,其实就是创建一个生产者和两个消费者,实现广播模式的消息分发。

202、RabbitMQ 之 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型---fanout类型就是广播类型,Spring Boot,RabbitMQ,rabbitmq,java,fanout,rabbitmq广播模式

生产者:producer

在生产者中声明Exchange ,然后声明两个消息队列 Queue,
然后给这个Exchange 绑定 这个两个Queue
202、RabbitMQ 之 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型---fanout类型就是广播类型,Spring Boot,RabbitMQ,rabbitmq,java,fanout,rabbitmq广播模式

202、RabbitMQ 之 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型---fanout类型就是广播类型,Spring Boot,RabbitMQ,rabbitmq,java,fanout,rabbitmq广播模式

消费者:Consumer01

两个消费者的代码没啥区别,
消费方法的参数 autoAck 都是true, 都是自动确认消费。
两个消费者各自消费自己指定的消息队列。

202、RabbitMQ 之 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型---fanout类型就是广播类型,Spring Boot,RabbitMQ,rabbitmq,java,fanout,rabbitmq广播模式

202、RabbitMQ 之 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型---fanout类型就是广播类型,Spring Boot,RabbitMQ,rabbitmq,java,fanout,rabbitmq广播模式

消费者:Consumer02

202、RabbitMQ 之 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型---fanout类型就是广播类型,Spring Boot,RabbitMQ,rabbitmq,java,fanout,rabbitmq广播模式
202、RabbitMQ 之 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型---fanout类型就是广播类型,Spring Boot,RabbitMQ,rabbitmq,java,fanout,rabbitmq广播模式

测试结果

消费生产者发送10条消息,两个消费者都能各自消费到10条消息就是正确的。

消息生产者使用fanout这个广播的类型发送消息。
202、RabbitMQ 之 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型---fanout类型就是广播类型,Spring Boot,RabbitMQ,rabbitmq,java,fanout,rabbitmq广播模式
两个消费者都能消费到10条消息,正确。
202、RabbitMQ 之 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型---fanout类型就是广播类型,Spring Boot,RabbitMQ,rabbitmq,java,fanout,rabbitmq广播模式文章来源地址https://www.toymoban.com/news/detail-726450.html

完整代码

ConnectionUtil

package cn.ljh.rabbitmq.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//连接工具
public class ConnectionUtil
{
    //获取连接的方法
    public static Connection getConnection() throws IOException, TimeoutException
    {
        //创建连接工厂----这个ConnectionFactory源码可以看出有构造器,所以直接new一个出来
        ConnectionFactory connectionFactory =  new ConnectionFactory();
        //设置连接信息
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("ljh");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("/"); //连接虚拟主机
        //从连接工厂获取连接
        Connection connection = connectionFactory.newConnection();
        //返回连接
        return connection;
    }
}

Publisher

package cn.ljh.rabbitmq.producer;

import cn.ljh.rabbitmq.consumer.Consumer01;
import cn.ljh.rabbitmq.consumer.Consumer02;
import cn.ljh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

//消息生产者--使用fanout类型的exchange------就是广播模式
public class Publisher
{
    //常量:定义个Exchange的名字作为常量
    public static final String EXCHANGE_NAME = "myex01.fanout";

    public static void main(String[] args) throws IOException, TimeoutException
    {
        //1、创建连接
        Connection conn = ConnectionUtil.getConnection();
        //2、通过Connection获取Channel。
        Channel channel = conn.createChannel();

        //3、调用exchangeDeclare()方法声明Exchange、调用queueDeclare()方法声明队列,并完成队列与Exchange的绑定
        channel.exchangeDeclare(EXCHANGE_NAME,/* Exchange名字 */
                BuiltinExchangeType.FANOUT,/* Exchange 类型 */
                true,/* 是否持久化 */
                false,/* 是否自动栅除 */
                false,/* 是否为内部的 Exchange */
                null /* 指定 Exchange 的额外属性 */
        );

        //声明多个消息队列------声明第1个消息队列
        channel.queueDeclare(Consumer01.QUEUE01, true, false, false, null);

        //把 Exchange 和 Queue 绑定起来,绑定第一个消息队列
        channel.queueBind(Consumer01.QUEUE01,EXCHANGE_NAME,
                "" /* 因为Exchange 是fanout类型,所以无需 路由key */,
                null /* 指定 Exchange 的额外属性 */);

        //声明第2个消息队列
        channel.queueDeclare(Consumer02.QUEUE02, true, false, false, null);

        //把 Exchange 和 Queue 绑定起来,绑定第2个消息队列
        channel.queueBind(Consumer02.QUEUE02,EXCHANGE_NAME,
                "" /* 因为Exchange 是fanout类型,所以无需 路由key */,
                null /* 指定 Exchange 的额外属性 */);

        //生产者发送10条消息
        for (int i = 1; i <= 10; i++)
        {
            String message = "生产者发送的第【 " + i + " 】条消息的内容";

            //4、调用Channel的basicPublish()方法发送消息
            channel.basicPublish(EXCHANGE_NAME /* 向这个 fanout类型的 Exchange 发送消息 */,
                    "" /* 因为 Exchange 是fanout 类型,所以有没有路由key都无所谓 */,
                    null /*指定额外的消息的属性*/,
                    message.getBytes(StandardCharsets.UTF_8)/*消息体必须是字节数组类型-->byte[]*/
            );
            System.out.println("生产者发送【 "+i+" 】条消息完成");
        }
        //5、关闭资源
        //关闭通道
        channel.close();
        //关闭连接
        conn.close();
    }
}

Consumer01

package cn.ljh.rabbitmq.consumer;

import cn.ljh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//消息消费者1
public class Consumer01
{
    // 使用 RabbitMQ Java Client 开发 消息消费者 的大致步骤如下:
    //(1)创建ConnectionFactory连接工厂,设置连接信息,再通过ConnectionFactory获取Connection连接。
    //(2)通过Connection获取Channel。
    //(3)根据需要、调用Channel的queueDeclare()方法声明队列,  Declare:声明、宣布
    //    如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列。
    //(4)调用Channel 的 basicConsume()方法开始处理消息,调用该方法时需要传入一个Consumer参数,该参数相当于JMS中的消息监听器。

    //常量
    public final static String QUEUE01 = "firstQueue";

    public static void main(String[] args) throws IOException, TimeoutException
    {
        //1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接
        Connection conn = ConnectionUtil.getConnection();

        //2、通过Connection获取Channel 消息通道
        Channel channel = conn.createChannel();

        //3、调用 Channel 的 queueDeclare() 方法声明队列,
        //   如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列
        channel.queueDeclare(QUEUE01, /* 声明的队列名 */
                true,    /* 消息队列是否持久化 */
                false,  /* 是否只允许该消息消费者消费该队列的消息,独占 */
                false, /* 是否自动删除 */
                null   /* 指定消息队列额外的属性 */);


        //4、调用Channel 的 basicConsume()方法开始处理消费消息
        channel.basicConsume(
                QUEUE01 /*消费这个消费队列里面的消息*/,
                true /*消息的确认模式:是否自动确认该消息已经被消费完成并返回确认消息给消息队列*/,
                new DefaultConsumer(channel)
                {
                    //处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,
                                               AMQP.BasicProperties properties /*消息的那些属性*/,
                                               byte[] body /*body:消息的消息体*/) throws IOException
                    {
                        //把消息体中的消息拿出来
                        String message = new String(body, "UTF-8");
                        //printf:格式化输出函数   %s:输出字符串  %n:换行
                        System.err.printf("P2PConsumer收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",
                                envelope.getExchange(),envelope.getRoutingKey(),message);

                    }
                }
        );
    }
}

Consumer02

package cn.ljh.rabbitmq.consumer;

import cn.ljh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//消息消费者2
public class Consumer02
{
    // 使用 RabbitMQ Java Client 开发 消息消费者 的大致步骤如下:
    //(1)创建ConnectionFactory连接工厂,设置连接信息,再通过ConnectionFactory获取Connection连接。
    //(2)通过Connection获取Channel。
    //(3)根据需要、调用Channel的queueDeclare()方法声明队列,  Declare:声明、宣布
    //    如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列。
    //(4)调用Channel 的 basicConsume()方法开始处理消息,调用该方法时需要传入一个Consumer参数,该参数相当于JMS中的消息监听器。

    //常量
    public final static String QUEUE02 = "secondQueue";

    public static void main(String[] args) throws IOException, TimeoutException
    {
        //1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接
        Connection conn = ConnectionUtil.getConnection();

        //2、通过Connection获取Channel 消息通道
        Channel channel = conn.createChannel();

        //3、调用 Channel 的 queueDeclare() 方法声明队列,
        //   如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列
        channel.queueDeclare(QUEUE02, /* 声明的队列名 */
                true,    /* 消息队列是否持久化 */
                false,  /* 是否只允许该消息消费者消费该队列的消息,独占 */
                false, /* 是否自动删除 */
                null   /* 指定消息队列额外的属性 */);

        //4、调用Channel 的 basicConsume()方法开始处理消费消息
        channel.basicConsume(
                QUEUE02 /*消费这个名字的消费队列里面的消息*/,
                true/*消息的确认模式:是否自动确认该消息已经被消费完成并返回确认消息给消息队列*/,
                new DefaultConsumer(channel)
                {
                    //处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,
                                               AMQP.BasicProperties properties /*消息的那些属性*/,
                                               byte[] body /*body:消息的消息体*/) throws IOException
                    {
                        //把消息体中的消息拿出来
                        String message = new String(body, "UTF-8");
                        //printf:格式化输出函数   %s:输出字符串  %n:换行
                        System.err.printf("P2PConsumer收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",
                                envelope.getExchange(),envelope.getRoutingKey(),message);
                    }
                }
        );
    }


}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.ljh</groupId>
    <artifactId>rabbitmq_fanout</artifactId>
    <version>1.0.0</version>
    <name>rabbitmq_fanout</name>

    <!--  属性  -->
    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>11</java.version>
    </properties>

    <!--  依赖  -->
    <dependencies>
        <!-- RabbitMQ 的依赖库 -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.13.0</version>
        </dependency>

    </dependencies>


</project>

到了这里,关于202、RabbitMQ 之 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型---fanout类型就是广播类型的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【RabbitMQ】golang客户端教程3——发布订阅(使用fanout交换器)

    在上一个教程中,我们创建了一个工作队列。工作队列背后的假设是每个任务只传递给一个工人。在这一部分中,我们将做一些完全不同的事情——我们将向多个消费者传递一个消息。这就是所谓的 “订阅/发布模式” 。 为了说明这种模式,我们将构建一个简单的日志系统。

    2024年02月14日
    浏览(42)
  • Python如何操作RabbitMQ实现fanout发布订阅模式?有录播直播私教课视频教程

    生产者 消费者 生产者 消费者 生产者 消费者

    2024年01月17日
    浏览(41)
  • RabbitMQ---订阅模型-Fanout

    Fanout,也称为广播。 流程图: 在广播模式下,消息发送流程是这样的: 1) 可以有多个消费者 2) 每个消费者有自己的queue(队列) 3) 每个队列都要绑定到Exchange(交换机) 4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。 5)

    2024年02月11日
    浏览(49)
  • RabbitMq:Topic exchange(主题交换机)的理解和使用

    在RabbitMq中,生产者的消息都是通过交换机来接收,然后再从交换机分发到不同的队列中去,在分发的过程中交换机类型会影响分发的逻辑,下面主要讲解一下主题交换机。 ​ 主题交换机核心是可以以范围的行为向队列发送消息,它和直连交换机区别在于,直连交换机一个队

    2024年02月12日
    浏览(37)
  • RabbitMQ系列(27)--RabbitMQ使用Federation Exchange(联邦交换机)解决异地访问延迟问题

    前言: (broker北京)、(broker深圳)彼此之间相距甚远,网络延迟是一个不得不面对的问题。有一个在北京的业务(Client北京)需要连接(broker北京),向其中的交换器exchangeA发送消息,此时的网络延迟很小,(Client北京)可以迅速将消息发送至exchangeA 中,就算在开启了publisherconfirm机制或

    2024年02月13日
    浏览(71)
  • Rabbitmq运用之fanout模式

    rabiitmq 的 fanout 属于多播模式,他的工作图如下,应用场景挺多的。比如订单,客户下单后,会发送消息告诉客户下单成功,通知仓库出货等。 在上面的图可以看到,项目里有四个启动项目 这里用到RabbitmqFanoutConsumerSend,RabbitmqFanoutConsumerWMS,RabbitmqFanoutProduct这三个控制台,另

    2024年02月12日
    浏览(35)
  • RabbitMQ--04--发布订阅模式 (fanout)-案例

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 @RabbitListener和@RabbitHandler的使用 OrderService OrderServiceImpl 在项目的test中发送请求 访问网址: http://localhost:15672/#/queues yml配置 SmsConsumerService、SmsConsumerServiceImpl EmailConsumerService、EmailConsumerServiceImpl DuanxinCo

    2024年04月14日
    浏览(44)
  • [中间件] RabbitMQ 的 Exchange 和 Queue 绑定:实现高效消息传递的关键步骤

    前言: 当今大多数分布式系统都需要进行异步消息传递,而 RabbitMQ 作为开源的消息队列系统,提供了一个高效的消息传递方案。但是在使用 RabbitMQ 进行消息传递时,如何正确绑定 Exchange 和 Queue 是十分重要的。本文将从 RabbitMQ Exchange 和 Queue 的定义、Exchange 和 Queue 绑定的目

    2024年04月15日
    浏览(35)
  • rabbitmq基础-java-3、Fanout交换机

    Fanout,英文翻译是扇出。 1)  可以有多个队列 2)  每个队列都要绑定到Exchange(交换机) 3)  生产者发送的消息,只能发送到交换机 4)  交换机把消息发送给绑定过的所有队列 5)  订阅队列的消费者都能拿到消息 交换机的作用是什么? 接收publisher发送的消息 将消息按

    2024年01月25日
    浏览(47)
  • RabbitMQ学习——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计

    1.rabbitmq队列方式的梳理,点对点,一对多; 2.发布订阅模式,交换机到消费者,以邮箱和手机验证码为例; 3.topic模式,根据规则决定发送给哪个队列; 4.rabbitmq回调确认,setConfirmCallback和setReturnsCallback; 5.死信队列,延迟队列,创建方法,正常—死信,设置延迟时间; 点对

    2024年02月13日
    浏览(78)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包