SpringBoot整合实现RabbitMQ

这篇具有很好参考价值的文章主要介绍了SpringBoot整合实现RabbitMQ。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

SpringBoot整合实现RabbitMQ

本文大纲
一.RabbitMQ介绍

二.RabbitMQ的工作原理
2.1 RabbitMQ的基本结构
2.2 组成部分说明
2.3 生产者发送消息流程
2.4 消费者接收消息流程

三.SpringBoot 整合实现RabbitMQ
3.1创建mq-rabbitmq-producer(生产者)发送消息
3.1.1pom.xml中添加相关的依赖
3.1.2 配置application.yml
3.1.3 配置RabbitMQ常量类
3.1.4 创建RabbitMQConfig配置类
3.1.5 创建生产者用于发送消息
3.1.6 创建一个类,用于模拟测试
3.2创建mq-rabbitmq-consumer(消费者)消费消息
3.2.1pom.xml中添加相关的依赖
3.2.2 配置application.yml
3.2.3 配置RabbitMQ常量类
3.2.4 创建RabbitMQConfig配置类
3.2.5 创建消费者消息监听
3.2.6 启动项目,监听器监听到生产者发送的消息,自动消费消息

一.RabbitMQ介绍

RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。支持Windows、Linux/Unix、MAC OS X操作系统和包括JAVA在内的多种编程语言。

二.RabbitMQ的工作原理

2.1 RabbitMQ的基本结构:

springboot 整合rabbitmq,SpringBoot,java-rabbitmq,rabbitmq,spring boot

2.2 组成部分说明:
Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue
Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
2.3 生产者发送消息流程:
1.生产者和Broker建立TCP连接。
2.生产者和Broker建立通道。
3.生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4.Exchange将消息转发到指定的Queue(队列)
2.4 消费者接收消息流程:
1.消费者和Broker建立TCP连接 
2.消费者和Broker建立通道
3.消费者监听指定的Queue(队列) 
4.当有消息到达Queue时Broker默认将消息推送给消费者。

三.SpringBoot 整合实现RabbitMQ

创建2个springboot项目,一个 mq-rabbitmq-producer(生产者),一个mq-rabbitmq-consumer(消费者)。

3.1创建mq-rabbitmq-producer(生产者)发送消息
3.1.1pom.xml中添加相关的依赖
<!--添加AMQP的启动器-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--web-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
3.1.2 配置application.yml
server:
  port: 8080
spring:
  application:
    name: mq-rabbitmq-producer
  #rabbitmq配置
  rabbitmq:
    host: localhost
    port: 5672
    #注意:guest用户只能链接本地服务器 比如localhost  不可以连接远程服务器
    username: guest
    password: guest
    #虚拟主机 一台机器可能有很多虚拟主机 这里选择默认配置 / 即可
    virtual-host: /
    #支持发布返回
    publisher-returns: true
    listener:
      # Routing 路由模型(交换机类型:direct)
      direct:
        #消息确认:手动签收
        acknowledge-mode: manual
        #当前监听容器数
        concurrency: 1
        #最大数
        max-concurrency: 10
        #是否支持重试
        retry:
          enabled: true
          #重试次数5,超过5次抛出异常
          max-attempts: 5
          #重试间隔 3s
          max-interval: 3000

采用 Routing 路由模型(交换机类型:direct)方式,实现RabbitMQ消息队列。

3.1.3 配置RabbitMQ常量类

配置直连交换机名称、消息队列名称、routingkey

package com.example.mqrabbitmqproducer.util.rabbitmq;

/**
 * RabbitMQ RoutingKey 常量工具类
 * @author qzz
 */
public class RabbitMQConstantUtil {

    /**
     * 交换机名称
     */
    public static final String DIRECT_EXCHANGE = "directExchange";

    /**
     * 取消订单 队列名称 routingkey
     */
    public static final String CANCEL_ORDER = "cancel-order";

    /**
     * 自动确认订单 队列名称\routingkey
     */
    public static final String CONFIRM_ORDER = "confirm-order";


}

注意:这里把消息队列名称和routingkey设置为同名。

3.1.4 创建RabbitMQConfig配置类

rabbitmq配置类:配置Exchange、Queue、以及绑定交换机

package com.example.mqrabbitmqproducer.util.rabbitmq.config;

import com.example.mqrabbitmqproducer.util.rabbitmq.RabbitMQConstantUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * rabbitmq配置类:配置Exchange、Queue、以及绑定交换机
 * @author qzz
 */
@Configuration
@EnableRabbit
public class RabbitMQConfig {

    private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        //SimpleRabbitListenerContainerFactory发现消息中有content_type有text就会默认将其转换成string类型的
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
         /**
         * 比较常用的 Converter 就是 Jackson2JsonMessageConverter,在发送消息时,它会先将自定义的消息类序列化成json格式,
         * 再转成byte构造 Message,在接收消息时,会将接收到的 Message 再反序列化成自定义的类
         */
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //开启手动ACK
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    @Bean
    public AmqpTemplate amqpTemplate(){
        rabbitTemplate.setEncoding("UTF-8");
        rabbitTemplate.setMandatory(true);
        /**
         * ReturnsCallback消息没有正确到达队列时触发回调,如果正确到达队列不执行
         * config : 需要开启rabbitmq发送失败回退 
         * yml配置publisher-returns: true 
         * 或rabbitTemplate.setMandatory(true);设置为true
         */
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            String messageId = returnedMessage.getMessage().getMessageProperties().getMessageId();
            byte[] message = returnedMessage.getMessage().getBody();
            Integer replyCode = returnedMessage.getReplyCode();
            String replyText = returnedMessage.getReplyText();
            String exchange = returnedMessage.getExchange();
            String routingKey = returnedMessage.getRoutingKey();

            log.info("消息:{} 发送失败,消息ID:{} 应答码:{} 原因:{} 交换机:{} 路由键:{}",
                    new String(message),messageId,replyCode,replyText,exchange,routingKey);

        });
        return rabbitTemplate;
    }

    /**
     * 声明直连交换机  支持持久化
     * @return
     */
    @Bean(RabbitMQConstantUtil.DIRECT_EXCHANGE)
    public Exchange directExchange(){
        return ExchangeBuilder.directExchange(RabbitMQConstantUtil.DIRECT_EXCHANGE).durable(true).build();
    }

    /**
     * 取消订单 消息队列
     * @return
     */
    @Bean(RabbitMQConstantUtil.CANCEL_ORDER)
    public Queue cancelOrderQueue(){
        return new Queue(RabbitMQConstantUtil.CANCEL_ORDER,true,false,true);
    }

    /**
     * 把取消订单消息队列绑定到交换机上
     * @param queue
     * @param directExchange
     * @return
     */
    @Bean
    public Binding cancelOrderBinding(@Qualifier(RabbitMQConstantUtil.CANCEL_ORDER) Queue queue,
                                      @Qualifier(RabbitMQConstantUtil.DIRECT_EXCHANGE) Exchange directExchange){
        //RoutingKey :RabbitMQConstantUtil.CANCEL_ORDER,这里设置与消息队列 同名
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstantUtil.CANCEL_ORDER).noargs();
    }

    /**
     * 自动确认订单 消息队列
     * @return
     */
    @Bean(RabbitMQConstantUtil.CONFIRM_ORDER)
    public Queue confirmOrderQueue(){
        return new Queue(RabbitMQConstantUtil.CONFIRM_ORDER,true,false,true);
    }

    /**
     * 把自动确认订单消息队列绑定到交换机上
     * @param queue
     * @param directExchange
     * @return
     */
    @Bean
    public Binding confirmOrderBinding(@Qualifier(RabbitMQConstantUtil.CONFIRM_ORDER) Queue queue,
                                       @Qualifier(RabbitMQConstantUtil.DIRECT_EXCHANGE) Exchange directExchange){
        //RoutingKey :RabbitMQConstantUtil.CANCEL_ORDER,这里设置与消息队列 同名
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstantUtil.CONFIRM_ORDER).noargs();
    }
}
3.1.5 创建生产者用于发送消息
package com.example.mqrabbitmqproducer.util.rabbitmq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;

/**
 * Routing 路由模型(交换机类型:direct)
 * 消息生成者
 * @author qzz
 */
@Component
public class DirectSender {

    private static final Logger log = LoggerFactory.getLogger(DirectSender.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送消息
     * @param routingKey
     * @param msg
     */
    public void send (String routingKey,String msg){
        Message message = MessageBuilder.withBody(msg.getBytes())
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .setContentEncoding("utf-8")
                .setMessageId(UUID.randomUUID()+"").build();

        log.info("【发送者】消息内容【{}】 交换机【{}】 路由【{}】 消息ID【{}】",msg,RabbitMQConstantUtil.DIRECT_EXCHANGE
                ,routingKey,message.getMessageProperties().getMessageId());
        rabbitTemplate.convertAndSend(RabbitMQConstantUtil.DIRECT_EXCHANGE,routingKey,message);
    }
}
3.1.6 创建一个类,用于模拟测试
package com.example.mqrabbitmqproducer.controller;

import com.alibaba.fastjson.JSONObject;
import com.example.mqrabbitmqproducer.util.rabbitmq.DirectSender;
import com.example.mqrabbitmqproducer.util.rabbitmq.RabbitMQConstantUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

/**
 * 模拟测试消息发送
 * @author qzz
 */
@RestController
@RequestMapping("/order")
public class TestRabbitMQSendMsg {

    /**
     * rabbitMQ消息发送
     */
    @Autowired
    private DirectSender directSender;

    /**
     * 测试取消订单,发送消息
     */
    @GetMapping("/cancel")
    public void cancel(){

        //取消订单逻辑省略

        //取消订单,发送消息
        Map<String, Object> map = new HashMap<>();
        map.put("order_number","4364756867987989");
        map.put("product_id","1");
        directSender.send(RabbitMQConstantUtil.CANCEL_ORDER, JSONObject.toJSONString(map));
    }

    /**
     * 测试自动确认订单,发送消息
     */
    @GetMapping("/confirm")
    public void confirm(){

        //自动确认订单,发送消息
        String order_number="4364756867987989";
        directSender.send(RabbitMQConstantUtil.CONFIRM_ORDER, order_number);
    }
}

启动项目,进行测试:
(1)在postman中输入 http://localhost:8080/order/cancel,进行测试:

(2)在postman中输入 http://localhost:8080/order/confirm,进行测试:

3.2创建mq-rabbitmq-consumer(消费者)消费消息
3.2.1pom.xml中添加相关的依赖
 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
 </dependency>
 <!--添加AMQP的启动器-->
 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>
3.2.2 配置application.yml
server:
  port: 8083
spring:
  application:
    name: mq-rabbitmq-consumer
  #rabbitmq配置
  rabbitmq:
    host: localhost
    port: 5672
    #注意:guest用户只能链接本地服务器 比如localhost  不可以连接远程服务器
    username: guest
    password: guest
    #虚拟主机 一台机器可能有很多虚拟主机 这里选择默认配置 / 即可
    virtual-host: /
3.2.3 配置RabbitMQ常量类

配置直连交换机名称、消息队列名称、routingkey

package com.example.mqrabbitmqconsumer.util.rabbitmq;

/**
 * RabbitMQ RoutingKey 常量工具类
 * @author qzz
 */
public class RabbitMQConstantUtil {

    /**
     * 交换机名称
     */
    public static final String DIRECT_EXCHANGE = "directExchange";

    /**
     * 取消订单 队列名称 \routingkey
     */
    public static final String CANCEL_ORDER = "cancel-order";

    /**
     * 自动确认订单 队列名称\routingkey
     */
    public static final String CONFIRM_ORDER = "confirm-order";


}

注意:这里把消息队列名称和routingkey设置为同名。

3.2.4 创建RabbitMQConfig配置类

rabbitmq配置类:配置Exchange、Queue、以及绑定交换机

package com.example.mqrabbitmqconsumer.util.rabbitmq.config;

import com.example.mqrabbitmqconsumer.util.rabbitmq.RabbitMQConstantUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * rabbitmq配置类:配置Exchange、Queue、以及绑定交换机
 * @author qzz
 */
@Configuration
@EnableRabbit
public class RabbitMQConfig {

    private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
       //SimpleRabbitListenerContainerFactory发现消息中有content_type有text就会默认将其转换成string类型的
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        /**
         * 比较常用的 Converter 就是 Jackson2JsonMessageConverter,在发送消息时,它会先将自定义的消息类序列化成json格式,
         * 再转成byte构造 Message,在接收消息时,会将接收到的 Message 再反序列化成自定义的类
         */
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //开启手动ACK
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    @Bean
    public AmqpTemplate amqpTemplate(){
        rabbitTemplate.setEncoding("UTF-8");
        rabbitTemplate.setMandatory(true);
        /**
         * ReturnsCallback消息没有正确到达队列时触发回调,如果正确到达队列不执行
         * config : 需要开启rabbitmq发送失败回退     
         * yml配置publisher-returns: true    
         * 或rabbitTemplate.setMandatory(true);设置为true
         */
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            String messageId = returnedMessage.getMessage().getMessageProperties().getMessageId();
            byte[] message = returnedMessage.getMessage().getBody();
            Integer replyCode = returnedMessage.getReplyCode();
            String replyText = returnedMessage.getReplyText();
            String exchange = returnedMessage.getExchange();
            String routingKey = returnedMessage.getRoutingKey();

            log.info("消息:{} 发送失败,消息ID:{} 应答码:{} 原因:{} 交换机:{} 路由键:{}",
                    new String(message),messageId,replyCode,replyText,exchange,routingKey);

        });
        return rabbitTemplate;
    }

    /**
     * 声明直连交换机  支持持久化
     * @return
     */
    @Bean(RabbitMQConstantUtil.DIRECT_EXCHANGE)
    public Exchange directExchange(){
        return ExchangeBuilder.directExchange(RabbitMQConstantUtil.DIRECT_EXCHANGE).durable(true).build();
    }

    /**
     * 取消订单 消息队列
     * @return
     */
    @Bean(RabbitMQConstantUtil.CANCEL_ORDER)
    public Queue cancelOrderQueue(){
        return new Queue(RabbitMQConstantUtil.CANCEL_ORDER,true,false,true);
    }

    /**
     * 把取消订单消息队列绑定到交换机上
     * @param queue
     * @param directExchange
     * @return
     */
    @Bean
    public Binding cancelOrderBinding(@Qualifier(RabbitMQConstantUtil.CANCEL_ORDER) Queue queue,
                                      @Qualifier(RabbitMQConstantUtil.DIRECT_EXCHANGE) Exchange directExchange){
        //RoutingKey :RabbitMQConstantUtil.CANCEL_ORDER,这里设置与消息队列 同名
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstantUtil.CANCEL_ORDER).noargs();
    }

    /**
     * 自动确认订单 消息队列
     * @return
     */
    @Bean(RabbitMQConstantUtil.CONFIRM_ORDER)
    public Queue confirmOrderQueue(){
        return new Queue(RabbitMQConstantUtil.CONFIRM_ORDER,true,false,true);
    }

    /**
     * 把自动确认订单消息队列绑定到交换机上
     * @param queue
     * @param directExchange
     * @return
     */
    @Bean
    public Binding confirmOrderBinding(@Qualifier(RabbitMQConstantUtil.CONFIRM_ORDER) Queue queue,
                                       @Qualifier(RabbitMQConstantUtil.DIRECT_EXCHANGE) Exchange directExchange){
        //RoutingKey :RabbitMQConstantUtil.CANCEL_ORDER,这里设置与消息队列 同名
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstantUtil.CONFIRM_ORDER).noargs();
    }
}
3.2.5 创建消费者消息监听

(1)监听取消订单

package com.example.mqrabbitmqconsumer.listener;

import com.example.mqrabbitmqconsumer.util.rabbitmq.RabbitMQConstantUtil;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


/**
 * 监听取消订单
 * @author qzz
 */
@Component
public class RabbitMQCancelOrderListener {

    private static final Logger log = LoggerFactory.getLogger(RabbitMQCancelOrderListener.class);

    /**
     * 接受消息
     * @param channel
     * @param message
     * @throws Exception
     */
    @RabbitHandler
    @RabbitListener(queues = RabbitMQConstantUtil.CANCEL_ORDER)
    public void receiverMsg(Channel channel, Message message) throws Exception {
        //body 即消息体
        String msg = new String(message.getBody());
        String messageId = message.getMessageProperties().getMessageId();
        log.info("【消费者】 消息内容:【{}】。messageId 【{}】",msg, messageId);

        try{
            //如果有业务逻辑,则在这里编写


            //告诉服务器收到这条消息 无需再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch (Exception e){
            log.error("消息处理出现异常:{}",e.getMessage());
            //告诉消息服务器 消息处理异常,消息需要重新再次发送!
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true);
        }
    }
}

(2)监听自动确认订单文章来源地址https://www.toymoban.com/news/detail-584118.html

package com.example.mqrabbitmqconsumer.listener;

import com.example.mqrabbitmqconsumer.util.rabbitmq.RabbitMQConstantUtil;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


/**
 * 监听自动确认订单
 * @author qzz
 */
@Component
public class RabbitMQConfirmOrderListener {

    private static final Logger log = LoggerFactory.getLogger(RabbitMQConfirmOrderListener.class);

    /**
     * 接受消息
     * @param channel
     * @param message
     * @throws Exception
     */
    @RabbitHandler
    @RabbitListener(queues = RabbitMQConstantUtil.CONFIRM_ORDER)
    public void receiverMsg(Channel channel, Message message) throws Exception {
        //body 即消息体
        String msg = new String(message.getBody());
        String messageId = message.getMessageProperties().getMessageId();
        log.info("【消费者】 消息内容:【{}】。messageId 【{}】",msg, messageId);

        try{
            //如果有业务逻辑,则在这里编写


            //告诉服务器收到这条消息 无需再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch (Exception e){
            log.error("消息处理出现异常:{}",e.getMessage());
            //告诉消息服务器 消息处理异常,消息需要重新再次发送!
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true);
        }
    }
}
3.2.6 启动项目,监听器监听到生产者发送的消息,自动消费消息

到了这里,关于SpringBoot整合实现RabbitMQ的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器)

    1、ContentUtil 先定义常量 2、RabbitMQConfig 创建队列的两种方式之一: 配置式: 在容器中配置 org.springframework.amqp.core.Queue 类型的Bean,RabbitMQ将会自动为该Bean创建对应的队列。 就是在配置类中创建一个生成消息队列的@Bean。 问题: 用 @Configuration 注解声明为配置类,但是项目启动

    2024年02月06日
    浏览(34)
  • SpringBoot 整合RabbitMq 自定义消息监听容器来实现消息批量处理

    RabbitMQ是一种常用的消息队列,Spring Boot对其进行了深度的整合,可以快速地实现消息的发送和接收。在RabbitMQ中,消息的发送和接收都是异步的,因此需要使用监听器来监听消息的到来。Spring Boot中提供了默认的监听器容器,但是有时候我们需要自定义监听器容器,来满足一

    2024年02月16日
    浏览(39)
  • SpringBoot 2.2.5 整合RabbitMQ,实现Topic主题模式的消息发送及消费

    1、simple简单模式 消息产生着§将消息放入队列 消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端

    2024年02月02日
    浏览(35)
  • SpringBoot实战项目整合RabbitMQ+ElaticSearch实现SKU上下架功能_尚上优选整合es+mq实现商品上下架(1)

    文章目录 前言 1、前置条件 2、搭建service-search模块 3、开发功能接口 3.1 添加远程调用方法 3.2、创建远程调用模块 3.3、开发service-search 模块接口 4、RabbitMQ 5、完善SKU管理商品上下架 5.1、商品服务 5.2、es服务 6、最终测试 总结 最终实现效果:针对SKU的上下架 上架效果: 1、后

    2024年04月17日
    浏览(63)
  • RabbitMQ: SpringBoot 整合 RabbitMQ

    重点是这个依赖 通过              和上一个一样  

    2024年02月09日
    浏览(36)
  • 【RabbitMQ】RabbitMQ整合SpringBoot案例

    【RabbitMQ】消息队列-RabbitMQ篇章 RabbitMQ实现流程 2.1 实现架构总览 实现步骤: 1:创建生产者工程:sspringboot-rabbitmq-fanout-producer 2:创建消费者工程:springboot-rabbitmq-fanout-consumer 3:引入spring-boot-rabbitmq的依赖 4:进行消息的分发和测试 5:查看和观察web控制台的状况 2.2 具体实现

    2024年02月12日
    浏览(33)
  • 【RabbitMQ】4 Spring/SpringBoot整合RabbitMQ

    spring-amqp 是对AMQP的一些概念的一些抽象, spring-rabbit 是对RabbitMQ操作的封装实现。 主要有几个核心类 RabbitAdmin 、 RabbitTemplate 、 SimpleMessageListenerContainer 等。 RabbitAdmin 类完成对Exchange,Queue,Binding的操作,在容器中管理了 RabbitAdmin 类的时候,可以对Exchange,Queue,Binding进行自

    2024年01月22日
    浏览(28)
  • SpringBoot 整合 RabbitMQ

    由于有的 Idea 不选择插线无法创建 Spring Boot 项目,这里我们先随便选一个插件,大家也可以根据需求选择~~ 把版本改为 2.7.14 引入这两个依赖: 配置 application.yml文件 Config 类 : RabbitMQConfig 测试类: RabbitMQConfigTests 结果 当我们启动 测试类 之后就可以发现我们的 rabbitmq 界面里的

    2024年02月10日
    浏览(27)
  • RabbitMQ整合Springboot

    目录 一、配置 二、使用 (1)创建普通交换机 (2) 创建普通队列 (3)绑定 交换机--队列 (4)创建带有死信交换机的队列 (5)生产者 (6)消费者 (7)Message对象 (8)延时队列优化(死信实现延时,有缺陷) 三、Rabbitmq插件实现延迟队列(重点) 四、发布确认 (1)确认回调

    2024年02月15日
    浏览(29)
  • SpringBoot整合RabbitMQ(基础)

    一.环境准备 1、在pom文件中引入对应的依赖: 2、在application.yml配置文件中配置RabbitMQ: 二、整合 点对点,简单模式 ①配置文件中声明队列 ②创建生产者 消息发送成功后,在web管理页面查看: 可以看到对应队列中产生了消息 ③创建消费者 启动项目,可以看到消息成功消费:

    2024年02月11日
    浏览(27)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包