Spring Boot 中的 RabbitMQ 是什么,如何使用
简介
RabbitMQ 是一个开源的消息队列系统,它通过 AMQP(高级消息队列协议)来实现消息的传递。Spring Boot 是目前非常流行的 Java 开发框架,它提供了很多便利性的功能,其中就包括对 RabbitMQ 的支持。
在本文中,我们将介绍 RabbitMQ 的基本概念,以及如何在 Spring Boot 中使用 RabbitMQ。
RabbitMQ 的基本概念
在使用 RabbitMQ 之前,我们需要了解一些基本概念。
消息队列
消息队列是一种用于异步通信的机制。消息发送者将消息发送到队列中,消息接收者从队列中获取消息。通过消息队列,可以实现消息的异步传递,降低系统之间的耦合性。
消息
消息是指需要传递的数据。
生产者
生产者是指向消息队列发送消息的程序。
消费者
消费者是指从消息队列中获取消息并处理的程序。
队列
队列是指消息存储的地方。
交换机
交换机是用于接收生产者发送的消息,并将消息路由到相应的队列中。
路由键
路由键是一个字符串,用于指定消息应该被路由到哪个队列中。
绑定
绑定是指将队列和交换机连接起来的过程。
如何在 Spring Boot 中使用 RabbitMQ
添加依赖
首先,我们需要在 Maven 或 Gradle 中添加 RabbitMQ 的依赖。在 Maven 中,我们可以添加以下依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在 Gradle 中,我们可以添加以下依赖:
implementation 'org.springframework.boot:spring-boot-starter-amqp'
配置 RabbitMQ
在 Spring Boot 中,我们可以使用 application.yml 或者 application.properties 文件来配置 RabbitMQ。
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
创建生产者
下面是一个简单的 RabbitMQ 生产者示例:
@Component
public class RabbitMQProducer {
private final RabbitTemplate rabbitTemplate;
public RabbitMQProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void send(String message) {
rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message);
}
}
在这个示例中,我们使用了 Spring AMQP 提供的 RabbitTemplate 类来发送消息。convertAndSend
方法用于将消息发送到指定的交换机和路由键。
创建消费者
下面是一个简单的 RabbitMQ 消费者示例:
@Component
public class RabbitMQConsumer {
@RabbitListener(queues = "myQueue")
public void receive(String message) {
System.out.println("Received message: " + message);
}
}
在这个示例中,我们使用了 Spring AMQP 提供的 @RabbitListener
注解来指定消费者应该监听哪个队列。当有消息到达队列时,receive
方法会被调用,并且接收到消息的内容会作为参数传递给该方法。
运行示例
现在我们已经创建了一个简单的 RabbitMQ 应用程序。我们可以在 main 方法中创建 Spring Boot 应用程序,并在其中注入我们的生产者和消费者。然后,我们可以使用生产者向队列发送消息,消费者将会接收到这些消息并输出到控制台。
@SpringBootApplication
public class Application implements CommandLineRunner {
private final RabbitMQProducer rabbitMQProducer;
private final RabbitMQConsumer rabbitMQConsumer;
public Application(RabbitMQProducer rabbitMQProducer, RabbitMQConsumer rabbitMQConsumer) {
this.rabbitMQProducer = rabbitMQProducer;
this.rabbitMQConsumer = rabbitMQConsumer;
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Override
public void run(String... args) throws Exception {
rabbitMQProducer.send("Hello, RabbitMQ!");
}
}
运行程序后,我们可以在控制台看到类似于以下输出:
Received message: Hello, RabbitMQ!
这表明我们已经成功地向队列发送了一条消息,并且消费者已经成功地接收到了这个消息。
RabbitMQ 高级功能
除了基本功能之外,RabbitMQ 还提供了一些高级功能,例如:
消息确认
当生产者发送一条消息时,它并不知道这条消息是否已经被成功处理。如果消息没有被成功处理,生产者将会不断地尝试重发这条消息,直到它被成功处理为止。
为了解决这个问题,RabbitMQ 提供了消息确认机制。当生产者发送一条消息时,它可以请求 RabbitMQ 确认这条消息是否已经被成功处理。如果消息已经被成功处理,RabbitMQ 将会发送一个确认消息给生产者。
@Component
public class RabbitMQProducer {
private final RabbitTemplate rabbitTemplate;
public RabbitMQProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void send(String message) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData);
}
@Bean
public ConfirmCallback confirmCallback() {
return (correlationData, ack, cause) -> {
if (ack) {
System.out.println("Message with correlation id " + correlationData.getId() + " has been confirmed");
} else {
System.out.println("Message with correlation id " + correlationData.getId() + " has been rejected: " + cause);
}
};
}
}
在这个示例中,我们在生产者中使用了 CorrelationData
类来跟踪消息。我们还创建了一个 ConfirmCallback
bean 来处理消息确认。当消息被成功处理时,confirmCallback
方法将会被调用,并输出一条确认消息。当消息被拒绝时,confirmCallback
方法也将会被调用,并输出一条拒绝消息。
消息持久化
默认情况下,RabbitMQ 不会将消息持久化到磁盘上。如果 RabbitMQ 在崩溃之前没有将消息发送给消费者,这些消息将会丢失。
为了解决这个问题,我们可以将消息标记为持久化。这样,即使 RabbitMQ 崩溃,消息也会被保存到磁盘上,并在 RabbitMQ 重启后重新发送给消费者。
@Component
public class RabbitMQProducer {
private final RabbitTemplate rabbitTemplate;
public RabbitMQProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void send(String message) {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message messageObject = new Message(message.getBytes(), messageProperties);
rabbitTemplate.send("myExchange", "myRoutingKey", messageObject);
}
}
在这个示例中,我们在生产者中使用了 MessageProperties
类来设置消息的持久化属性。我们还使用了 MessageDeliveryMode.PERSISTENT
枚举值来标记消息为持久化消息。
消息 TTL
消息 TTL(Time To Live)是指消息在队列中存储的时间。如果消息在指定的时间内没有被消费者消费,它将会被从队列中自动删除。
@Component
public class RabbitMQProducer {
private final RabbitTemplate rabbitTemplate;
public RabbitMQProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void send(String message) {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("5000"); // 5 seconds
Message messageObject = new Message(message.getBytes(), messageProperties);
rabbitTemplate.send("myExchange", "myRoutingKey", messageObject);
}
}
在这个示例中,我们在生产者中使用了 MessageProperties
类来设置消息的 TTL 属性。我们将 messageProperties.setExpiration("5000")
设置为 5000 毫秒,这意味着消息在队列中最多存储 5 秒钟。
死信队列
死信队列是指当消息被拒绝或者过期时,它将会被重新路由到另一个队列中。这个队列就被称为死信队列。
@Configuration
public class RabbitMQConfig {
@Bean
public Queue myQueue() {
return QueueBuilder.durable("myQueue")
.withArgument("x-dead-letter-exchange", "myDeadLetterExchange")
.withArgument("x-dead-letter-routing-key", "myDeadLetterRoutingKey")
.build();
}
@Bean
public Queue myDeadLetterQueue() {
return QueueBuilder.durable("myDeadLetterQueue").build();
}
@Bean
public Exchange myExchange() {
return ExchangeBuilder.directExchange("myExchange").durable(true).build();
}
@Bean
public Exchange myDeadLetterExchange() {
return ExchangeBuilder.directExchange("myDeadLetterExchange").durable(true).build();
}
@Bean
public Binding binding() {
return BindingBuilder.bind(myQueue()).to(myExchange()).with("myRoutingKey").noargs();
}
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(myDeadLetterQueue()).to(myDeadLetterExchange()).with("myDeadLetterRoutingKey").noargs();
}
}
在这个示例中,我们创建了一个名为 myQueue
的队列,并使用 withArgument
方法来指定它的死信交换机和路由键。我们还创建了一个名为 myDeadLetterQueue
的队列,并将其绑定到名为 myDeadLetterExchange
的交换机上。最后,我们创建了绑定,将 myQueue
队列绑定到 myExchange
交换机上。
当消息在 myQueue
中被拒绝或过期时,它将会被重新路由到 myDeadLetterExchange
交换机,并将其路由到 myDeadLetterQueue
队列中。文章来源:https://www.toymoban.com/news/detail-519771.html
总结
本文介绍了 RabbitMQ 的基本概念,以及如何在 Spring Boot 中使用 RabbitMQ。我们还介绍了 RabbitMQ 的一些高级功能,包括消息确认、消息持久化、消息 TTL 和死信队列。通过学习本文,你应该已经有了足够的知识来开始在 Spring Boot 中使用 RabbitMQ 了。文章来源地址https://www.toymoban.com/news/detail-519771.html
到了这里,关于Spring Boot 中的 RabbitMQ 是什么,如何使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!