Spring Boot整合RabbitMQ之发布与订阅模式

这篇具有很好参考价值的文章主要介绍了Spring Boot整合RabbitMQ之发布与订阅模式。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMQ的模式中,常用的模式有:简单模式,发布与订阅模式,工作模式,路由模式,主题模式。简单模式不太会运用到工作中,我们可以使用 RabbitMQ 的发布订阅模式,实现:

  1. 用户发布动态,其“粉丝”收到其发布动态的消息
  2. 用户下订单,库存模块、支付模块等收到消息并处理
  3. 等等

1. 创建RabbitMQ的生产者

创建一个springboot项目,项目创建idea的默认创建springboot项目

Spring Boot整合RabbitMQ之发布与订阅模式,消息队列rabbitmq,java-rabbitmq,spring boot,rabbitmq

然后进行rabbitMq的整合过程

1.1 引入rabbitmq的jar包

在项目的pom.xml中引入rabbitmq的jar包,详情如下:

<dependency>	
	<groupId>org.springframework.boot</groupId>	
	<artifactId>spring-boot-starter-amqp</artifactId>	
	<version>2.3.12.RELEASE</version>
</dependency>

1.2 配置文件中添加配置

在项目的配置文件中添加rabbitmq的相关配置,配置详情如下:

server:
  port: 10001
 
# rabbitMq 相关配置
spring:
  application:
    name: springboot-rabbitmq-s1
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /
    username: guest
    password: guest

guest是rabbitmq的默认密码,不需要重新设置,不过在生产中为了安全是需要改密码的
1.3 创建配置类

配置类用于将队列和交换机进行绑定,该操作也可以使用rabbitmq的管理界面操作,并不是一定需要的步骤。配置类详情如下:

package com.study.rabbitmq.config;
 
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
/**
 * @Author alen
 * @DATE 2022/6/7 23:50
 */
@Configuration
public class RabbitMQConfig {
 
    public static final String EXCHANGE_NAME = "fanout-order-exchange";
    public static final String SMS_QUEUE = "sms-fanout-queue";
    public static final String EMAIL_QUEUE = "email-fanout-queue";
    public static final String WECHAT_QUEUE = "wechat-fanout-queue";
 
    /**
     * 1.
     * 声明交换机
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        /**
         * FanoutExchange的参数说明:
         * 1. 交换机名称
         * 2. 是否持久化 true:持久化,交换机一直保留, false:不持久化,用完就删除
         * 3. 是否自动删除 false:不自动删除, true:自动删除
         */
        return new FanoutExchange(EXCHANGE_NAME, true, false);
    }
 
    /**
     * 2.
     * 声明队列
     * @return
     */
    @Bean
    public Queue smsQueue() {
        /**
         * Queue构造函数参数说明
         * 1. 队列名
         * 2. 是否持久化 true:持久化, false:不持久化
         */
        return new Queue(SMS_QUEUE, true);
    }
 
    @Bean
    public Queue emailQueue() {
        return new Queue(EMAIL_QUEUE, true);
    }
 
    @Bean
    public Queue wechatQueue() {
        return new Queue(WECHAT_QUEUE, true);
    }
 
    /**
     * 3.
     * 队列与交换机绑定
     */
    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }
 
    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
    }
 
    @Bean
    public Binding wechatBinding() {
        return BindingBuilder.bind(wechatQueue()).to(fanoutExchange());
    }
}

1.4 模拟发送消息

创建一个service类,在类中进行rabbitMq消息的发送,源码如下:

package com.study.rabbitmq.service;
 
import cn.hutool.json.JSONUtil;
import com.study.rabbitmq.entity.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
/**
 * @Author alen
 * @DATE 2022/6/7 23:31
 */
@Service
@Slf4j
public class OrderService {
 
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    public void createOrder(Order order) {
        String body = JSONUtil.toJsonStr(order);
        log.info("订单信息:{}", body);
        //交换机名称
        String exchangeName = "fanout-order-exchange";
        //路由key 由于我们实现的是fanout模式(广播模式),不需要路由key,所有的消费者都可以进行监听和消费
        String routeKey = "";
        //发送mq消息
        rabbitTemplate.convertAndSend(exchangeName, routeKey, body);
        log.info("rabbitmq发送广播模式消息成功。。。");
    }
}

使用单元测试模拟消息发送,单元测试详情如下:

package com.study.rabbitmq;
 
import com.study.rabbitmq.entity.Order;
import com.study.rabbitmq.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
 
import java.util.UUID;
 
@SpringBootTest
class SpringbootRabbitmqS1ApplicationTests {
 
    @Autowired
    private OrderService orderService;
 
    @Test
    void contextLoads() {
        for (long i = 1; i < 50; i++) {
            Order order = new Order();
            order.setRequestId(i);
            order.setUserId(i);
            order.setOrderNo(UUID.randomUUID().toString());
            order.setAmount(10L);
            order.setGoodsNum(1);
            order.setTotalAmount(10L);
            orderService.createOrder(order);
        }
    }
}

发送完后,我们可以在rabbitMq的管理后台看到已经发送成功的消息,效果如下:

Spring Boot整合RabbitMQ之发布与订阅模式,消息队列rabbitmq,java-rabbitmq,spring boot,rabbitmq

可见消息已经全部发送完毕,因为前面的三个队列都是绑定在同一个交换机上,所以三个队列都会收到消息。

2. 创建RabbitMQ的消费者

创建消费者服务S2,项目结构参考生产者项目结构,然后进行消息消费的相关代码的实现,实现过程如下

2.1 引入RabbitMQ的jar包

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
	<version>2.3.12.RELEASE</version>
</dependency>

2.2 在项目配置文件中添加配置

配置详情如下

server:
  port: 10002
 
# rabbitmq 相关配置
spring:
  application:
    name: springboot-rabbitmq-s2
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /
    username: admin
    password: admin

2.3 创建MQ消息消费者

消费者类详情如下

package com.study.rabbitmq.service;
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
 
/**
 * @Author alen
 * @DATE 2022/6/8 8:15
 */
@Slf4j
@Service
@RabbitListener(queues = {"email-fanout-queue"}) //监听队列
public class FanoutEmailConsumer {
 
    @RabbitHandler
    public void emailMessage(String message) {
        log.info("Email fanout --接收到消息:{}", message);
    }
}

启动消费者项目,消费效果如下:

Spring Boot整合RabbitMQ之发布与订阅模式,消息队列rabbitmq,java-rabbitmq,spring boot,rabbitmq

登录rabbitMq后台查看队列的消息情况如下

Spring Boot整合RabbitMQ之发布与订阅模式,消息队列rabbitmq,java-rabbitmq,spring boot,rabbitmq 到此,似乎感觉整合得很顺利,没啥毛病。但是实际的运用中,以上演示过程中忽略了两个很重要的问题,一是我如何知道消息被顺利的发送到了队列,因为实际的工作中,不大可能每个消息都去rabbitmq管理后台查看。二是如果消息在消费的过程中出现了异常导致消息丢失,不重要的数据还好,如果是支付类的消息呢?就会产生严重的线上问题。那么这两个问题需要怎么处理呢?其实rabbitmq提供了消息发送结果回调和消息消费手动确认来处理这两个问题。文章来源地址https://www.toymoban.com/news/detail-660941.html

到了这里,关于Spring Boot整合RabbitMQ之发布与订阅模式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Spring Boot 3】【Redis】消息发布及订阅

    软件开发是一门实践性科学,对大多数人来说,学习一种新技术不是一开始就去深究其原理,而是先从做出一个可工作的DEMO入手。但在我个人学习和工作经历中,每次学习新技术总是要花费或多或少的时间、检索不止一篇资料才能得出一个可工作的DEMO,这占用了我大量的时

    2024年01月21日
    浏览(47)
  • Redis的发布订阅模式:实现消息队列和实时数据推送的利器

    当涉及到实时数据推送和消息队列时,Redis的发布订阅模式是一种非常有用的工具。Redis是一个开源的内存数据库,被广泛用于缓存、队列和实时数据处理等方面。 在本博客中,我们将重点介绍Redis的发布订阅模式,并且提供一些示例代码来帮助读者更好地理解这个模式以及如

    2024年02月12日
    浏览(92)
  • 消息队列——spring和springboot整合rabbitmq

    目录 spring整合rabbitmq——生产者 rabbitmq配置文件信息 倒入生产者工程的相关代码 简单工作模式 spring整合rabbitmq——消费者 spring整合rabbitmq——配置详解 SpringBoot整合RabbitMQ——生产者  SpringBoot整合RabbitMQ——消费者   使用原生amqp来写应该已经没有这样的公司了 创建两个工程

    2024年02月16日
    浏览(51)
  • Spring Boot 整合Redis实现消息队列

      本篇文章主要来讲Spring Boot 整合Redis实现消息队列,实现redis用作消息队列有多种方式,比如: 基于 List 的 rpush+lpop 或 lpush+rpop 基于 List 的 rpush+blpop 或 lpush+brpop (阻塞式获取消息) 基于 Sorted Set 的优先级队列 Redis Stream (Redis5.0版本开始) Pub/Sub 机制   不过这里讲的是

    2024年02月13日
    浏览(44)
  • RabbitMQ 消息队列(Spring boot AMQP)

    几种常见MQ的对比: RabbitMQ ActiveMQ RocketMQ Kafka 公司/社区 Rabbit Apache 阿里 Apache 开发语言 Erlang Java Java ScalaJava 协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议 可用性 高 一般 高 高 单机吞吐量 一般 差 高 非常高 消息延迟 微秒级 毫秒级 毫秒级 毫

    2024年02月13日
    浏览(43)
  • 第三章 Spring Boot 整合 Kafka消息队列 消息者

    第一章 Kafka 配置部署及SASL_PLAINTEXT安全认证 第二章  Spring Boot 整合 Kafka消息队列 生产者 第三章  Spring Boot 整合 Kafka消息队列 消息者         Kafka 是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。本文主是基于Spirng Boot封装了Apache 的

    2024年02月22日
    浏览(44)
  • Spring Boot 整合 RabbitMQ 实现延迟消息

    消息队列(Message Queuing,简写为 MQ)最初是为了解决金融行业的特定业务需求而产生的。慢慢的,MQ 被应用到了更多的领域,然而商业 MQ 高昂的价格让很多初创公司望而却步,于是 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)应运而生。 随着 AMQP 草案的发布,两个月

    2024年04月08日
    浏览(47)
  • springboot整合rabbitmq的发布确认,消费者手动返回ack,设置备用队列,以及面试题:rabbitmq确保消息不丢失

    目录 1.生产者发消息到交换机时候的消息确认 2.交换机给队列发消息时候的消息确认 3.备用队列 3.消费者手动ack   rabbitmq的发布确认方式,可以有效的保证我们的数据不丢失。   消息正常发送的流程是:生产者发送消息到交换机,然后交换机通过路由键把消息发送给对应的队

    2024年02月09日
    浏览(70)
  • 第二章 Spring Boot 整合 Kafka消息队列 生产者

    第一章 Kafka 配置部署及SASL_PLAINTEXT安全认证 第二章  Spring Boot 整合 Kafka消息队列 生产者 第三章  Spring Boot 整合 Kafka消息队列 消息者         Kafka 是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。本文主是基于Spirng Boot封装了Apache 的

    2024年01月25日
    浏览(43)
  • Spring整合RabbitMQ-配制文件方式-3-消息拉模式

    拉消息的消费者 spring-rabbit.xml 当启动消费者后,便可获取到发送至队列的消息 检查队列的消息的情况: 经过检查确认,发现消息已经被消费了。 至此拉模式的消费者完成。

    2024年02月09日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包