Java分布式微服务4——异步服务通讯(RabbitMQ)中间件

这篇具有很好参考价值的文章主要介绍了Java分布式微服务4——异步服务通讯(RabbitMQ)中间件。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

微服务的远程异步调用

为什么需要异步调用?
Java分布式微服务4——异步服务通讯(RabbitMQ)中间件,微服务,分布式,java-rabbitmq,java
Java分布式微服务4——异步服务通讯(RabbitMQ)中间件,微服务,分布式,java-rabbitmq,java

  1. 故障隔离:支付服务不负责调用其他三个服务,只负责通知Broker支付成功这个事件,然后就返回结果,后面的服务故障了和前面发布事件的服务无关,前面的服务发布完事件就结束了
  2. 吞吐量提升:Broker将支付成功的事件广播给订阅了这个事件的那些服务,服务们各自并发进行接下来的工作,吞吐量变高,性能提升
  3. 耦合度低:有新服务加入只要让它订阅就行,耦合度低
  4. 流量削峰:broker可以起到缓冲作用,把大量事件存着给后面慢慢处理

Java分布式微服务4——异步服务通讯(RabbitMQ)中间件,微服务,分布式,java-rabbitmq,java
异步调用的缺点:

  1. 依赖于Broker的可靠性、安全性、吞吐能力
  2. 架构复杂,业务流程不清晰,难以追踪

MQ介绍

MQ(MessageQueue)消息队列,就是上文事件驱动架构中的Broker
Java分布式微服务4——异步服务通讯(RabbitMQ)中间件,微服务,分布式,java-rabbitmq,java

RabbitMQ

RabbitMQ结构

Java分布式微服务4——异步服务通讯(RabbitMQ)中间件,微服务,分布式,java-rabbitmq,java

  • VirtualHost: 一般属于不同用户,互相隔离,是对exchange、queue等资源的逻辑分组
  • channel: 操作MQ的工具
  • queue: 缓存消息队列
  • exchange: 路由消息到队列中

RabbitMQ的单机部署

基于Erlang语言设计
RabbitMQ文档
在Centos7虚拟机中使用Docker来安装。

1.下载镜像

方式一:在线拉取

docker pull rabbitmq:3-management

方式二:从本地加载

.tar镜像包上传到虚拟机中后,使用命令加载镜像即可:

docker load -i mq.tar

2.安装MQ

执行下面的命令来运行MQ容器:

docker run \
 -e RABBITMQ_DEFAULT_USER=itcast \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \ # 15672是管理平台的端口
 -p 5672:5672 \ # 5672消息通信的端口
 -d \
 rabbitmq:3-management

在浏览器使用虚拟机地址:15672就能看到管理平台
ps.一般情况下每个用户要独享一个虚拟主机
管理后台介绍

RabbitMQ入门

官方入门示例

常见消息模型

  1. 基本消息队列basic queue
  2. 工作消息队列work queue
  3. 发布订阅:广播fanout
  4. 发布订阅:路由direct
  5. 发布订阅:主题topic

SpringAMQP

Java分布式微服务4——异步服务通讯(RabbitMQ)中间件,微服务,分布式,java-rabbitmq,java

  • 监听器容器,异步处理入栈消息
  • 发送和接收消息的RabbitTemplate
  • RabbitAdmin用于自动声明队列,交换和绑定

exchange、queue这种东西,如果没有提前创建好,在使用的时候也会自动创建

SpringAMQP实现基础消息队列

  1. 父工程引入spring-amqp依赖
        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
  1. 在publisher服务中利用RabbitTemplate发送消息到simple.queue队列
    在publisher的application.yml中添加mq的连接信息
spring:
  rabbitmq:
    host: 192.168.36.128 # 主机名
    port: 5672
    virtual-host: / # 虚拟主机
    username: itcast
    password: 123456

在测试类中编写一个测试方法,注入RabbitTemplate对象
别忘了加注解让spring boot启动,要不然没东西注入报空指针

//@RunWith(SpringRunner.class)
@SpringBootTest
public class PublisherTest {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue(){
        String queueName = "simple.queue";
        String message = "hello, spring amqp!";
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

运行测试方法,在管理平台中就能看到队列中有一个消息了

  1. 在consumer服务中编写消费逻辑,绑定simple.queue队列

同样,添加依赖,然后在配置文件中添加AMQP信息

消费者只需要新建一个类(为了被Springboot找到并内部注入需要添加@Component),定义一个监听方法(用@RabbitListener修饰)即可:

@Component
public class SpringRabbitListener {
    
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String message){
        System.out.println("spring消费者接收到消息:"+message);
    }
}

启动consumer微服务以后,它会自动监听simple.listener队列中有没有消息,如果有就直接拿过来
Java分布式微服务4——异步服务通讯(RabbitMQ)中间件,微服务,分布式,java-rabbitmq,java

SpringAMQP实现工作队列

Java分布式微服务4——异步服务通讯(RabbitMQ)中间件,微服务,分布式,java-rabbitmq,java
绑定两个consumer可以提高消息处理的速度,避免消息堆积

假设publisher共发送了50条消息,那设置两个consumer的监听者:

@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueueMessage1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息:"+msg);
        Thread.sleep(50);
    }

    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueueMessage2(String msg) throws InterruptedException {
        System.err.println("消费者2接收到消息:"+msg);
        Thread.sleep(10);
    }
}

事实上,50条消息被平均分配给了两个consumer监听器,消费者1接收完25条以后还要慢慢等消费者2接收完它的25条,并不会抢消息
这是消息预取机制造成的问题,两个消费者是在消费前就把消息分配好了

在配置文件中,可以设置消息预取的上限simple.prefetch(默认为无限),设置为1的时候就是一条一条取,以达到能者多劳,总体速度变快的效果。

spring:
  rabbitmq:
    host: 192.168.36.128
    port: 5672
    virtual-host: /
    username: itcast
    password: 123456
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才拿下一条

SpringAMQP实现发布订阅

发布订阅模式与先前案例的区别是,允许同一消息被群发给多个消费者,而不是一个消费者消费完就删除。实现方法靠exchange(交换机)

常见的场景也是一个事件的完成会调动很多后续的服务
Java分布式微服务4——异步服务通讯(RabbitMQ)中间件,微服务,分布式,java-rabbitmq,java
消息发布者现在只需要把消息交给交换机,不需要知道给哪些队列,交换机会帮助转发
交换机三种类型:

  • Fanout广播
  • Direct路由
  • Topic话题

1. Fanout Exchange 广播模式

  1. 在consumer中声明队列、交换机,并且把队列绑定到交换机(下面还有使用@RabbitListener注解声明和绑定的方法)
@Configuration
public class FanoutConfig {
    @Bean // 声明交换机
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }
    @Bean // 声明队列
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
    @Bean // 绑定
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }//以相同方式声明第2个队列并绑定
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

这些Bean会被springboot自动装配,被AMQP使用
2. 在consumer中编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueueMessage1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息:"+msg);
        Thread.sleep(50);
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueueMessage2(String msg) throws InterruptedException {
        System.err.println("消费者2接收到消息:"+msg);
        Thread.sleep(10);
    }
}
  1. 在publisher中编写测试方法,向itcast.fanout发送消息
@Test
public void testSendFanoutExchange(){
    // 交换机名称
    String exchangeName = "itcast.fanout";
    // 消息
    String msg = "it's a broadcast";
    // 发送
    rabbitTemplate.convertAndSend(exchangeName, "", msg);
}

Java分布式微服务4——异步服务通讯(RabbitMQ)中间件,微服务,分布式,java-rabbitmq,java

2. Direct Exchange 路由模式

DIrect Exchange会根据规则把消息路由(routes)到指定队列
Java分布式微服务4——异步服务通讯(RabbitMQ)中间件,微服务,分布式,java-rabbitmq,java
实现思路如下:

  1. 利用@RabbitListener注解声明Exchange、Queue、RoutingKey,之前那种自定义Bean的方式比较繁琐
@Component
public class SpringRabbitListener {
	@RabbitListener(bindings = @QueueBinding(
	    	value = @Queue(name = "direct.queue1"),
	        exchange = @Exchange(name = "itcast.direct", type = "direct"),
	        key = {"red", "blue"}
	))
	public void listenDirectQueue1(String msg){
	    System.out.println("消费者接收到来自direct.queue1的消息:"+msg);
	}
	
	@RabbitListener(bindings = @QueueBinding(
	     	value = @Queue(name = "direct.queue2"),
	        exchange = @Exchange(name = "itcast.direct", type = "direct"),
	        key = {"red", "yellow"}
	))
	public void listenDirectQueue2(String msg){
	    System.out.println("消费者接收到来自direct.queue2的消息:"+msg);
	}
}
  1. 发送消息,携带routingKey
@SpringBootTest
public class PublisherTest {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendDirectExchange(){
        // 交换机名称
        String exchangeName = "itcast.direct";
        // 消息
        String msg = "Hello Blue";
        // 发送
        rabbitTemplate.convertAndSend(exchangeName, "blue", msg);
    }
}

3. Topic Exchange 话题

Java分布式微服务4——异步服务通讯(RabbitMQ)中间件,微服务,分布式,java-rabbitmq,java
Topic中的BindingKey支持通配符(注意是BindingKey):
#: 0或多个单词
*: 1个单词
Java分布式微服务4——异步服务通讯(RabbitMQ)中间件,微服务,分布式,java-rabbitmq,java

实现思路:
在@RabbitListener中

  1. 声明bindingKey的时候使用通配符
  2. Exchange的type指定为"topic"
@RabbitListener(bindings = @QueueBinding(
    	value = @Queue(name = "topic.queue1"),
        exchange = @Exchange(name = "itcast.topic", type = "topic"),
        key = {"china.#"}
))

这样routingKey只要符合bingdingKey的模式,就会把消息分发给它

消息转换器

rabbitTemplate.convertAndSend发送的信息是Object类型的,所以可以传任意对象,会自动序列化
默认使用的序列化方式是java提供的序列化,类会被序列化成字节串,有许多缺点

我们可以采用别的序列化方式,比如JSON序列化方式,把MessageConverter类型的容器中的对象顶掉就行

Java分布式微服务4——异步服务通讯(RabbitMQ)中间件,微服务,分布式,java-rabbitmq,java
记得发送端和接收端的对象类型要一致或者兼容,用的序列化MessageConverter也要一致
Java分布式微服务4——异步服务通讯(RabbitMQ)中间件,微服务,分布式,java-rabbitmq,java文章来源地址https://www.toymoban.com/news/detail-637069.html

到了这里,关于Java分布式微服务4——异步服务通讯(RabbitMQ)中间件的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 微服务中间件-分布式缓存Redis

    – 基于Redis集群解决单机Redis存在的问题 单机的Redis存在四大问题: 1.数据丢失问题: Redis是内存存储,服务重启可能会丢失数据 2.并发能力问题: 单节点Redis并发能力虽然不错,但也无法满足如618这样的高并发场景 3.故障恢复问题: 如果Redis宕机,则服务不可用,需要一种自动

    2024年02月12日
    浏览(67)
  • 微服务中间件--分布式搜索ES

    elasticsearch是一款非常强大的开源搜索引擎,可以帮助我们从海量数据中快速找到需要的内容。 elasticsearch结合kibana、Logstash、Beats,也就是elastic stack(ELK)。被广泛应用在日志数据分析、实时监控等领域。 elasticsearch是elastic stack的核心,负责存储、搜索、分析数据。 正向索引

    2024年02月11日
    浏览(39)
  • 微服务——服务异步通讯RabbitMQ

     前置文章 消息队列——RabbitMQ基本概念+容器化部署和简单工作模式程序_北岭山脚鼠鼠的博客-CSDN博客 消息队列——rabbitmq的不同工作模式_北岭山脚鼠鼠的博客-CSDN博客 消息队列——spring和springboot整合rabbitmq_北岭山脚鼠鼠的博客-CSDN博客 目录 Work queues 工作队列模式  案例

    2024年02月15日
    浏览(63)
  • 【消息中间件】原生PHP对接Uni H5、APP、微信小程序实时通讯消息服务

    【uniapp】实现买定离手小游戏 Mqtt不同环境问题太多,新手可以看下 《【MQTT】Esp32数据上传采集:最新mqtt插件(支持掉线、真机调试错误等问题》 《一篇就够:uniapp-Mqtt系列问题详细攻略(解决掉线、真机调试错误等问题)》 《解决微信小程序MQTT真机连接问题与合法域名配置

    2024年02月14日
    浏览(41)
  • 服务异步通讯——RabbitMQ

    微服务间通讯有同步和异步两种方式: 同步通讯 :就像打电话,需要实时响应。 异步通讯 :就像发邮件,不需要马上回复。 两种方式各有优劣,打电话可以立即得到响应,但是一个人却不能跟多个人同时通话。而发送邮件可以同时与多个人收发邮件,但是往往响应会有延

    2024年01月18日
    浏览(43)
  • Java分布式微服务3——Docker

    大项目组件多,运行环境复杂 每个组件需要的依赖和函数库可能版本不同,容易不兼容 开发、测试、生产环境有差异,甚至操作系统都不一样 Docker可以在任何Linux机器上一键部署,一键移除 Docker与虚拟机区别 Docker直接用打包的系统函数库调用OS内核,性能较好,硬盘占用小

    2024年02月13日
    浏览(43)
  • SpringCloud学习路线(9)——服务异步通讯RabbitMQ

    一、初见MQ (一)什么是MQ? MQ(MessageQueue) ,意思是 消息队列 ,也就是事件驱动架构中的Broker。 (二)同步调用 1、概念: 同步调用是指,某一服务需要多个服务共同参与,但多个服务之间有一定的执行顺序,当每一个服务都需要等待前面一个服务完成才能继续执行。

    2024年02月15日
    浏览(39)
  • 《分布式中间件技术实战:Java版》学习笔记(一):抢红包

    数据库建表 (1)red_send_record 记录用户发送了若干总金额的若干个红包。 (2)red_detail 记录用户发送的红包被分成的小红包金额。 (3)red_rob_record 记录用户抢到的红包金额。 随机生成红包金额 红包金额的最小单位是分,将红包金额放大100倍到int类型(为了方便生成随机数),保证

    2024年02月10日
    浏览(53)
  • 消息中间件学习笔记--RabbitMQ(二、模式,一次违反常规的Java大厂面试经历

    .Fanout:转发消息到所有绑定队列 比较常用的是Direct、Topic、Fanout. Fanout 这种Fanout模式不处理路由键,只·需要简单的将队列绑定到exchange上,一个发送到exchange的消息都会被转发到与该exchange绑定的所有队列上。很像广播子网,每台子网内的主机都获得了一份复制的消息。Fan

    2024年04月09日
    浏览(99)
  • 【Java程序员面试专栏 分布式中间件】Redis 核心面试指引

    关于Redis部分的核心知识进行一网打尽,包括Redis的基本概念,基本架构,工作流程,存储机制等,通过一篇文章串联面试重点,并且帮助加强日常基础知识的理解,全局思维导图如下所示 明确redis的特性、应用场景和数据结构 Redis是一个 开源的、内存中的数据结构存储系统

    2024年02月20日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包