【谷粒商城之消息队列RabbitMQ】

这篇具有很好参考价值的文章主要介绍了【谷粒商城之消息队列RabbitMQ】。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

本笔记内容为尚硅谷谷粒商城消息队列RabbitMQ部分

目录

一、概述

二、简介

三、Docker安装RabbitMQ

四、Springboot整合RabbitMQ 

1、引入spring-boot-starter-amqp

2、application.yml配置

3、测试RabbitMQ

1. AmqpAdmin-管理组件

2.RabbitTemplate-消息发送处理组件

3.RabbitListener&RabbitHandle接收消息

4.RabbitMQ消息确认机制-可靠抵达

5.可靠抵达-Ack消息确认机制


一、概述


【谷粒商城之消息队列RabbitMQ】

【谷粒商城之消息队列RabbitMQ】【谷粒商城之消息队列RabbitMQ】

【谷粒商城之消息队列RabbitMQ】

二、简介


【谷粒商城之消息队列RabbitMQ】

【谷粒商城之消息队列RabbitMQ】

【谷粒商城之消息队列RabbitMQ】

三、Docker安装RabbitMQ


docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p
25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

  • 4369, 25672 (Erlang发现&集群端口)
  • 5672, 5671 (AMQP端口)
  • 15672 (web管理后台端口)
  • 61613, 61614 (STOMP协议端口)
  • 1883, 8883 (MQTT协议端口) 

Networking and RabbitMQ — RabbitMQ

四、Springboot整合RabbitMQ 


1、引入spring-boot-starter-amqp

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、application.yml配置

spring.rabbitmq.host=192.168.88.130
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/

3、测试RabbitMQ

1. AmqpAdmin-管理组件

    @Autowired
    AmqpAdmin amqpAdmin;

    /**
     * 1、如何创建Exchange[hello.java.exchange]、Queue、Binding
     *      1)、使用AmqpAdmin进行创建
     */
    // 创建交换机
    @Test
    void createExchange() {
        //DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
        //durable:是否持久化
        DirectExchange directExchange = new DirectExchange("hello-java-exchange",true,false);
        amqpAdmin.declareExchange(directExchange);
        log.info("exchange:[{}]创建成功","hello-java-exchange");
    }

    // 创建队列
    @Test
    void createQueue() {
        //exclusive:是否排他的,false:允许同时有多个连接到此queue
        Queue queue = new Queue("hello-java-queue",true,false,false);
        amqpAdmin.declareQueue(queue);
        log.info("queue:[{}]创建成功","hello-java-queue");
    }

    @Test
    void createBinding(){
        // 创建绑定
        // String destination【目的地】,
        // DestinationType destinationType 【目的地类型】
        // String exchange【交换机】,
        // String routingKey【路由键】,
        // Map<String, Object> arguments【自定义参数】
        // 将exchange指定的交换机和destination目的地进行绑定,使用routingKey作为指定的路由键
        Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,
                "hello-java-exchange",
                "hello.java",null);
        amqpAdmin.declareBinding(binding);
        log.info("binding:[{}]创建成功","hello-java-binding");
    }

2.RabbitTemplate-消息发送处理组件

    @Test
    void sendMessageTests() {

        //1、发送消息,如果发送的消息是个对象,我们会使用序列化机制,将对象写出去。对象必须实现Serializable
        String msg = "Hello World";
        OrderReturnReasonEntity entity = new OrderReturnReasonEntity();
        entity.setId(1L);
        entity.setCreateTime(new Date());

        //2、发送的对象类型的消息,可以是一个json
        for (int i = 0;i<10;i++) {
            if(i%2==0) {
                entity.setName("Vc" + i);
                rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", entity);
            }else {
                OrderEntity orderEntity = new OrderEntity();
                orderEntity.setOrderSn(UUID.randomUUID().toString());
                rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderEntity);
            }
            log.info("消息发送完成{}" + entity);
        }
    }

使用json格式的序列化器

    //使用json格式的序列化器
    //否则使用jdk的序列化器
    @Configuration
public class MyRabbitConfig {

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

3.RabbitListener&RabbitHandle接收消息

    1)必须使用@EnableRabbit
    2)监听方法必须放在@Component中
    3)@RabbitListener(queues={"hello-java-queue"})放在类上
         @RabbitHandler:标在方法上【作用:重载处理不同类型的数据】
@RabbitListener(queues = {"hello-java-queue"})
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService 
    /**
     * queue:声明需要监听的所有队列
     *
     * org.springframework.amqp.core.Message
     *
     * 参考可以写一下类型
     * 1、Message message:原生消息详细信息。头+体
     * 2、T<发送的消息的类型> OrderReturnReasonEntity content
     * 3、Channel channel: 当前传输数据的通道
     *
     * Queue: 可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息
     * 场景:
     *      1)、订单服务启动多个;同一个消息,只能有一个客户端收到
     *      2)、只有一个消息完全处理完,方法运行结束,我们就可以接收到下一个消息
     * @param message
     */
    //RabbitListener(queues = {"hello-java-queue"})
    @RabbitHandler
    public void recieveMessage(Message message,
                               OrderReturnReasonEntity content,
                               Channel channel) throws InterruptedException {
        //{"id":1,"name":"哈哈","sort":null,"status":null,"createTime":1652100907404}
        System.out.println("接收到消息..."+content);
        byte[] body = message.getBody();
        //消息头属性信息
        MessageProperties properties = message.getMessageProperties();
        Thread.sleep(3000);
        System.out.println("消息处理完成=》"+content.getName());
    }

    @RabbitHandler
    public void recieveMessage2(OrderEntity content) {
        //{"id":1,"name":"哈哈","sort":null,"status":null,"createTime":1652100907404}
        System.out.println("接收到消息..."+content);
    }

}

4.RabbitMQ消息确认机制-可靠抵达

【谷粒商城之消息队列RabbitMQ】

【谷粒商城之消息队列RabbitMQ】

注意:springboot.rabbitmq.publisher-confirm 已被弃用.

#新版使用
#开启发送端确认
spring.rabbitmq.publisher-confirm-type=correlated

【谷粒商城之消息队列RabbitMQ】

#开启发送端消息抵达队列的确认
spring.rabbitmq.publisher-returns=true
#只要抵达队列,以异步发送优先回调我们这个return
spring.rabbitmq.template.mandatory=true

示例代码: 

@Configuration
public class MyRabbitConfig {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 定制RabbitTemplate
     * 1、服务收到消息就会回调
     *      1、spring.rabbitmq.publisher-confirms: true
     *      2、设置确认回调
     * 2、消息正确抵达队列就会进行回调
     *      1、spring.rabbitmq.publisher-returns: true
     *         spring.rabbitmq.template.mandatory: true
     *      2、设置确认回调ReturnCallback
     *
     * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
     *
     */
    @PostConstruct  //MyRabbitConfig对象创建完成以后,执行这个方法
    public void initRabbitTemplate() {

        /**
         * 1、只要消息抵达Broker就ack=true
         * correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
         * ack:消息是否成功收到
         * cause:失败的原因
         */
        //设置确认回调
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
            System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
        });


        /**
         * 只要消息没有投递给指定的队列,就触发这个失败回调
         * message:投递失败的消息详细信息
         * replyCode:回复的状态码
         * replyText:回复的文本内容
         * exchange:当时这个消息发给哪个交换机
         * routingKey:当时这个消息用哪个路邮键
         */
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
            System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
                    "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
        });
    }
}

 

5.可靠抵达-Ack消息确认机制

【谷粒商城之消息队列RabbitMQ】

#手动ack消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual

示例代码: 

    @RabbitHandler
    public void recieveMessage(Message message,
                               OrderReturnReasonEntity content,
                               Channel channel) throws InterruptedException {
        //{"id":1,"name":"哈哈","sort":null,"status":null,"createTime":1652100907404}
        System.out.println("接收到消息..."+content);
        byte[] body = message.getBody();
        //消息头属性信息
        MessageProperties properties = message.getMessageProperties();
        Thread.sleep(3000);
        System.out.println("消息处理完成=》"+content.getName());
        //Channel内按顺序自增
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        //签收货物,非批量模式
        try {

            if(deliveryTag%2==0){
                //收货
                channel.basicAck(deliveryTag,false);
                System.out.println("签收了货物..."+deliveryTag);
            }else {
                //退货 requeue=false 丢弃 requeue=true 发回服务器,服务器重新入队
                //deliveryTag, multiple, requeue(false不再入队)
                channel.basicNack(deliveryTag,false,false);
                System.out.println("没有签收了货物..."+deliveryTag);
            }
        } catch (Exception e) {
            //网络中断

        }
    }

结束!文章来源地址https://www.toymoban.com/news/detail-434213.html

到了这里,关于【谷粒商城之消息队列RabbitMQ】的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【RabbitMQ笔记10】消息队列RabbitMQ之死信队列的介绍

    这篇文章,主要介绍消息队列RabbitMQ之死信队列。 目录 一、RabbitMQ死信队列 1.1、什么是死信队列 1.2、设置过期时间TTL 1.3、配置死信交换机和死信队列(代码配置) (1)设置队列过期时间 (2)设置单条消息过期时间 (3)队列设置死信交换机 (4)配置的基本思路 1.4、配置

    2024年02月16日
    浏览(82)
  • 【SpringBoot笔记29】SpringBoot集成RabbitMQ消息队列

    这篇文章,主要介绍SpringBoot如何集成RabbitMQ消息队列。 目录 一、集成RabbitMQ 1.1、引入amqp依赖 1.2、添加连接信息 1.3、添加RabbitMQ配置类

    2023年04月08日
    浏览(57)
  • 【RabbitMQ笔记08】消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认、消费者消息确认、消息持久化)

    这篇文章,主要介绍消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认、消费者消息确认、消息持久化)。 目录 一、防止消息丢失 1.1、消息确认机制(生产者) (1)生产者丢失消息 (2)生产者消息确认机制 1.2、消息确认机制(消费者) (1)消费者丢失消息

    2024年02月02日
    浏览(51)
  • 【笔记/后端】谷粒商城高级篇

    全文检索工具:快速储存、搜索和分析海量数据。 Index (索引) → Mysql的库 动词,相当于MySQL中的insert; 名词,相当于MySQL中的Database。 Type (类型) → Mysql的表(过时) 在Index中,可以定义一个或多个类型。类似于MySQL中的Table;每一种类型的数据放在一起。 Document (文档) →

    2024年02月02日
    浏览(43)
  • 谷粒商城学习笔记

    1)、电商模式 2)、谷粒商城 谷粒商城是一个 B2C 模式的电商平台,销售自营商品给客户。 1、项目微服务架构图 2、微服务划分图 3、项目技术特色 4、项目前置要求 分布式是指将不同的业务分布在不同的地方。 集群指的是将几台服务器集中在一起,实现同一业务。 例如:

    2024年01月16日
    浏览(59)
  • 【笔记】谷粒商城高级篇

    全文检索工具:快速储存、搜索和分析海量数据。 Index (索引) → Mysql的库 动词,相当于MySQL中的insert; 名词,相当于MySQL中的Database。 Type (类型) → Mysql的表(过时) 在Index中,可以定义一个或多个类型。类似于MySQL中的Table;每一种类型的数据放在一起。 Document (文档) →

    2023年04月21日
    浏览(46)
  • RabbitMQ学习笔记(消息发布确认,死信队列,集群,交换机,持久化,生产者、消费者)

    MQ(message queue):本质上是个队列,遵循FIFO原则,队列中存放的是message,是一种跨进程的通信机制,用于上下游传递消息。MQ提供“逻辑解耦+物理解耦”的消息通信服务。使用了MQ之后消息发送上游只需要依赖MQ,不需要依赖其它服务。 功能1:流量消峰 功能2:应用解耦 功

    2024年02月07日
    浏览(54)
  • 谷粒商城笔记+踩坑(1)——架构、项目环境搭建、代码生成器

     导航: 谷粒商城笔记+踩坑汇总篇_谷粒商城笔记踩坑6_vincewm的博客-CSDN博客 目录 1、项目介绍 1.1 微服务架构图 1.2. 微服务划分图 2、项目环境搭建 2.1. 虚拟机搭建环境 2.2. Linux安装docker、配置镜像加速 2.3. docker配置mysql、设置自启动 2.3.1、安装mysql5.7 2.3.2、修改mysql配置文件

    2024年02月01日
    浏览(86)
  • 谷粒商城笔记(详细版) IDEA2021 高级篇1(2022/11/9)

    谷粒商城笔记(详细版) IDEA2021 基础篇(1/3). 谷粒商城笔记(详细版) IDEA2021 基础篇(2/3). ES6 VUE 基础篇前端笔记 谷粒商城笔记(详细版) IDEA2021 基础篇(3/3). [谷粒商城笔记(详细版) IDEA2021 高级篇(第1篇) 索引对应mysql中的库 类型对应Mysql中的数据表 文档对应mysql中的一条条数据 在我们

    2023年04月09日
    浏览(40)
  • 谷粒商城笔记+踩坑(9)——上架商品spu到ES索引库

    导航: 谷粒商城笔记+踩坑汇总篇 目录 1、ES回顾 2、ES整合商品上架  2.1、分析 2.2、创建sku的es索引库 2.2.1、两种索引库设计方案分析 2.2.2、最终选用的索引库方案,nested类型 2.3、SkuEsModel模型类 2.4、【库存模块】库存量查询 2.5、【查询模块】保存ES文档 2.5.1、常量类 2.5.2、

    2024年02月05日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包