AMQP 协议中的核心思想就是生产者和消费者的解耦,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由 Exchange 来接收,然后 Exchange 按照特定的策略转发到 Queue 进行存储。Exchange 就类似于一个交换机,将各个消息分发到相应的队列中。
在实际应用中我们只需要定义好 Exchange 的路由策略,而生产者则不需要关心消息会发送到哪个 Queue 或被哪些 Consumer 消费。在这种模式下生产者只面向 Exchange 发布消息,消费者只面向 Queue 消费消息,Exchange 定义了消息路由到 Queue 的规则,将各个层面的消息传递隔离开,使每一层只需要关心自己面向的下一层,降低了整体的耦合度。
编写direct消息发送类
1.在rabbitmq-product-java项目中创建,com.bjpowernode.rabbitmq.direct.Send类
package com.it.rabbitmq.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static void main(String[] args) {
//创建连接工厂
ConnectionFactory factory=new ConnectionFactory();
//配置rabbitMQ的连接信息
factory.setHost("192.168.174.129");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("123456");
//定义连接
Connection connection=null;
//定义通道
Channel channel=null;
try {
connection=factory.newConnection();
channel=connection.createChannel();
channel.queueDeclare("myDirectQueue",true,false,false,null);
/**
* 声明一个交换机
* 参数1为交换机的名称取值任意
*参数2为交换机的类型 取值为direct fanout topic headers
* 参数3为是否为持久化交换机
* 注意:
* 1.声明交换机时如果这个交换机已经存在则放弃声明,如果交换机不存在,则声明交换机
* 2.这个代码时可有可无的,但是在使用前要确保交换机被声明
*/
channel.exchangeDeclare("directExchange","direct",true);
/**
* 绑定交换机
* 参数1为队列名称
* 参数2位交换机名称
* 参数3为消息的RoutingKey(就是BindingKey)
* 注意:
* 1、在进行队列和交换机绑定时,必须要确保交换机和队列已经声明成功
*/
channel.queueBind("myDirectQueue","directExchange","directRoutingKey");
String message="direct的测试消息!";
/**
* 发送消息到队列
* 参数1:交换机名称
* 参数2:消息的RoutingKey,如果这个消息的RoutingKey和某个队列与交换机绑定的RoutingKey一致
* 那么这个消息就会发送到指定的队列中
* 注意:
* 发送消息时,必须确保交换机已经创建,并且确保已经正确的绑定到了某个队列
*/
channel.basicPublish("","",null,message.getBytes("utf-8"));
System.out.println("消息发送成功,direct");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
if (channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
2.运行代码
3.进入rabbitMQ控制台查看交换机
点击进入该交换机,点击Bindings,查看交换机的绑定信息
4.进入控制台查看队列
点击该队列进入,查看该队列的绑定信息。
注意:使用direct消息模式时必须要指定RoutingKey(路由键),将指定的消息绑定到指定的路由键上
编写direct消息接收类
1.在rabbitmq-consumer-java项目中创建,com.bjpowernode.rabbitmq.direct.Receive类
package com.it.rabbitmq.direct;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receive {
public static void main(String[] args) {
//创建链接工厂对象
ConnectionFactory factory=new ConnectionFactory();
factory.setUsername("root");
factory.setPassword("123456");
factory.setHost("192.168.174.129");
factory.setPort(5672);
Connection connection=null;//定义链接对象
Channel channel=null;//定义通道对象
try {
connection=factory.newConnection();//实例化链接对象
channel=connection.createChannel();//实例化通道对象
//指定Exchange的类型
//参数1为 交换机名称
//参数2为交换机类型取值为 direct、queue、topic、headers
//参数3 为是否为持久化消息 true表示持久化消息 false表示非持久化
channel.queueDeclare("myDirectQueue", true, false, false, null);
channel.exchangeDeclare("directExchange", "direct", true);
channel.queueBind("myDirectQueue","directExchange","directRoutingKey");
channel.basicConsume("myDirectQueue ",true, "",new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
//获取消息数据
String bodyStr = new String(body);
System.out.println("消费者 ---"+bodyStr);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
2.运行代码
注意:
1、使用Exchange的direct模式时接收者的RoutingKey必须要与发送时的RoutingKey完全一致否则无法获取消息文章来源:https://www.toymoban.com/news/detail-612080.html
2、接收消息时队列名也必须要发送消息时的完全一致文章来源地址https://www.toymoban.com/news/detail-612080.html
到了这里,关于rabbitMQ:绑定Exchange发送和接收消息(direct)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!