RabbitMQ入门案例-Java

这篇具有很好参考价值的文章主要介绍了RabbitMQ入门案例-Java。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

什么是RabbitMQ
Q全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。

  • 为什么使用MQ

        在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理            的方式大大的节省了服务器的请求响应时间,从而提高系统吞吐量

  • 开发中消息队列通常有如下应用场景:

        1、任务异步处理

        将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应            用程序的响应时间。

        2、应用程序解耦合

        MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。

        3、削峰填谷

        如订单系统,在下单的时候就会往数据库写数据。但是数据库只能支撑每秒1000左右的并发          写入,并发量再高就容易宕机。低峰期的时候并发也就100多个,但是在高峰期时候,并量            会突然激增到5000以上,这个时候数据库肯定卡死了。

        解决办法: 消息被MQ保存起来,然后系统就可以按照自己的消费能力来消费,比如每1000             个数据,这样慢慢写入数据库,这样就不会卡死数据库了。

        但是使用了MQ之后,限制消费的消息速度为1000,但是这样一来,高峰期产生的数据势会            被积压在MQ中,高峰就被“削”掉了。但是因为消息积压,在高峰期过后的一段时间内,消费          的消息速度还是会维持在1000QPS,直到消费完积压的消息,这就叫做“填谷”

RabbitMQ各组件功能

rabbitmq java,java-rabbitmq,rabbitmq,java

 Broker:标识消息队列服务器实体.

Virtual Host:虚拟主机。标识一批交换机、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 /。

Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

Banding:绑定,用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟链接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。

Connection:网络连接,比如一个TCP连接。

Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。

Consumer:消息的消费者,表示一个从一个消息队列中取得消息的客户端应用程序。

Message:消息,消息是不具名的,它是由消息头和消息体组成。消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(优先级)、delivery-mode(消息可能需要持久性存储[消息的路由模式])等。

创建项目

可以在windows中安装RabbiMQ 也可以在Linux 中安装RabbiMQ 可以到百度上查怎么安装的 我写的文章中也有具体怎么安装的

安装后 在IDEA中创建一个 Maven项目

添加依赖 Maven    

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>

rabbitmq java,java-rabbitmq,rabbitmq,java

 在编写代码前 要开启RabbitMQ的服务 否则是无法使用的

什么是生产者什么是消费者 .就好比微信发消息.发消息的一方是生产者(生产消息),而消费者就是获取消息的一方

学过Socket通信的就很好掌握在Socket中客户端就好比是生产者, 而服务端就好比是消费者但是不一样的是,服务端必须保证在线.否则客户端是无法发送消息的,也就是连接失败,而在RabbitMQ ,利用队列来存储客户端(生产者)发送的消息(就好比数据库) 而服务端(消费者)开启后会自动获取队列中的全部消息 ,而队列就好比是中间商 来帮助 客户端和服务端 交互 不用 服务端一直开启

我们在来说说微信 : 我们在微信中发生一条信息给其他用户,但是那个用户不在线,信息发过去了没???

答案是没有, 那么存储在哪里了? , 存储在你和他的队列里了, 当他上线后自动会从队列中获取,你发送的全部消息

下面我们就来实现,上面讲述的功能

编写RabbitMQ 连接

package com.rabbitmq;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {
    //队列名称
 public    static final String QUEUE_NAME = "simple_queue";
    public static Connection getConnection() throws Exception {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
      //主机地址  如果是本机就是localhost   如果在其他 地方比如:虚拟机中 那么就是ip地址
        connectionFactory.setHost("192.168.216.128");
        //连接端口;默认为 5672
        connectionFactory.setPort(5672);
        //虚拟主机名称   就是和你用户绑定的虚拟机  在创建用户时候就指定了
        connectionFactory.setVirtualHost("/itcast");
        //连接用户名
        connectionFactory.setUsername("admin");
        //连接密码
        connectionFactory.setPassword("admin");

        //创建连接
        return connectionFactory.newConnection();
    }

}

编写生产者

Producer

package com.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
//生产者
public class Producer {


    public static void main(String[] args) throws Exception {
        //创建连接
        Connection connection = ConnectionUtil.getConnection();
        // 创建频道
        Channel channel = connection.createChannel();

        // 声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(ConnectionUtil.QUEUE_NAME, true, false, false, null);

        // 要发送的信息
        String message = "你好;小兔子111!";
        /**
         * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
         * 参数2:路由key,简单模式可以传递队列名称
         * 参数3:消息其它属性
         * 参数4:消息内容
         */
        channel.basicPublish("", ConnectionUtil.QUEUE_NAME, null, message.getBytes());
        System.out.println("已发送消息:" + message);

        // 关闭资源
        channel.close();
        connection.close();
    }
}


在执行上述的消息发送之后;可以用发送消息的账户,登录RabbitMQ的管理控制台,可以发现队列和其消息:

我是在linux上搭建的启动服务器后在游览器上输入http://192.168.216.128:15672就能进入RabbitMQ ,ip是linux 的ip地址windows也一样

rabbitmq java,java-rabbitmq,rabbitmq,java

 查看发送的消息信息

rabbitmq java,java-rabbitmq,rabbitmq,java

rabbitmq java,java-rabbitmq,rabbitmq,java

 编写消费者

package com.rabbitmq;
import com.rabbitmq.client.*;

import java.io.IOException;
//消费者
public class Consumer {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();

        // 创建频道
        Channel channel = connection.createChannel();

        // 声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(ConnectionUtil.QUEUE_NAME, true, false, false, null);

        //创建消费者;并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            /**
             * consumerTag 消息者标签,在channel.basicConsume时候可以指定
             * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
             * properties 属性信息
             * body 消息
             */
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key为:" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为:" + envelope.getExchange());
                //消息id
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("接收到的消息为:" + new String(body, StandardCharsets.UTF_8));
            }
        };
        //监听消息
        /**
         * 参数1:队列名称
         * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
         * 参数3:消息接收到后回调
         */
        channel.basicConsume(ConnectionUtil.QUEUE_NAME, true, consumer);

        //不关闭资源,应该一直监听消息
        //channel.close();
        //connection.close();
    }
}


当启动消费者后 自动获取 生产者 发送到队列里的消息 而且持续保持监听

只要消费者接受到消息 那么 就会将队列里对应的消息删除

你也可以手动收到消息

//手动确认消息
channel.basicAck(envelope.getDeliveryTag(), true);

那么就要将 是否自动确认 关闭

channel.basicConsume(ConnectionUtil.QUEUE_NAME, false, consumer);


具体怎么使用 看下面的案例

Work Queues(工作队列模式)


在上面的入门案例中我们 完成了 一个消费者对应一个生产者

这个案例中我们完成一个生产者对应多个消费者 来加快消息的处理

应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

比如 抢购活动 下了1000个订单 我可以使用多个消费者来 分别处理这1000个订单 Work Queues与入门程序的简单模式的代码是几乎一样的;可以完全复制,并复制多个消费者, 进行并行消费。只是确认消息 需要手动确定

结构图:

rabbitmq java,java-rabbitmq,rabbitmq,java

 RabbitMQ连接上面有这里就不写了

生产者

Producer

package com.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
//生产者
public class Producer {


    public static void main(String[] args) throws Exception {
        //创建连接
        Connection connection = ConnectionUtil.getConnection();
        // 创建频道
        Channel channel = connection.createChannel();

        // 声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(ConnectionUtil.QUEUE_NAME, true, false, false, null);

        for (int i = 0; i < 30; i++) {
            // 要发送的信息
            String message = "你好;小兔子!---- "+i;
            /**
             * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
             * 参数2:路由key,简单模式可以传递队列名称
             * 参数3:消息其它属性
             * 参数4:消息内容
             */
            channel.basicPublish("", ConnectionUtil.QUEUE_NAME, null, message.getBytes());
            System.out.println("已发送消息:" + message);
        }
        // 关闭资源
        channel.close();
        connection.close();
    }
}

一次生产 30条 "你好;小兔子!---- "+i;

可以到RabbitMQ的管理控制台 查看

消费者 1

Consumer1

package com.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

//消费者
public class Consumer {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();

        // 创建频道
        Channel channel = connection.createChannel();

        // 声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(ConnectionUtil.QUEUE_NAME, true, false, false, null);

        //创建消费者;并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            /**
             * consumerTag 消息者标签,在channel.basicConsume时候可以指定
             * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
             * properties 属性信息
             * body 消息
             */
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key为:" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为:" + envelope.getExchange());
                //消息id
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("接收到的消息为:" + new String(body, StandardCharsets.UTF_8));
            }
        };
        //监听消息
        /**
         * 参数1:队列名称
         * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
         * 参数3:消息接收到后回调
         */
        channel.basicConsume(ConnectionUtil.QUEUE_NAME, true, consumer);

        //不关闭资源,应该一直监听消息
        //channel.close();
        //connection.close();
    }
}


代码Consumer1 和Consumer2 一模一样 复制一下 改下文件名 ,启动消费者 1 和 消费者2 然后 运行 生产者

部分截图:

rabbitmq java,java-rabbitmq,rabbitmq,java

rabbitmq java,java-rabbitmq,rabbitmq,java

 

在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。 简单来说就是谁抢到就是谁的

Publish/Subscribe(订阅模式)

在之前的 模式中都是 一次只能将消息发送给一个队里 那么我们可以将消息发送给所有指定的队列里吗??

当前可以 我们可以使用交换机 来代替我,们发送 就和中间商一样 我们把消息给中间商,而中间商 帮助我们来发送消息给各个用户 这就是订阅模式

订阅模式是: 将某一个消费者 的消息发给多个队里 而队列里的所有消费者都能共享到此消息

就拿微信群来说: 我在群中发送一条消息 只要是 群里的用户 都能接收到我的消息,这原理就是 发送者将消息交给交换机而交换机在发送给 此群所有人的队列里因为此群所有人的队列 都和此交换机绑定了如果还不懂 那么 微信公众号 知道吧如果你不关注他的公众号那么他发送的信息你是不会收到的 , 如果你关注了他的公众号那么就相当于和此公众号的交换机绑定了 那么公众号 发生一条消息 给交换机,此交换机就会将 消息发生给所有绑定此交换机的微信用户的队列里 当微信在线时候就会自动接收到消息

rabbitmq java,java-rabbitmq,rabbitmq,java

 

package com.itheima.rabbitmq.simple.ps;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {
    //交换机名称
    static final String FANOUT_EXCHAGE = "fanout_exchange";
    //队列名称
    static final String FANOUT_QUEUE_1 = "fanout_queue_1";
    //队列名称
    static final String FANOUT_QUEUE_2 = "fanout_queue_2";

    public static Connection getConnection() throws Exception {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //主机地址  如果是本机就是localhost   如果在其他 地方比如:虚拟机中 那么就是ip地址
        connectionFactory.setHost("localhost");
        //连接端口;默认为 5672
        connectionFactory.setPort(5672);
        //虚拟主机名称   就是和你用户绑定的虚拟机
        connectionFactory.setVirtualHost("/");
        //连接用户名
        connectionFactory.setUsername("guest");
        //连接密码
        connectionFactory.setPassword("guest");

        //创建连接
        return connectionFactory.newConnection();
    }

}

生产者

package com.rabbitmq;


import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * 发布与订阅使用的交换机类型为:fanout
 */
public class Producer { //生产者


    public static void main(String[] args) throws Exception {

        //创建连接
        Connection connection = ConnectionUtil.getConnection();

        // 创建频道
        Channel channel = connection.createChannel();

        /**
         * 创建交换机
         * 参数1:交换机名称
         * 参数2:交换机类型,fanout、topic、direct、headers
         */
        channel.exchangeDeclare(ConnectionUtil.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);

        // 声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(ConnectionUtil.FANOUT_QUEUE_1, true, false, false, null);
        channel.queueDeclare(ConnectionUtil.FANOUT_QUEUE_2, true, false, false, null);

        //队列绑定交换机   也就是每次消息要推送的队列
        channel.queueBind(ConnectionUtil.FANOUT_QUEUE_1, ConnectionUtil.FANOUT_EXCHAGE, "");
        channel.queueBind(ConnectionUtil.FANOUT_QUEUE_2, ConnectionUtil.FANOUT_EXCHAGE, "");

        for (int i = 1; i <= 10; i++) {
            // 发送信息
            String message = "你好;小兔子!发布订阅模式--" + i;
            /**
             * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
             * 参数2:路由key,简单模式可以传递队列名称
             * 参数3:消息其它属性
             * 参数4:消息内容
             */
            channel.basicPublish(ConnectionUtil.FANOUT_EXCHAGE, "", null, message.getBytes());
            System.out.println("已发送消息:" + message);
        }

        // 关闭资源
        channel.close();
        connection.close();
    }
}


消费者1

package com.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer{  //消费者1

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();

        // 创建频道
        Channel channel = connection.createChannel();

        //创建交换机
        channel.exchangeDeclare(ConnectionUtil.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);

        // 声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(ConnectionUtil.FANOUT_QUEUE_1, true, false, false, null);

        //队列绑定交换机
        channel.queueBind(ConnectionUtil.FANOUT_QUEUE_1, ConnectionUtil.FANOUT_EXCHAGE, "");

        //创建消费者;并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            /**
             * consumerTag 消息者标签,在channel.basicConsume时候可以指定
             * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
             * properties 属性信息
             * body 消息
             */
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key为:" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为:" + envelope.getExchange());
                //消息id
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
            }
        };
        //监听消息
        /**
         * 参数1:队列名称
         * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
         * 参数3:消息接收到后回调
         */
        channel.basicConsume(ConnectionUtil.FANOUT_QUEUE_1, true, consumer);
    }
}


消费者2 将消费者1代码复制一份,然后将 FANOUT_QUEUE_1 都换成FANOUT_QUEUE_2就行 ,先运行 所有消费者然后在运行生产者

控制台结果:

rabbitmq java,java-rabbitmq,rabbitmq,java

rabbitmq java,java-rabbitmq,rabbitmq,java 

 

可以看出来 交换机成功的把 消息 发送到两个队列里了

然后我们在来看看 RabbiMQ 控制台里

rabbitmq java,java-rabbitmq,rabbitmq,java

 

这个是我们创建的交换机

点击fanout_exchange 进入里面

rabbitmq java,java-rabbitmq,rabbitmq,java

 

这就是我们绑定的队列 看看就行了别乱点

我们在看看队列

rabbitmq java,java-rabbitmq,rabbitmq,java

 

这就是我们创建的队列 我们点击fanout_queue_1进入

rabbitmq java,java-rabbitmq,rabbitmq,java

 

这个就是此队列绑定的交换机

RoutingKEY(路由模式)

什么是路由模式:

就是在订阅模式的基础上 加个RoutingKEY标记这个标记的作用是来区分消息的发送, 就和送快递一样根据地址将快递送给对应的人也可以说是分流我们还拿微信来说:

就比如一个企业群:普通消息是所有人都能接收到 而有些消息只能管理层才能接收到 ,比如:通知经理以上员工来开会 这就需要管理层关联管理队列和普通队列 ,普通员工关联普通队列

在比如商品发布: 商品分为会员商品和普通商品 ,会员能接受到普通商品的发布和会员商品的发布信息而普通用户只能接受到普通商品的发布信息

rabbitmq java,java-rabbitmq,rabbitmq,java

 

商品发布案例:

ConnectionUtil(连接)

package com.itheima.rabbitmq.simple.direct;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {
    //交换机名称
    static final String DIRECT_EXCHAGE = "direct_exchange";
    //会员队列名称
    static final String DIRECT_QUEUE_MEMBER = "direct_queue_member";
    //会员路由
    static final String DIRECT_ROUTING_MEMBER = "member";
    //普通队列名称
    static final String DIRECT_QUEUE_COMMON = "direct_queue_common";
    //普通路由
    static final String DIRECT_ROUTING_COMMON = "common";

    public static Connection getConnection() throws Exception {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //主机地址  如果是本机就是localhost   如果在其他 地方比如:虚拟机中 那么就是ip地址
        connectionFactory.setHost("localhost");
        //连接端口;默认为 5672
        connectionFactory.setPort(5672);
        //虚拟主机名称   就是和你用户绑定的虚拟机
        connectionFactory.setVirtualHost("/");
        //连接用户名
        connectionFactory.setUsername("guest");
        //连接密码
        connectionFactory.setPassword("guest");

        //创建连接
        return connectionFactory.newConnection();
    }

}


生产者

package com.rabbitmq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer { //生产者

    public static void main(String[] args) throws Exception {

        //创建连接
        Connection connection = ConnectionUtil.getConnection();

        // 创建频道
        Channel channel = connection.createChannel();

        /**
         * 创建交换机
         * 参数1:交换机名称
         * 参数2:交换机类型,fanout、topic、direct、headers
         */
        channel.exchangeDeclare(ConnectionUtil.DIRECT_EXCHAGE, BuiltinExchangeType.FANOUT);

        // 声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(ConnectionUtil.DIRECT_QUEUE_MEMBER, true, false, false, null);
        channel.queueDeclare(ConnectionUtil.DIRECT_QUEUE_COMMON , true, false, false, null);

        //队列绑定交换机   和指定 Routingkey
        channel.queueBind(ConnectionUtil.DIRECT_QUEUE_MEMBER, ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_MEMBER);
        channel.queueBind(ConnectionUtil.DIRECT_QUEUE_COMMON , ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_COMMON);

        // 发送信息
        String message = "新增了会员商品。路由模式;routing key 为  "+ConnectionUtil.DIRECT_ROUTING_MEMBER ;
        /**
         * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
         * 参数2:路由key,简单模式可以传递队列名称
         * 参数3:消息其它属性
         * 参数4:消息内容
         */
        channel.basicPublish(ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_MEMBER, null, message.getBytes());
        System.out.println("已发送消息:" + message);

        // 发送信息
        message = "新增了普通商品。路由模式;routing key 为 "+ConnectionUtil.DIRECT_ROUTING_COMMON ;
        /**
         * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
         * 参数2:路由key,简单模式可以传递队列名称
         * 参数3:消息其它属性
         * 参数4:消息内容
         */
        channel.basicPublish(ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_COMMON, null, message.getBytes());
        System.out.println("已发送消息:" + message);

        // 关闭资源
        channel.close();
        connection.close();
    }
}


会员消费者

package com.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer {//会员消费者

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();

        // 创建频道
        Channel channel = connection.createChannel();

        //创建交换机
        channel.exchangeDeclare(ConnectionUtil.DIRECT_EXCHAGE, BuiltinExchangeType.FANOUT);

        // 声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(ConnectionUtil.DIRECT_QUEUE_MEMBER, true, false, false, null);

        //队列绑定交换机 和指定 Routingkey
        channel.queueBind(ConnectionUtil.DIRECT_QUEUE_MEMBER, ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_MEMBER);//会员Routing
        channel.queueBind(ConnectionUtil.DIRECT_QUEUE_MEMBER, ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_COMMON);//普通Routing

        //创建消费者;并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            /**
             * consumerTag 消息者标签,在channel.basicConsume时候可以指定
             * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
             * properties 属性信息
             * body 消息
             */
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key为:" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为:" + envelope.getExchange());
                //消息id
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
            }
        };
        //监听消息
        /**
         * 参数1:队列名称
         * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
         * 参数3:消息接收到后回调
         */
        channel.basicConsume(ConnectionUtil.DIRECT_QUEUE_MEMBER, true, consumer);
    }
}


普通消费者

package com.rabbitmq;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {//普通消费者

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();

        // 创建频道
        Channel channel = connection.createChannel();

        //创建交换机
        channel.exchangeDeclare(ConnectionUtil.DIRECT_EXCHAGE, BuiltinExchangeType.FANOUT);

        // 声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(ConnectionUtil.DIRECT_QUEUE_COMMON , true, false, false, null);

        //队列绑定交换机   和指定 Routingkey
        channel.queueBind(ConnectionUtil.DIRECT_QUEUE_COMMON , ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_COMMON);//普通Routing

        //创建消费者;并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            /**
             * consumerTag 消息者标签,在channel.basicConsume时候可以指定
             * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
             * properties 属性信息
             * body 消息
             */
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key为:" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为:" + envelope.getExchange());
                //消息id
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));
            }
        };
        //监听消息
        /**
         * 参数1:队列名称
         * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
         * 参数3:消息接收到后回调
         */
        channel.basicConsume(ConnectionUtil.DIRECT_QUEUE_COMMON , true, consumer);
    }
}


先运行所有消费者然后在运行生产者 控制台结果:

会员

rabbitmq java,java-rabbitmq,rabbitmq,java

 

普通

rabbitmq java,java-rabbitmq,rabbitmq,java

 

大家可能会发现 在Producer类中和Consumer以及Consumer1 中都存在重复创建队列和交换机代码

channel.exchangeDeclare(xxx) 创建交换机
channel.queueBind(xxx) 创建队列

那么能不能省略呢答案是能 但是这样的话你必须保证发送消息的目标队列必须存在否则消息将丢失,所以还是不要省略为好这样在发送消息之前就将需要的队列创建完成了, 保证了消息不会丢失

Topics(通配符模式)

Topics通配符模式是在RoutingKey路由模式的基础上升级的 增加了通配符功能 ,在RoutingKey路由模式中我们要分流发送信息,需要指定每一个队列的RoutingKey,如果是100个队列呢? 那不给累死…

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: group.member1

通配符规则:

#:匹配一个或多个词 (最常用)

*:匹配不多不少恰好1个词

举例:

group.#:能够匹配group.xxx 或者 group.xxx.xx

group.*:只能匹配group.xxx

rabbitmq java,java-rabbitmq,rabbitmq,java

 

我们将RoutingKey路由模式的代码改动下:

将ConnectionUtil类中的 路由变量换成以下代码

   //会员路由
    static final String DIRECT_ROUTING_MEMBER = "item.member";
     //普通路由
    static final String DIRECT_ROUTING_COMMON = "item.common";


Consumer

package com.rabbitmq;


import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer {//会员消费者

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();

        // 创建频道
        Channel channel = connection.createChannel();

        //声明交换机
        channel.exchangeDeclare(ConnectionUtil.DIRECT_EXCHAGE, BuiltinExchangeType.TOPIC);

        // 声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(ConnectionUtil.DIRECT_QUEUE_MEMBER, true, false, false, null);

        //队列绑定交换机 和指定Routing
        channel.queueBind(ConnectionUtil.DIRECT_QUEUE_MEMBER, ConnectionUtil.DIRECT_EXCHAGE, "item.*");//会员Routing


        //创建消费者;并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            /**
             * consumerTag 消息者标签,在channel.basicConsume时候可以指定
             * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
             * properties 属性信息
             * body 消息
             */
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key为:" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为:" + envelope.getExchange());
                //消息id
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("消费者1-接收到的消息为:" + new String(body, StandardCharsets.UTF_8));
            }
        };
        //监听消息
        /**
         * 参数1:队列名称
         * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
         * 参数3:消息接收到后回调
         */
        channel.basicConsume(ConnectionUtil.DIRECT_QUEUE_MEMBER, true, consumer);
    }
}


Consumer1

package com.rabbitmq;
import com.rabbitmq.client.*;

import java.io.IOException;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class Consumer1 {//普通消费者

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();

        // 创建频道
        Channel channel = connection.createChannel();

        //创建交换机
        channel.exchangeDeclare(ConnectionUtil.DIRECT_EXCHAGE, BuiltinExchangeType.TOPIC);

        // 声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(ConnectionUtil.DIRECT_QUEUE_COMMON , true, false, false, null);

        //队列绑定交换机   和指定 Routingkey
        channel.queueBind(ConnectionUtil.DIRECT_QUEUE_COMMON , ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_COMMON);//普通Routing

        //创建消费者;并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            /**
             * consumerTag 消息者标签,在channel.basicConsume时候可以指定
             * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
             * properties 属性信息
             * body 消息
             */
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key为:" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为:" + envelope.getExchange());
                //消息id
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("消费者2-接收到的消息为:" + new String(body, StandardCharsets.UTF_8));
            }
        };
        //监听消息
        /**
         * 参数1:队列名称
         * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
         * 参数3:消息接收到后回调
         */
        channel.basicConsume(ConnectionUtil.DIRECT_QUEUE_COMMON , true, consumer);
    }
}


Producer

package com.rabbitmq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer { //生产者

    public static void main(String[] args) throws Exception {

        //创建连接
        Connection connection = ConnectionUtil.getConnection();

        // 创建频道
        Channel channel = connection.createChannel();

        /**
         * 创建交换机
         * 参数1:交换机名称
         * 参数2:交换机类型,fanout、topic、direct、headers
         */
        channel.exchangeDeclare(ConnectionUtil.DIRECT_EXCHAGE, BuiltinExchangeType.TOPIC);

        // 声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(ConnectionUtil.DIRECT_QUEUE_MEMBER, true, false, false, null);
        channel.queueDeclare(ConnectionUtil.DIRECT_QUEUE_COMMON , true, false, false, null);

        //队列绑定交换机   和指定 Routingkey
        channel.queueBind(ConnectionUtil.DIRECT_QUEUE_MEMBER, ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_MEMBER);
        channel.queueBind(ConnectionUtil.DIRECT_QUEUE_COMMON , ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_COMMON);

        // 发送信息
        String message = "新增了会员商品。路由模式;routing key 为  "+ConnectionUtil.DIRECT_ROUTING_MEMBER ;
        /**
         * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
         * 参数2:路由key,简单模式可以传递队列名称
         * 参数3:消息其它属性
         * 参数4:消息内容
         */
        channel.basicPublish(ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_MEMBER, null, message.getBytes());
        System.out.println("已发送消息:" + message);

        // 发送信息
        message = "新增了普通商品。路由模式;routing key 为 "+ConnectionUtil.DIRECT_ROUTING_COMMON ;
        /**
         * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
         * 参数2:路由key,简单模式可以传递队列名称
         * 参数3:消息其它属性
         * 参数4:消息内容
         */
        channel.basicPublish(ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_COMMON, null, message.getBytes());
        System.out.println("已发送消息:" + message);

        // 关闭资源
        channel.close();
        connection.close();
    }
}

消息手动确认

什么是消息确认ACK。如果在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失。为了确保数据不会丢失,RabbitMQ支持消息确定-ACK。

ACK的消息确认机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。

如果一个消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中。

如果在集群的情况下,RabbitMQ会立即将这个消息推送给这个在线的其他消费者。这种机制保证了在消费者服务端故障的时候,不丢失任何消息和任务。

消息永远不会从RabbitMQ中删除,只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
消息的ACK确认机制默认是打开的

false只确认当前一个消息收到,true确认所有consumer获得的消息 ,一般使用false 就行了
channel.basicAck(envelope.getDeliveryTag(),false); //确认消息被消费了
返回false(消息没有被消费成功),消息重新回到队列,(类似数据库的回滚)
channel.basicNack(envelope.getDeliveryTag(), false, true);文章来源地址https://www.toymoban.com/news/detail-773114.html

到了这里,关于RabbitMQ入门案例-Java的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ入门案例之发布订阅模式

    本文章主要介绍RabbitMQ的发布订阅模式,该模式下,消息为广播形式,一经发布则会进入交换机绑定的队列中,详细介绍可以阅读官方文档。 官网文档地址:https://rabbitmq.com/getstarted.html RabbitMQ中的发布与订阅模式是一种消息传递的方式,用于在分布式系统中传递消息。 在该模

    2024年02月09日
    浏览(57)
  • rabbitmq基础-java-1、快速入门

            AMQP,即Advanced Message Queuing Protocol(高级消息队列协议),一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件传递消息,不受客户端/中间件不同产品、不同开发语言

    2024年01月24日
    浏览(41)
  • RabbitMQ-同步和异步通讯、安装和入门案例、SpringAMQP(5个消息发送接收Demo,jackson消息转换器)

    微服务间通讯有同步和异步两种方式: 同步通讯:就像打电话,需要实时响应。 异步通讯:就像发邮件,不需要马上回复。 两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。 1.

    2024年02月11日
    浏览(32)
  • 15年大牛用140多个实战案例深入讲解Java微服务架构实战:SpringBoot +SpringCloud +Docker +RabbitMQ

    第一部分,springboot篇; 第1章SpringBoot编程起步; 1.SpringBoot提倡的是一种简洁的开发模式,可保证用户不被大量的配置文件和依赖关系所困扰。 2.SpringBoot开发需要Maven或 Gradle构建工具支持。 3.SpringBoot使用一系列的注解来简化开发过程。 第2章SpringBoot程序开发; 1. SpringBoot的依赖

    2024年04月09日
    浏览(51)
  • 【RabbitMQ】RabbitMQ整合SpringBoot案例

    【RabbitMQ】消息队列-RabbitMQ篇章 RabbitMQ实现流程 2.1 实现架构总览 实现步骤: 1:创建生产者工程:sspringboot-rabbitmq-fanout-producer 2:创建消费者工程:springboot-rabbitmq-fanout-consumer 3:引入spring-boot-rabbitmq的依赖 4:进行消息的分发和测试 5:查看和观察web控制台的状况 2.2 具体实现

    2024年02月12日
    浏览(44)
  • 【RabbitMQ教程】第二章 —— RabbitMQ - 简单案例

                                                                       💧 【 R a b b i t M Q 教程】第二章—— R a b b i t M Q − 简单案例 color{#FF1493}{【RabbitMQ教程】第二章 —— RabbitMQ - 简单案例} 【 R abbi tMQ 教程】第二章 —— R abbi tMQ − 简单案例

    2024年02月08日
    浏览(43)
  • rabbitmq的介绍、使用、案例

    rabbitmq简单来说就是个消息中间件,可以让不同的应用程序之间进行异步的通信,通过消息传递来实现解耦和分布式处理。 消息队列:允许将消息发到队列,然后进行取出、处理等操作,使得生产者和消费者之间能够解耦,异步地进行通信。 持久性,可靠性的消息传递机制。

    2024年01月20日
    浏览(54)
  • RabbitMQ【笔记整理+代码案例】

    1.1.1. 什么是 MQ MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,

    2024年02月15日
    浏览(41)
  • RabbitMQ - 简单案例

    目录 0.引用 1.Hello world 2.轮训分发消息   2.1 抽取工具类   2.2 启动两个工作线程接受消息   2.4 结果展示 3.消息应答   3.1 自动应答   3.2 手动消息应答的方法    3.3 消息自动重新入队   3.4 消息手动应答代码 4.RabbitMQ 持久化   4.1 队列如何实现持久化   4.2 消息实现持久

    2024年02月14日
    浏览(27)
  • RabbitMQ特性介绍和使用案例

    ❤ 作者主页:李奕赫揍小邰的博客 ❀ 个人介绍:大家好,我是李奕赫!( ̄▽ ̄)~* 🍊 记得点赞、收藏、评论⭐️⭐️⭐️ 📣 认真学习!!!🎉🎉   RabbitMQ特性 AMQP (高级消息队列协议) 是一个异步消息传递所使用的应用层协议规范,作为线路层协议,而不是API(例如JMS),

    2024年02月11日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包