微服务——服务异步通讯RabbitMQ

这篇具有很好参考价值的文章主要介绍了微服务——服务异步通讯RabbitMQ。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

 前置文章

消息队列——RabbitMQ基本概念+容器化部署和简单工作模式程序_北岭山脚鼠鼠的博客-CSDN博客

消息队列——rabbitmq的不同工作模式_北岭山脚鼠鼠的博客-CSDN博客

消息队列——spring和springboot整合rabbitmq_北岭山脚鼠鼠的博客-CSDN博客

目录

Work queues 工作队列模式 

案例:

 在生产者端

在消费者端

结果如下

 消费预取限制

 发布订阅模型

 Fanout Exchange(配置文件实现)

案例

消费者代码

生产者代码

 Direct Exchange (注解实现)

 案例

消费者代码

生产者代码

Topic Exchange

案例 

 消费者代码

生产者代码

 消息转换器

生产者代码

JSON方式序列化

生产者代码 (jackson)

 消费者代码(jackson)

 总结


Work queues 工作队列模式 

这里用的不是上面第三篇文章里面的定义配置类的形式。

案例:

微服务——服务异步通讯RabbitMQ,微服务,微服务,架构,云原生

 在生产者端

队列要存在才可以上传。不然代码运行不会报错,但是消息也会不知道发到哪里去。

    @Test
    public void testSendMessage2() throws InterruptedException {
        String queue_Name= "simple.queue";
        String message="hello 鼠鼠";

        for(int i=1;i<=50;i++)
        rabbitTemplate.convertAndSend(queue_Name,message+i);
        Thread.sleep(20);
    }

在消费者端

定义了两个消费者监听上面的队列,本来想三个的,但是不知道默认的交换机名字,所以弄了两个。并且根据注解的不同,第一个是可以直接创建一个队列,第二个需要队列已存在才行。

@Component
public class RabbitMQListener {


    //自动创建队列

    @RabbitListener(queuesToDeclare=@Queue("simple.queue"))
    public void ListenerWorkQueue1(Message message) throws InterruptedException {
        System.out.println("11111"+message.getBody());
        Thread.sleep(20);
    }

    //需要在rabbit_mq上手动创建队列,不然会报错
    @RabbitListener(queues="simple.queue")
    public void ListenerWorkQueue2(Message message) throws InterruptedException {
        System.out.println("22222"+message.getBody());
        Thread.sleep(200);
    }

    //3. 自动创建队列,Exchange 与 Queue绑定
//    @RabbitListener(bindings = @QueueBinding(
//            value = @Queue("simple.queue"),
//            exchange = @Exchange("/")  //绑定默认交换机
//    ))
//    public void ListenerWorkQueue3(Message message) throws InterruptedException {
//        System.out.println("33333"+message.getBody());
//        Thread.sleep(200);
//    }
}

结果如下

两个队列轮流取消息导致反而变慢了。

微服务——服务异步通讯RabbitMQ,微服务,微服务,架构,云原生微服务——服务异步通讯RabbitMQ,微服务,微服务,架构,云原生

 消费预取限制

要指定队列才有效果。

微服务——服务异步通讯RabbitMQ,微服务,微服务,架构,云原生

 这里就相当于指定了在simple前缀的队列上每次只能获取一条消息。

运行结果如下,大多数都交给了快的队列执行。

微服务——服务异步通讯RabbitMQ,微服务,微服务,架构,云原生

 发布订阅模型

微服务——服务异步通讯RabbitMQ,微服务,微服务,架构,云原生

 Fanout Exchange(配置文件实现)

消息路由到每个绑定的消息队列。

案例

微服务——服务异步通讯RabbitMQ,微服务,微服务,架构,云原生

 微服务——服务异步通讯RabbitMQ,微服务,微服务,架构,云原生

微服务——服务异步通讯RabbitMQ,微服务,微服务,架构,云原生

消费者代码

 spring读取到这个Bean之后就会向RabbitMq发请求,创建交换机,绑定队列了。 

@Configuration
public class FanoutConfig {
    //itcast.fanout
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }
    //fanout.queue1
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fannout.queue1");
    }
    //绑定队列1到交换机
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    //fanout.queue1
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fannout.queue2");
    }
    //绑定队列2到交换机
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

定义两个监听用的方法

@Component
public class RabbitMQListener {
//    @RabbitListener(queues="boot_queue")
//    public void ListenerQueue(Message message){
//        System.out.println(message);
//    }


    //自动创建队列

//    @RabbitListener(queuesToDeclare=@Queue("simple.queue"))
//    public void ListenerWorkQueue1(Message message) throws InterruptedException {
//        System.out.println("11111"+message.getBody()+ LocalDateTime.now());
//        Thread.sleep(20);
//    }
//
//    //需要在rabbit_mq上手动创建队列,不然会报错
//    @RabbitListener(queues="simple.queue")
//    public void ListenerWorkQueue2(Message message) throws InterruptedException {
//        System.out.println("22222"+message.getBody()+ LocalDateTime.now());
//        Thread.sleep(200);
//    }

    //3. 自动创建队列,Exchange 与 Queue绑定
//    @RabbitListener(bindings = @QueueBinding(
//            value = @Queue("simple.queue"),
//            exchange = @Exchange("/")  //绑定默认交换机
//    ))
//    public void ListenerWorkQueue3(Message message) throws InterruptedException {
//        System.out.println("33333"+message.getBody());
//        Thread.sleep(200);
//    }



    @RabbitListener(queuesToDeclare=@Queue("fanout.queue1"))
    public void ListenerFanoutQueue1(Message message) throws InterruptedException {
        System.out.println("11111"+message.getBody());
    }

    @RabbitListener(queuesToDeclare=@Queue("fanout.queue2"))
    public void ListenerFanoutQueue2(Message message) throws InterruptedException {
        System.out.println("22222"+message.getBody());
    }
}

生产者代码

@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
    //1.注入RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

//    @Test
//    public void testSend(){
//        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME,"boot.haha","hello 鼠鼠");
//    }

//    @Test
//    public void testSendMessage2() throws InterruptedException {
//        String queue_Name= "simple.queue";
//        String message="hello 鼠鼠";
//
//        for(int i=1;i<=50;i++)
//        rabbitTemplate.convertAndSend(queue_Name,message+i);
//        Thread.sleep(20);
//    }

    @Test
    public void testSendFanoutExchange(){
        //交换机名称
        String exchangeName="itcast.fanout";
        //消息
        String message="hello 鼠鼠";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName,"",message);
    }
}

 Direct Exchange (注解实现)

微服务——服务异步通讯RabbitMQ,微服务,微服务,架构,云原生

 案例

微服务——服务异步通讯RabbitMQ,微服务,微服务,架构,云原生

消费者代码


@Component
public class RabbitMQListener {   
     @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name="itcast.direct" , type= ExchangeTypes.DIRECT),
            key={"red","blue"}
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("消费者接收到:"+msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name="itcast.direct" , type= ExchangeTypes.DIRECT),
            key={"red","yellow"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("消费者接收到:"+msg);
    }
}

生产者代码

    @Test
    public void testSendDirectExchange(){
        //交换机名称
        String exchangeName="itcast.direct";
        //消息
        String message="hello 鼠鼠";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName,"blue",message);
    }

此条代码只有绑定了blue这个key的队列才可以收到。

微服务——服务异步通讯RabbitMQ,微服务,微服务,架构,云原生

换成red就是两个队列都可以收到了。

微服务——服务异步通讯RabbitMQ,微服务,微服务,架构,云原生

Topic Exchange

微服务——服务异步通讯RabbitMQ,微服务,微服务,架构,云原生

案例 

 微服务——服务异步通讯RabbitMQ,微服务,微服务,架构,云原生

 消费者代码

    @RabbitListener(bindings = @QueueBinding(
            value=@Queue(name="topic.queue1"),
            exchange=@Exchange(name="itcast.topic",type = ExchangeTypes.TOPIC),
            key="japan.#"
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("消费者接收到:"+msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value=@Queue(name="topic.queue2"),
            exchange=@Exchange(name="itcast.topic",type = ExchangeTypes.TOPIC),
            key="#.news"
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("消费者接收到:"+msg);
    }

生产者代码

    @Test
    public void testSendTopicExchange(){
        //交换机名称
        String exchangeName="itcast.topic";
        //消息
        String message="北岭山脚鼠鼠横死街头,究竟是人性的沦丧还是道德的....";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName,"japan.news",message);
    }

两个都符合,所以都能收到。

微服务——服务异步通讯RabbitMQ,微服务,微服务,架构,云原生

 消息转换器

微服务——服务异步通讯RabbitMQ,微服务,微服务,架构,云原生

定义一个队列

    @Bean
    public Queue objectQueue(){
        return new Queue("object.queue");
    }

生产者代码

    @Test
    public void testSendObjectQueue(){
        //消息
        Map<String,Object> msg=new HashMap<>();
        msg.put("name","北岭山脚鼠鼠");
        msg.put("age","22");
        //发送消息
        rabbitTemplate.convertAndSend("object.queue",msg);
    }

可以看见消息被转换成了一长串字符,content_type写着java的序列化。

效率差,安全性也差。 

微服务——服务异步通讯RabbitMQ,微服务,微服务,架构,云原生

JSON方式序列化

微服务——服务异步通讯RabbitMQ,微服务,微服务,架构,云原生

声明好MessageConveter之后就可以自动覆盖默认序列化方式了。

导入一个核心依赖

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

生产者代码 (jackson)

 修改生产者的启动类代码,加上一个Bean

@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class);
    }

    @Bean
    public Jackson2JsonMessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

启动测试类之后可以看见新的消息出现了。

微服务——服务异步通讯RabbitMQ,微服务,微服务,架构,云原生

 消费者代码(jackson)

微服务——服务异步通讯RabbitMQ,微服务,微服务,架构,云原生

 然后可以正常接受到消息

微服务——服务异步通讯RabbitMQ,微服务,微服务,架构,云原生

如果消费者不使用对应jackson解析的话,代码会报错

微服务——服务异步通讯RabbitMQ,微服务,微服务,架构,云原生

 总结

微服务——服务异步通讯RabbitMQ,微服务,微服务,架构,云原生

推荐使用jackson的方式 文章来源地址https://www.toymoban.com/news/detail-608759.html

到了这里,关于微服务——服务异步通讯RabbitMQ的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 服务异步通讯之 SpringAMQP【微服务】

    1. 同步通讯 同步和异步的区别: ① 同步通讯类似于打电话,是一对一的同时发生的通讯,因此它的时效性更好(一步走完才能走下一步); ② 异步通讯类似于发短信,给一个人发的同时还可以给别人发,支持多线操作,但是由于通讯不同步,所以它的时效性差(多步可以

    2024年01月23日
    浏览(21)
  • 服务异步通讯——springcloud

    官网https://www.rabbitmq.com/ 单机部署 我们在Centos7虚拟机中使用Docker来安装。 1.1.下载镜像 方式一:在线拉取 方式二:从本地加载 在课前资料已经提供了镜像包: 上传到虚拟机中后,使用命令加载镜像即可: 安装MQ 执行下面的命令来运行MQ容器: https://spring.io/projects/spring-amq

    2024年01月22日
    浏览(24)
  • RabbitMQ-同步和异步通讯、安装和入门案例、SpringAMQP(5个消息发送接收Demo,jackson消息转换器)

    微服务间通讯有同步和异步两种方式: 同步通讯:就像打电话,需要实时响应。 异步通讯:就像发邮件,不需要马上回复。 两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。 1.

    2024年02月11日
    浏览(23)
  • 【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程

    🎉🎉 欢迎光临,终于等到你啦 🎉🎉 🏅我是 苏泽 ,一位对技术充满热情的探索者和分享者。🚀🚀 🌟持续更新的专栏 《Spring 狂野之旅:从入门到入魔》 🚀 本专栏带你从Spring入门到入魔   这是苏泽的个人主页可以看到我其他的内容哦👇👇 努力的苏泽 http://suzee.blog.

    2024年03月15日
    浏览(42)
  • 服务异步通信-高级篇(RabbitMQ)

    每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机 RabbitMQ分别给出了解决方案: 生产者发送确认机制 mq持久化 消费者消费确认机制 失败重

    2024年02月02日
    浏览(31)
  • RabbitMQ服务异步通信-高级篇

    提出问题:消息投递过程中,生产者—— MQ —— 消费者 中间会出现消息丢失问题,导致信息没有及时同步 先梳理一下流程 生产者生产个消息 —— 建立连接——通道传递进mq交换机——交换机传给队列——消费者拉取数据消费 1.生产者生产完消息,相当于写好代码,写错了

    2024年03月24日
    浏览(28)
  • 服务器的异步通信——RabbitMQ

    目录 一、同步通信 VS 异步通信 二、MQ——消息队列 RabbitMQ  RabbitMQ安装  RabbitMQ的整体架构 常见消息模型   基本消息队列(BasicQueue) 工作消息队列(WorkQueue)  发布、订阅(Publish、Subscribe)  Fanout Exchange Direct Exchange  Topic Exchange  SpringAMQP-消息转换器  同步通信 :双方在

    2024年01月24日
    浏览(22)
  • 【Spring云原生系列】SpringBoot+Spring Cloud Stream:消息驱动架构(MDA)解析,实现异步处理与解耦合!

    🎉🎉 欢迎光临,终于等到你啦 🎉🎉 🏅我是 苏泽 ,一位对技术充满热情的探索者和分享者。🚀🚀 🌟持续更新的专栏 《Spring 狂野之旅:从入门到入魔》 🚀 本专栏带你从Spring入门到入魔   这是苏泽的个人主页可以看到我其他的内容哦👇👇 努力的苏泽 http://suzee.blog.

    2024年03月10日
    浏览(39)
  • 【消息中间件】原生PHP对接Uni H5、APP、微信小程序实时通讯消息服务

    【uniapp】实现买定离手小游戏 Mqtt不同环境问题太多,新手可以看下 《【MQTT】Esp32数据上传采集:最新mqtt插件(支持掉线、真机调试错误等问题》 《一篇就够:uniapp-Mqtt系列问题详细攻略(解决掉线、真机调试错误等问题)》 《解决微信小程序MQTT真机连接问题与合法域名配置

    2024年02月14日
    浏览(30)
  • ElasticSearch - 在 微服务项目 中基于 RabbitMQ 实现 ES 和 MySQL 数据异步同步(考点)

    目录 一、数据同步 1.1、什么是数据同步 1.2、解决数据同步面临的问题 1.3、解决办法 1.3.1、同步调用 1.3.2、异步通知(推荐) 1.3.3、监听 binlog 1.3、基于 RabbitMQ 实现数据同步 1.3.1、需求 1.3.2、在“酒店搜索服务”中 声明 exchange、queue、routingKey,同时开启监听 1.3.3、在“酒店

    2024年02月08日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包