MQ
同步通讯
- 优点:
- 时效性强, 可以立即得到结果
- 缺点:
- 耦合度高
- 性能和吞吐能力下降
- 有额外的资源消耗
- 有级联失败问题
异步通讯
- 优点:
- 耦合度低
- 吞吐量提升
- 故障隔离
- 流量削峰
- 缺点:
- 依赖于Broker的可靠性, 安全性, 吞吐能力
- 架构复杂, 业务没有明显的流程线, 不好追踪管理
mq常见技术
RabbitMq | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | java | java | Scala&java |
协议支持 | AMQP, XMPP, SMTP, STOMP | OpenWire, STOMP, REST, XMPP, AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
RabbitMQ
下载安装
- docker下载
docker pull rabbitmq:3-management
- docker运行
docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
介绍
- channel: 操作MQ的工具
- exchange: 路由消息到队列中
- queue: 缓存消息
- virtual host: 虚拟主机, 是对queue, exchange等资源的逻辑分组
SimpleQueue模型
- publisher
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.174.133");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
}
- consumer
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.174.133");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
SpringAMQP
介绍
- AMQP: 应用间消息通信的一种协议, 与语言和平台无关.
- 依赖:
<!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
SimpleQueue模型
- 配置
spring: rabbitmq: host: 192.168.174.133 # 主机名S port: 5672 # 端口 virtual-host: / # 虚拟主机 username: itcast # 用户名 password: 123321 # 密码
- publisher
@SpringBootTest @RunWith(SpringRunner.class) public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue() { String queueName = "simple.queue"; String message = "hello, spring amqp!"; rabbitTemplate.convertAndSend(queueName, message); } }
- comsumer
@Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) { System.out.println("spring 消费者接收到消息: ["+msg+"]"); } }
WorkQueue模型
- publisher
@Test public void testWorkQueue() throws InterruptedException { String queueName = "simple.queue"; String message = "hello, spring amqp!"; for (int i = 1; i <= 50; i++) { rabbitTemplate.convertAndSend(queueName, message); Thread.sleep(20); } }
- consumer
@RabbitListener(queues = "simple.queue") public void listenWorkQueue1(String msg) throws InterruptedException { System.out.println("消费者1接收到消息: ["+msg+"]"+ LocalTime.now()); Thread.sleep(20); } @RabbitListener(queues = "simple.queue") public void listenWorkQueue2(String msg) throws InterruptedException { System.err.println("消费者2接收到消息: ["+msg+"]"+ LocalTime.now()); Thread.sleep(200); }
- 配置
spring: rabbitmq: listener: simple: prefetch: 1 # 每次只能获取下一消息, 处理完成才能获取下一个消息
- work模型总结:
- 多个消费者绑定到一个队列, 同一条消息只会被一个消费者处理
- consumer会预取消息, 会导致性能差的consumer堆积消息, 可以通过设置prefetch来控制消费者预取的消息数量.
发布订阅模型
介绍
- 发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者, 实现方式就是加入了exchange(交换机).
- 常见exchange类型:
- Fanout: 广播
- Direct: 路由
- Topic: 话题
FanoutExchange
- config
@Configuration public class FanoutConfig { @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("itcast.fanout"); } @Bean public Queue fanoutQueue1() { return new Queue("fanout.queue1"); } @Bean public Queue fanoutQueue2() { return new Queue("fanout.queue2"); } @Bean public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } @Bean public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }
- consumer
@RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) throws InterruptedException { System.out.println("消费者1接收到消息: ["+msg+"]"+ LocalTime.now()); Thread.sleep(20); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) throws InterruptedException { System.err.println("消费者2接收到消息: ["+msg+"]"+ LocalTime.now()); Thread.sleep(200); }
- publisher
@Test public void testSendFanoutExchange() { // 交换机名称 String exchangeName = "itcast.fanout"; // 消息 String message = "hello, every one!"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "", message); }
- 总结:
- 交换机的作用
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息, 路由失败, 消息丢失
- FanoutExchange的会将消息路由到每个绑定的队列
- 声明队列的Bean: Queue
- 声明交换机的Bean: FanoutExchange
- 声明绑定关系的Bean: Binding
- 交换机的作用
Direct Exchange
- 介绍
- 每一个Queue都与Exchange设置一个BindingKey
- 发布者发送消息时, 指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
- consumer
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"red", "blue"} )) public void listenDirectQueue1(String msg) throws InterruptedException { System.out.println("消费者1接收到消息: ["+msg+"]"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"red", "yellow"} )) public void listenDirectQueue2(String msg) throws InterruptedException { System.err.println("消费者2接收到消息: ["+msg+"]"); }
- publisher
@Test public void testSendDirectExchange() { // 交换机名称 String exchangeName = "itcast.direct"; // 消息 String message = "hello, every blue!"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "blue", message); }
Topic Exchange
- consumer
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC), key = "china.#" )) public void listenTopicQueue1(String msg) throws InterruptedException { System.out.println("消费者1接收到消息: ["+msg+"]"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC), key = "#.news" )) public void listenTopicQueue2(String msg) throws InterruptedException { System.err.println("消费者2接收到消息: ["+msg+"]"); }
- publisher
@Test public void testSendTopicExchange() { // 交换机名称 String exchangeName = "itcast.topic"; // 消息 String message = "hello, china.news"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "china.news", message); }
- 总结
- TopicExchange与DirectExchange类似, 区别在于routingKey必须是多个单词的列表, 并且以
.
分割.
- TopicExchange与DirectExchange类似, 区别在于routingKey必须是多个单词的列表, 并且以
消息转换器
- 依赖
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
- MessageConverter
@Bean public MessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); }
- publisher
@Test public void testSendObjectQueue() { Map<String, Object> msg = new HashMap<>(); msg.put("name", "柳岩"); msg.put("age", 21); rabbitTemplate.convertAndSend("object.queue", msg); }
- consumer
@RabbitListener(queues = "object.queue") public void listenObjectQueue(Map<String, Object> msg) { System.out.println("接收到object.queue的消息: "+msg); }
来源
黑马程序员. SpringCloud微服务文章来源地址https://www.toymoban.com/news/detail-602873.html
文章来源:https://www.toymoban.com/news/detail-602873.html
到了这里,关于JavaWeb_SpringCloud微服务_Day4-MQ, RabbitMQ, SpringAMQP的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!