Spring Boot 整合 RabbitMQ 实现延迟消息

这篇具有很好参考价值的文章主要介绍了Spring Boot 整合 RabbitMQ 实现延迟消息。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

关于 RabbitMQ

消息队列(Message Queuing,简写为 MQ)最初是为了解决金融行业的特定业务需求而产生的。慢慢的,MQ 被应用到了更多的领域,然而商业 MQ 高昂的价格让很多初创公司望而却步,于是 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)应运而生。

随着 AMQP 草案的发布,两个月后,RabbitMQ 1.0 就发布了。

RabbitMQ 的架构模型可以分为客户端和服务端两部分,客户端包括生产者和消费者,服务端包括虚拟主机、交换器和队列。

springboot rabbitmq 延迟消息,项目,相关技术积累,java-rabbitmq,spring boot,rabbitmq

整体的流程非常简单,生产者将消息发送到服务端,消费者从服务端获取对应的消息。

生产者在发送消息前需要先确认发送给哪个虚拟主机的哪个交换器,再由交换器通过路由键将消息转发给与之绑定的队列。

最后,消费者到指定的队列中获取自己的消息进行消费。

客户端

生产者和消费者都属于客户端。

生产者:消息的发送方,将要发送的消息封装成一定的格式,发送给服务端。消息包括消息体和标签。

消费者:消息的接收方,负责消费消息体。

服务端

虚拟主机、交换机、队列都属于服务端。

虚拟主机:用来对交换器和队列进行逻辑隔离,在同一个虚拟主机下,交换器和队列的名称不能重复。有点类似 Java 中的包,同一个包下,不能有相同名称的类或者接口。

交换器:负责接收生产者发来的消息,并根据规则分配给对应的队列,不生产消息,只是消息的搬运工。

队列:负责存储消息,生产者发送的消息会放在这里,消费者从这里取。

连接和信道

连接和信道是两个不同的概念,连接的英文叫 connection,信道叫 channel。

springboot rabbitmq 延迟消息,项目,相关技术积累,java-rabbitmq,spring boot,rabbitmq

连接里包含了多条信道,连接用的是 TCP 连接,因为 AMQP 就是用 TCP 实现的。

为什么不直接使用连接,而要在连接的基础上新建信道呢?

因为 TCP 连接是比较昂贵的,新建需要三次握手,销毁需要四次挥手,所以如果每个线程在想 RabbitMQ 服务端发送/接收消息的时候都新建一个 TCP 连接,就会非常的消耗资源,于是就有了信道。

信道是线程私有的,连接是线程共享的。

信道+连接的模式,既保证了线程之间的私密性,又减少了系统开销。

业务场景

消息队列的主要功能有三种:

  • 异步处理,比如说在做电商业务的时候,提交订单的动作可能涉及到创建订单、扣除库存、增加用户积分、发送订单邮件等。它们并不是一个串行的操作,可以把发送订单邮件和增加用户积分交给消息队列去做。
  • 系统解耦,消息队列可以作为不同系统之间的桥梁,且不受系统技术栈的约束。
  • 缓冲削峰,消息队列可以将大量的请求放到队列中,然后再按照一定的顺序规则交给业务服务器处理。

工作模式

RabbitMQ 支持 7 种工作模式:

  • 简单模式
  • 工作队列模式
  • 广播模式
  • 路由模式
  • 动态路由模式
  • 远程模式
  • 生产者确认模式

我们这里只演示前三种,

简单模式

简单模式真的超级简单,生产者将消息发送给队列,消费者从队列中获取消息队列即可。

springboot rabbitmq 延迟消息,项目,相关技术积累,java-rabbitmq,spring boot,rabbitmq

生活中就类似于 快递员将包裹放到快递柜,然后给取件人发一个取件码,取件人通过取件码去快递柜里取包裹📦。

工作队列模式

工作队列模式在本质上只比简单模式对了一个队列,消费者从一个变成了多个。生产者将消息放入到队列中,多个消费者会一次进行消费。

springboot rabbitmq 延迟消息,项目,相关技术积累,java-rabbitmq,spring boot,rabbitmq

比如说有 3 个消费者,生产者向队列发送 3 条消息,3 个消费者会没人消费一条消息,有点雨露均沾的意味。

当然了,也可以通过配置,将其改成能者多劳的模式。

广播模式

与工作队列模式不同,广播模式就有交换器参与了。在广播模式下,即使生产者只生产了一条消息,它对应的所有消费者都能全部接收,真正做到了公平公正公开。

springboot rabbitmq 延迟消息,项目,相关技术积累,java-rabbitmq,spring boot,rabbitmq

安装配置 RabbitMQ

RabbitMQ 的安装方式可以参考官方:

Installing RabbitMQ | RabbitMQ

  • 服务器数据统计——消息投递情况,以及连接、信道、交换器、队列、消费者的数量
  • RabbitMQ 节点信息——erlang 进程、内存、磁盘空间等
  • 端口和 Web 信息
  • 。。。

启动RabbitMQ服务。

rabbitmq-server.bat

springboot rabbitmq 延迟消息,项目,相关技术积累,java-rabbitmq,spring boot,rabbitmq

我们点击 admin 面板 点击虚拟主机新建一个 codingmore 的虚拟主机。

springboot rabbitmq 延迟消息,项目,相关技术积累,java-rabbitmq,spring boot,rabbitmq

并新建一个用户 admin:

springboot rabbitmq 延迟消息,项目,相关技术积累,java-rabbitmq,spring boot,rabbitmq

并设置它的权限。

springboot rabbitmq 延迟消息,项目,相关技术积累,java-rabbitmq,spring boot,rabbitmq

整合 RabbitMQ

第一步,在 pom.xml 文件中添加 RabbitMQ 的 starter 依赖。

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

第二步,在 application.yml 文件中添加 RabbitMQ 的配置。

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin
    virtual-host: codingmore

简单模式

第三步,新建 RabbitMQController 控制器类,添加 sendSimple 生产者接口。

@RestController
@Api(tags = "整合 RabbitMQ")
@RequestMapping("/rabbitmq")
public class RabbitMQController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostMapping("/sendSimple")
    @ApiOperation("简单模式")
    public ResultObject sendSimple(String routingKey, String message) {
        rabbitTemplate.convertAndSend(routingKey, message);
        return ResultObject.success("ok");
    }
}

RabbitTemplate 是 Spring Boot 为我们封装好的操作 RabbitMQ 的工具类。

第四步,新建 SimpleConsumer 类,添加简单模式的消费者。

@Slf4j
@Component
@RabbitListener(queuesToDeclare = @Queue("simple"))
public class SimpleConsumer {
    @RabbitHandler
    public void receive(String message) {
        log.info("简单模式:{}", message);
    }
}

启动服务,在浏览器地址栏访问 http://localhost:8080/doc.html 打开 Swagger。

输入参数,点击发送。

springboot rabbitmq 延迟消息,项目,相关技术积累,java-rabbitmq,spring boot,rabbitmq

在Intellij IDEA 中可以看到输出信息。

springboot rabbitmq 延迟消息,项目,相关技术积累,java-rabbitmq,spring boot,rabbitmq

这就表示我们完成了 RabbitMQ 的简单模式。

工作队列模式

在 RabbitMQController 控制器中添加 sendWork 工作队列接口:

@PostMapping("/sendWork")
@ApiOperation("工作队列模式")
public ResultObject sendWork(String routingKey, String message) {
    for (int i = 0; i < 10; i++) {
        rabbitTemplate.convertAndSend(routingKey, "第" + i + "消息:" + message);
    }
    return ResultObject.success("ok");
}

新建 WorkConsumer 类,添加工作队列模式的消费者。

@Slf4j
@Component
public class WorkConsumer {
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receiveOne(String message) {
        log.info("工作队列模式 receiveOne:{}", message);
    }

    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receiveTwo(String message) {
        log.info("工作队列模式 receiveTwo:{}", message);
    }
}

build 服务,在浏览器地址栏打开 http://localhost:8080/doc.html 刷新 Swagger。

输入参数,点击发送。

springboot rabbitmq 延迟消息,项目,相关技术积累,java-rabbitmq,spring boot,rabbitmq

在Intellij IDEA 中可以看到输出信息。

springboot rabbitmq 延迟消息,项目,相关技术积累,java-rabbitmq,spring boot,rabbitmq

这就表示我们完成了 RabbitMQ 的工作队列模式。

广播模式

在 RabbitMQController 控制器中添加 sendBroadcast 广播接口:

@PostMapping("/sendBroadcast")
@ApiOperation("广播模式")
public ResultObject sendBroadcast(String exchange, String message) {
    rabbitTemplate.convertAndSend(exchange, "",message);
    return ResultObject.success("ok");
}

新建 BroadcastConsumer 类,添加广播模式的消费者。

@Slf4j
@Component
public class BroadcastConsumer {
    @RabbitListener(bindings = @QueueBinding(value = @Queue,
            exchange = @Exchange(name = "fanout", type = "fanout")))
    public void receiveOne(String message) {
        log.info("广播模式 receiveOne:{}", message);
    }

    @RabbitListener(bindings = @QueueBinding(value = @Queue,
            exchange = @Exchange(name = "fanout", type = "fanout")))
    public void receiveTwo(String message) {
        log.info("广播模式 receiveTwo:{}", message);
    }
}

注意这里的 Exchange(交换器)名字要是 fanout,它是 RabbitMQ 默认的一种交换器。

springboot rabbitmq 延迟消息,项目,相关技术积累,java-rabbitmq,spring boot,rabbitmq

Fanout模式不需要处理路由键(所以我们在 sendBroadcast 接口中,convertAndSend 方法中传递的 routingKey 是空的),我们只需要简单的将队列绑定到exchange上,发送到exchange的每一个消息都会被转发到与该exchange绑定的所有队列上。

Fanout类型的Exchange转发消息是最快的。除此之外,还有 Direct Exchange、Topic Exchange,大家可以去了解一下。

springboot rabbitmq 延迟消息,项目,相关技术积累,java-rabbitmq,spring boot,rabbitmq

build 服务,在浏览器地址栏打开 http://localhost:8080/doc.html 刷新 Swagger。

输入参数,点击发送。

springboot rabbitmq 延迟消息,项目,相关技术积累,java-rabbitmq,spring boot,rabbitmq

在Intellij IDEA 中可以看到输出信息。

springboot rabbitmq 延迟消息,项目,相关技术积累,java-rabbitmq,spring boot,rabbitmq

可以看到两个消费者都消费了消息,这就表示我们完成了 RabbitMQ 的广播模式。文章来源地址https://www.toymoban.com/news/detail-844722.html

到了这里,关于Spring Boot 整合 RabbitMQ 实现延迟消息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 搭建RabbitMQ消息服务,整合SpringBoot实现收发消息

    作者主页 :Designer 小郑 作者简介 :3年JAVA全栈开发经验,专注JAVA技术、系统定制、远程指导,致力于企业数字化转型,CSDN博客专家,蓝桥云课认证讲师。 消息队列是一种在应用程序之间传递数据的通信机制 ,它基于 发布-订阅 模式,将消息发送者(发布者)和消息接收者

    2024年02月09日
    浏览(53)
  • [Spring boot] Spring boot 整合RabbitMQ实现通过RabbitMQ进行项目的连接

     🍳作者:天海奈奈 💭眼过千遍不如手锤一遍:推荐一款模拟面试,斩获大厂 o f f e r ,程序员的必备刷题平台 − − 牛客网  👉🏻点击开始刷题之旅 目录 什么是RabbitMQ   消息队列:接受并转发消息,类似于快递公司 消息队列的优点 消息队列的特性 RabbitMQ特点 RabbitMQ核

    2024年01月24日
    浏览(81)
  • springboot整合rabbitmq 实现消息发送和消费

    Spring Boot提供了RabbitMQ的自动化配置,使得整合RabbitMQ变得非常容易。 首先,需要在pom.xml文件中引入amqp-client和spring-boot-starter-amqp依赖: 接下来需要在application.properties文件中配置RabbitMQ连接信息: 然后编写消息发送者: 其中,my-exchange和my-routing-key是需要自己定义的交换机和

    2024年02月07日
    浏览(40)
  • Spring Boot 整合Redis实现消息队列

      本篇文章主要来讲Spring Boot 整合Redis实现消息队列,实现redis用作消息队列有多种方式,比如: 基于 List 的 rpush+lpop 或 lpush+rpop 基于 List 的 rpush+blpop 或 lpush+brpop (阻塞式获取消息) 基于 Sorted Set 的优先级队列 Redis Stream (Redis5.0版本开始) Pub/Sub 机制   不过这里讲的是

    2024年02月13日
    浏览(44)
  • SpringBoot 整合RabbitMq 自定义消息监听容器来实现消息批量处理

    RabbitMQ是一种常用的消息队列,Spring Boot对其进行了深度的整合,可以快速地实现消息的发送和接收。在RabbitMQ中,消息的发送和接收都是异步的,因此需要使用监听器来监听消息的到来。Spring Boot中提供了默认的监听器容器,但是有时候我们需要自定义监听器容器,来满足一

    2024年02月16日
    浏览(48)
  • springboot 整合redis 延迟消息功能

    1、redis配置项一定要顶格,修改redis如下图的配置:yes 改为 no 2、 3、springboot中配置redis 4、监听器 5、container 6.代码中发布消息

    2024年02月09日
    浏览(37)
  • 使用 Spring Boot 整合 Kafka:实现高效的消息传递

    Kafka 是一种流处理平台,用于在分布式系统中处理高吞吐量的数据流。它是一种基于发布订阅模式的消息系统,能够处理来自多个应用程序的数据流。Kafka 具有高度的可扩展性、可靠性和性能,使得它成为处理大数据的流行选择。 Spring Boot 是一种开源框架,用于简化 Java 应用

    2024年02月14日
    浏览(46)
  • 207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器)

    1、ContentUtil 先定义常量 2、RabbitMQConfig 创建队列的两种方式之一: 配置式: 在容器中配置 org.springframework.amqp.core.Queue 类型的Bean,RabbitMQ将会自动为该Bean创建对应的队列。 就是在配置类中创建一个生成消息队列的@Bean。 问题: 用 @Configuration 注解声明为配置类,但是项目启动

    2024年02月06日
    浏览(53)
  • RabbitMQ实现延迟消息,RabbitMQ使用死信队列实现延迟消息,RabbitMQ延时队列插件

    假设有一个业务场景:超过30分钟未付款的订单自动关闭,这个功能应该怎么实现? RabbitMQ使用死信队列,可以实现消息的延迟接收。 队列有一个消息过期属性。就像丰巢超过24小时就收费一样,通过设置这个属性,超过了指定事件的消息将会被丢弃。 这个属性交:x-message

    2024年02月13日
    浏览(74)
  • SpringBoot 2.2.5 整合RabbitMQ,实现Topic主题模式的消息发送及消费

    1、simple简单模式 消息产生着§将消息放入队列 消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端

    2024年02月02日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包