Idea+maven+springboot项目搭建系列--2 整合Rabbitmq完成客户端&服务器端消息收发

这篇具有很好参考价值的文章主要介绍了Idea+maven+springboot项目搭建系列--2 整合Rabbitmq完成客户端&服务器端消息收发。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言:本文通过springBoot -maven 框架,对Rabbitmq 进行整合,完成客户端消息的发送和消费;

1 为什么要使用Rabbitmq:

RabbitMQ 是一个可靠的、灵活的、开源的消息中间件,具有以下优点:

  • 异步通信:RabbitMQ 支持异步通信,使得消息发送者和接收者能够异步处理,提高了系统性能和吞吐量。

  • 解耦合:RabbitMQ 的消息队列机制可以将发送者和接收者解耦合,减少了应用程序之间的耦合度。

  • 可靠性高:RabbitMQ 支持事务和持久化,能够确保消息不会丢失。

  • 高吞吐量:RabbitMQ 支持多种吞吐量调优方法,能够处理高并发的消息通讯。

  • 可扩展性:RabbitMQ 支持集群和分布式部署,可以扩展到大规模的消息通讯场景。

RabbitMQ 提供了易用、高效、灵活、可靠的消息传递机制,可以帮助开发者更快地构建系统并实现各种复杂的业务场景。

2 springboot 整合:

2.1 pom 引入依赖:

 <!-- rabbitmq 自动装配 -->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 提供web访问 默认端口8080 -->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- loomback 用于生成get set 方法 -->
<dependency>
   <groupId>org.projectlombok</groupId>
   <artifactId>lombok</artifactId>
   <optional>true</optional>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-test</artifactId>
   <scope>test</scope>
</dependency>
<!-- 阿里的json 数据转换 -->
<dependency>
   <groupId>com.alibaba.fastjson2</groupId>
   <artifactId>fastjson2</artifactId>
   <version>2.0.31</version>
</dependency>

2.2 连接参数配置:
2.2.1 基础配置:
基础配置后springboot 的自动装载机制会注册一个RabbitTemplate rabbitTemplate 对象用于消息的接收和发送;

############# 基础配置
# mq 服务器的地址
spring.rabbitmq.host=localhost
# mq 服务器的端口
spring.rabbitmq.port=5672
# mq 服务器的连接使用的用户名
spring.rabbitmq.username=admin
# mq 服务器的连接使用的密码
spring.rabbitmq.password=rabbitmq
# mq 服务器的连接使用的虚拟机
spring.rabbitmq.virtual-host=my_vhost

注意: 其中 spring.rabbitmq.virtual-host 为隔离的虚拟机,需要根据自己业务进行配置,如果rabbitmq 有web 端可以在web端创建需要的v_host:
Idea+maven+springboot项目搭建系列--2 整合Rabbitmq完成客户端&服务器端消息收发,# spring-boot,java工具篇,java-rabbitmq,rabbitmq,intellij-idea
2.2.2 可扩展的连接参数配置:

############# 连接和管道配置
# When the cache mode is 'CHANNEL', the connection cache size cannot be configured.
# spring.rabbitmq.cache.connection.mode 为connection 生效 ,connection 连接池的大小
#spring.rabbitmq.cache.connection.size=3
# 与broker 连接的 模式 channel 或者 connection 默认channel 
spring.rabbitmq.cache.connection.mode=channel
# 与broker 连接的默认时间,默认为 60000即 60 秒,超时会会中断并抛出异常,单位毫秒
spring.rabbitmq.connection-timeout=1000
# 每个连接中可以建立的channel 数量,默认值25
spring.rabbitmq.cache.channel.size=50
# 如果已达到channel缓存大小,等待获取channel的时间。 如果为0,则始终创建一个新channel
# 默认值为 -1,表示不限制等待时间,即一直等待直到获取到可用的 Channel,单位毫秒
spring.rabbitmq.cache.channel.checkout-timeout=2000
# 指定心跳超时,单位秒,0为不指定;默认60s
spring.rabbitmq.requested-heartbeat=60
# 客户端总共可以创建总的channel 数量
spring.rabbitmq.requested-channel-max=1024

默认与rabbitmq 的连接为channel,多个channel 公用一个connection , 每个线程都从缓存池中获取channel ,每个线程中持有的channel 是互相隔离的;

2.3 生产者发送消息:
生产者发送消息主要是通过 引入 RabbitTemplate 模版对象来完成;这里按照发送消息发送的场景分别进行介绍:

2.3.1 交换机和队列的绑定:
因为消息最开始是要发送到交换机上的,然后在通过交换机通过routkey 路由键到匹配的队列中;所以我们需要先在项目中使用的
virtual-host 中去分别创建交换机和队列,然后进行绑定;一帮情况下,我们应该向运维去申请自己的虚拟机,交换机,队列,然后通过后,项目中直接使用即可;当然通过代码也完全可以进行交换机和队列的创建和绑定,这里我们通过web 页面来进行处理:

2.3.1.1交换机的创建:
Idea+maven+springboot项目搭建系列--2 整合Rabbitmq完成客户端&服务器端消息收发,# spring-boot,java工具篇,java-rabbitmq,rabbitmq,intellij-idea

  • Virtual host : 对应隔离的虚拟机,所以需要选择项目中 通过spring.rabbitmq.virtual-host 参数连接的虚拟机;

  • Name: 虚拟机的名称,见名知意即可;

  • Type: 虚拟机的类型:比较常用的有直连 direct; 主题topic,广播fanout;
    Idea+maven+springboot项目搭建系列--2 整合Rabbitmq完成客户端&服务器端消息收发,# spring-boot,java工具篇,java-rabbitmq,rabbitmq,intellij-idea
    这里对交换机的类型进行简单的介绍:

  • 直连direct的交换机,交换机直接与队列完成绑定,通过发送消息是携带的Routing Key 和队列与 Exchange 绑定时指定的 Routing Key 精准匹配,然后路由消息到指定队列中:

  • Direct Exchange
    Direct Exchange 是最简单的交换机类型,交换机直接与队列完成绑定,它根据消息携带的 Routing Key 和队列与 Exchange 绑定时指定的 Routing Key 精准匹配,然后路由消息到指定队列中。 Direct Exchange 可以理解为一张路由表,交换机通过 Routing Key 在路由表中查找匹配队列,将消息从生产者处发送到匹配队列。

  • Topic Exchange
    Topic Exchange 根据 Routing Key 的匹配规则将消息路由到对应的队列中。Topic Exchange 支持两种匹配规则:* 代表通配符,表示可以匹配一个单词,# 代表通配符,表示可以匹配多个单词。例如,Routing Key 为 com.XXX.# 的消息会被路由到匹配 com.XXX. 开头的所有队列中,Routing Key 为 # ,会匹配到所有的消息;列如 user.* 匹配 user. 后跟一个单词的消息,可以匹配到user.a 但是匹配不到user.a.b 。

  • Fanout Exchange
    Fanout Exchange 会将消息路由到所有绑定到它上面的队列中。Fanout Exchange 的路由方式与路由表无关,会忽略 Routing Key,与 Direct Exchange 和 Topic Exchange 相比,它具有更高的传输效率和更低的消耗。

  • Headers Exchange
    Headers Exchange 根据消息头中的键值对匹配规则将消息路由到对应的队列中。Headers Exchange 的匹配规则相对较复杂,需要在绑定时指定键值对的匹配方式。

  • Durability : 交换机是否持久化到磁盘的属性值设置

  • 如果将 Durability 属性设置为 durable ,表示交换器会被持久化到磁盘上,即使 RabbitMQ 服务器在交换机定义被创建之后终止,交换机定义仍然能够在服务器重新启动时得到恢复,从而保证交换机在重启后仍然存在。

  • 如果将 Durability 属性设置为 transient ,表示交换器不会被持久化到磁盘上,如果 RabbitMQ 服务器重启,则该交换器定义将会丢失。

  • Auto delete 用于指定该交换机是否自动删除。当一个交换机关联的所有队列都被删除时,如果交换机的 Auto Delete 属性为 true,则该交换机也会被自动删除

  • Internal 是否为内部交换机:
    内部交换机的 internal 属性设置为 true,使其只能被通过 AMQP 协议连接到相同 Virtual Host 的客户端使用,不能被直连类型的 Exchange 或 Headers 类型的 Exchange 所使用。
    内部交换机只能用于消费者和生产者在同一个 RabbitMQ 实例中的场景,而不能用于服务器和客户端之间传递消息。
    内部交换机主要用于应用程序之间传递消息,而不是用于服务器和客户端之间传递消息。

  • Arguments:交换机的额外属性,比较常用的属性如alternate-exchange:指定备用交换机。如果一条消息无法被路由到任何队列中,那么它将被发送到备用交换机中;

一般我们创建交换机时只需要选择Virtual host:,填入交换机的名称,选择交互机的类型这3项,其它都默认即可:
Idea+maven+springboot项目搭建系列--2 整合Rabbitmq完成客户端&服务器端消息收发,# spring-boot,java工具篇,java-rabbitmq,rabbitmq,intellij-idea

2.3.1.2 队列的创建:
Idea+maven+springboot项目搭建系列--2 整合Rabbitmq完成客户端&服务器端消息收发,# spring-boot,java工具篇,java-rabbitmq,rabbitmq,intellij-idea- type 队列的类型:
在 RabbitMQ 中,队列的 type 参数共有三种,分别是 classic、quorum 和 stream。它们的区别可以简单概括如下:
classic 队列:
最早的、经典的队列类型,支持多个消费者竞争消费消息,但是在节点宕机时可能会出现消息丢失的情况。适用于简单的消息处理场景。

quorum 队列:
支持高可用性、多个消费者竞争消费的队列类型。它通过复制机制保证消息的可靠性,可以在节点宕机时自动进行故障转移,避免消息丢失。适用于需要高可用特性的分布式环境中使用,但相对来说,quorum 队列性能较 classic 队列有所下降。

stream 队列:
支持无限缓存的消息流队列,可以通过队列中的缓存来处理各种等待中的问题。传统队列中当消息进入队列时,它就被立即写入了内存中,并等待处理。这样做的问题是,当生产者不断地发送消息时,很容易将内存撑满。 stream 队列则允许队列的缓存区域随着时间和队列大小的增长而扩展,使得待处理的消息可以在缓存区域中有所体现。适用于需要处理海量时间序列数据的场景。

需要注意的是,stream 队列是从 RabbitMQ 3.8 开始引入的新类型,目前还不是很成熟,可能在稳定性和性能方面还需要更多的优化和改进。因此,在选择队列类型时,需要结合具体的业务情况和系统限制,选择采用 classic、quorum 还是 stream 队列,以达到最优的性能和可用性。

  • Name 队列的名称;
  • Durability 队列是否持久化,参数意义同交换机;
  • Auto delete:
    在 RabbitMQ 中,队列的 auto-delete 参数用于控制队列的自动删除行为。如果将 auto-delete 参数设置为 true,则在最后一个消费者断开连接时,队列会自动被删除。
  • Arguments 队列参数的额外选择;

通常创建队列时只需要选择Virtual host,填入队列的名称,其它项默认即可:
Idea+maven+springboot项目搭建系列--2 整合Rabbitmq完成客户端&服务器端消息收发,# spring-boot,java工具篇,java-rabbitmq,rabbitmq,intellij-idea

2.3.1.3 交换机和队列的绑定:完成交换机和队列关系的绑定
Idea+maven+springboot项目搭建系列--2 整合Rabbitmq完成客户端&服务器端消息收发,# spring-boot,java工具篇,java-rabbitmq,rabbitmq,intellij-idea
2.3.2 发送消息:
2.3.2.2 生产者参数的配置:

########## 生产者配置
spring.rabbitmq.template.exchange=my_exchange
# 启用消息投递结果确认
spring.rabbitmq.publisher-returns=true
# 启用强制消息投递,即生产者发送消息成功或者失败,需要返回确认消息
spring.rabbitmq.template.mandatory=true
# 消息发布者确认模式
spring.rabbitmq.publisher-confirm-type=correlated

# 发送重试是否可用
spring.rabbitmq.template.retry.enabled= true
# 最大重试次数,默认值为 3
spring.rabbitmq.template.retry.max-attempts=3
# 第一次和第二次尝试发布或传递消息之间的间隔,默认值为 1000 毫秒
spring.rabbitmq.template.retry.initial-interval=1000
#表示时间间隔的倍数系数,默认值为 1 当进行第 n 次重试时,
# 会将时间间隔设置为  initial-interval * multiplier^(n-1) ,用于控制重试时间间隔逐渐增加
spring.rabbitmq.template.retry.multiplier=1 
# 表示时间间隔的最大值,默认值为 10000 毫秒
spring.rabbitmq.template.retry.max-interval= 1000

2.3.2.3 使用RabbitTemplate 模版发送单条消息,发送多条消息,发送延迟消息,使用自定义的RabbitTemplate 发送事务消息:
1) 定义一个类来封装我们要发送的消息结构:

package com.example.rabbitmqdemo.rabbitmq.msgDto;

import lombok.AllArgsConstructor;
import lombok.Data;

import java.io.Serializable;

@Data
@AllArgsConstructor
public class MsgDto implements Serializable {
    // 消息类型
    private String msgType;
    // 消息体
    private Object body;
}

2) 对RabbitTemplate 模版对象配置消息确认:
如果消息投递失败,我们需要对此类消息进行记录,方便后续进行数据补偿;

package com.example.rabbitmqdemo.rabbitmq.config;


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Slf4j
@Component("rabbitMqCustomerConfig")
public class BatchConfig {
    @Value("${env:prod}")
    private String env;
    @Autowired
    SimpleRabbitListenerContainerFactory containerFactory;
    @Autowired
    RabbitTemplate rabbitTemplate;


    @PostConstruct
    public void simpleListenerBatchInit() {
        log.info("设置批量-----");
        containerFactory.setBatchListener(true);
        if ("prod".equals(env)) {
            // 依照不同的环境进行开启
            containerFactory.setAutoStartup(true);
        }


        // 设置 ConfirmCallback 回调函数 确认消息是否成功发送到 Exchang
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                if (null == correlationData) {
                    // 延迟消息 correlationData 为null
                    return;
                }
                log.debug("Message sent successfully:{} ", correlationData.getId());

            } else {
                if (null == correlationData && null == cause) {
                    // 延迟消息 correlationData 为null
                    return;
                }
                log.error("Message sent failed: {}", correlationData.getId() + ", cause: " + cause);
            }
        });
        // ReturnCallback  处理的是未路由的消息返回的情况
        rabbitTemplate.setReturnCallback((oneMessage, replyCode, replyText, exchange, routingKey) -> {
            // 判断是否是延迟消息
            if (routingKey.indexOf("delay") != -1) {
                // 是一个延迟消息,忽略这个错误提示
                return;
            }
            log.debug("Message returned: {}", new String(oneMessage.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);
        });

    }


}

3) 因为发送事务需要关闭消息的确认,所以这里重新定义一个RabbitTemplate 模版用来发送事务消息:

package com.example.rabbitmqdemo.rabbitmq.config;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TxRabbitTemplate {
    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.port}")
    private String port;
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    @Bean(value = "txRabbitTemplat")
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }

    private ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
        connectionFactory.setChannelCacheSize(10);
        // 关闭消息的ack 确认
        connectionFactory.setPublisherConfirms(false);
        connectionFactory.setPublisherReturns(false);

        return connectionFactory;
    }
}

4)使用自动装配的RabbitTemplate 模版来进行 消息发送 :

package com.example.rabbitmqdemo.rabbitmq.producer;

import com.alibaba.fastjson2.JSONObject;
import com.example.rabbitmqdemo.rabbitmq.msgDto.MsgDto;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.UUID;

@Slf4j
@Component
public class MessageProducer {
    // 这里可以指定一个默认发送使用的交换机
    @Value("${amqp-binding.exchange-name:my_exchange}")
    private String exchangeName;


    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    @Qualifier("txRabbitTemplat")
    private RabbitTemplate txRabbitTemplate;


    /**
     * 指定的routKey 发送信息
     *
     * @param message
     */
    public void sendMessage(String routKey, Object message) {
        this.sendMessage(exchangeName, routKey, JSONObject.toJSONString(message));
    }

    /**
     * 通过交换机,路由key 发送消息
     *
     * @param exchangeName
     * @param routKey
     * @param message
     */
    public void sendMessage(String exchangeName, String routKey, Object message) {
        // 设置消息的唯一标识符
        long deliveryTag = System.currentTimeMillis();
        rabbitTemplate.convertAndSend(exchangeName, routKey, message, messagePostProcessor -> {
            messagePostProcessor.getMessageProperties().setMessageId(String.valueOf("messageId_" + deliveryTag));
            return messagePostProcessor;
        }, new CorrelationData(UUID.randomUUID().toString()));

    }


    /**
     * 指定的routKey 发送批量信息
     *
     * @param messages
     */
    public void sendMessageBatch(String routKey, MsgDto messages) {
        this.sendMessageBatch(exchangeName, routKey, JSONObject.toJSONString(messages));
    }

    /**
     * 通过交换机,路由key 发送批量信息
     *
     * @param exchangeName
     * @param routKey
     * @param messages
     */
    public void sendMessageBatch(String exchangeName, String routKey, Object messages) {
        rabbitTemplate.convertSendAndReceive(exchangeName, routKey, messages, messagePostProcessor -> {
            messagePostProcessor.getMessageProperties().setMessageId(String.valueOf("messageId_" + 1));
            return messagePostProcessor;
        }, new CorrelationData(UUID.randomUUID().toString()));
    }

    /**
     * 指定的routKey 发送信息
     *
     * @param message
     */
    public void sendDelayMessage(String routKey, Object message, long delayTime) {
        this.sendDelayMessage(exchangeName, routKey, message, delayTime);
    }

    /**
     * 指定的routKey 发送延迟信息
     *
     * @param message
     */
    public void sendDelayMessage(String exchangeName, String routKey, Object message, long delayTime) {
        log.debug("producer send delay message:{}", message);
        rabbitTemplate.convertAndSend(exchangeName, routKey, message, header -> {
            header.getMessageProperties().setHeader("x-delay", delayTime);
            return header;
        });
    }

    /**
     * 指定的routKey 发送事务信息
     *
     * @param message
     */
    @SneakyThrows
    public void sendTxMessage(String exchangeName, String routKey, Object message) {
        log.debug("producer send delay message:{}", message);
        String messageStr = JSONObject.toJSONString(message);
        // method 1:
//        sendTransactedMsgByNewChannel(exchangeName,routKey,message);
        // method2:
        sendTransactedMsgByNTemplate(exchangeName, routKey, messageStr);


    }

    private void sendTransactedMsgByNTemplate(String exchangeName, String routKey, String message) {
        txRabbitTemplate.execute(channel -> {
            try {
                String messageId = UUID.randomUUID().toString() + "_messageId";
                String correlationId = UUID.randomUUID().toString() + "_correId";

                // 创建 BasicProperties 对象并设置属性
                AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                        .messageId(messageId)
                        .correlationId(correlationId)
                        .build();
                channel.txSelect(); // 开启事务

                channel.basicPublish(exchangeName, routKey, properties, message.getBytes(Charset.forName("UTF-8"))); // 发送消息
//                "124".substring(7);
                channel.txCommit(); // 提交事务
            } catch (Exception e) {
                channel.txRollback(); // 回滚事务
            }
            return true;
        });
    }

    @SneakyThrows
    private void sendTransactedMsgByNewChannel(String exchangeName, String routKey, String message) {
        // 获取新的channel 对象
        Channel channel = txRabbitTemplate.getConnectionFactory().createConnection().createChannel(true);
        // 开启事务
        channel.txSelect();
        try {
            // 消息格式化
            channel.basicPublish(exchangeName, routKey, null, message.getBytes(Charset.forName("UTF-8")));
            // 消息提交
            channel.txCommit();
        } catch (IOException e) {
            channel.txRollback();
            throw e;
        }
    }


}

5)测试代码:

package com.example.rabbitmqdemo.rabbitmq.controller;

import com.example.rabbitmqdemo.rabbitmq.enums.RabbitRoutKeyEnum;
import com.example.rabbitmqdemo.rabbitmq.msgDto.MsgDto;
import com.example.rabbitmqdemo.rabbitmq.producer.MessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;

@RestController
public class TestSendMsgController {
    @Autowired
    private MessageProducer messageProducer;

    @GetMapping("/sendMsg")
    public boolean sendMsg(@RequestParam String content,@RequestParam String routKey) {
        List<Object> msgs = new ArrayList<>(10);
        for (int i = 0; i < 10; i++) {
            msgs.add(content+"_"+i);

        }
        msgs.stream().forEach(e->{
            MsgDto msgDto = new MsgDto("user",e);
            messageProducer.sendMessage(RabbitRoutKeyEnum.业务_单条消息.getRoutKey(),msgDto);
        });

        return true;
    }
    @GetMapping("/sendBatchMsg")
    public boolean sendBatchMsg(@RequestParam String content,@RequestParam String routKey) {
        List<Object> msgs = new ArrayList<>(10);
        for (int i = 0; i < 10; i++) {
            msgs.add(content+"_"+i);
        }
        MsgDto msgDto = new MsgDto("test",msgs);
        messageProducer.sendMessageBatch(RabbitRoutKeyEnum.业务_多条消息.getRoutKey(), msgDto);

        return true;
    }

    @GetMapping("/sendDelayMsg")
    public boolean sendDelayMsg(@RequestParam String content,@RequestParam long delayTime) {
        List<Object> msgs = new ArrayList<>(10);
        for (int i = 0; i < 10; i++) {
            msgs.add(content+"_"+i);
        }
        msgs.stream().forEach(e->{
            messageProducer.sendDelayMessage("my_delay_exchange",RabbitRoutKeyEnum.业务_延迟.getRoutKey(),e,delayTime);
        });

        return true;
    }

    @GetMapping("/sendTxMsg")
    public boolean sendTxMsg(@RequestParam String content) {
        List<Object> msgs = new ArrayList<>(10);
        for (int i = 0; i < 2; i++) {
            msgs.add(content+"_"+i);
        }
        msgs.stream().forEach(e->{
            MsgDto msgDto = new MsgDto("tx",e);
            messageProducer.sendTxMessage("my_tx_exchange",RabbitRoutKeyEnum.业务_事务.getRoutKey(),msgDto);
//            messageProducer.sendMessage(RabbitRoutKeyEnum.业务_单条消息.getRoutKey(),msgDto);
        });


        return true;
    }
}


这里分别测试了单条消息,多条消息,延迟消息,事务消息的发送,将其封装为MsgDto对象,在发送时将其转为json 字符串;基本上满足了大部分的业务场景;需要注意的是rabbitmq 中所谓批量发送的消息实际上会被消息压缩为1条消息进行发送,到达队列是也是1条消息;

6 )routKey 的枚举类:

package com.example.rabbitmqdemo.rabbitmq.enums;

import lombok.AllArgsConstructor;
import lombok.Getter;

@Getter
@AllArgsConstructor
public enum RabbitRoutKeyEnum {

    业务_单条消息("my_routKey"),
    业务_多条消息("my_batch_routKey"),
    业务_1("my_one_routKey"),
    业务_延迟("my_delay_routKey"),

    业务_事务("my_tx_routKey"),

    ;

    private String routKey;


}


至此我们已基本完成生产端消息的发送以及发送结果的监听处理;需要注意的是对于延迟消息,返回的确认消息correlationData 是一个null 值,所以这里对其消息的确认进行了一次特殊的判断;

3 消费者接收消息:

3.1 消费者参数的配置:

########## 消费者配置
# 是否自动启动消息的监听
spring.rabbitmq.listener.simple.auto-startup=false
# 消费消息确认模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 批量预取条数 默认值250
spring.rabbitmq.listener.simple.prefetch=50
# 开启批量消费
spring.rabbitmq.listener.simple.consumer-batch-enabled=true
# 批量消费的条数
spring.rabbitmq.listener.simple.batch-size=2
# 并发消费最小线程数
spring.rabbitmq.listener.simple.concurrency=1
# 并发消费最大线程数
spring.rabbitmq.listener.simple.max-concurrency=1


### 消费失败 重试参数
# 开启重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 表示最大重试次数,默认值为 3
spring.rabbitmq.listener.simple.retry.max-attempts=3
# 表示第一次重试的时间间隔,默认值为 1000 毫秒
spring.rabbitmq.listener.simple.retry.initial-interval=1000
#表示时间间隔的倍数系数,默认值为 1 当进行第 n 次重试时,
# 会将时间间隔设置为  initial-interval * multiplier^(n-1) ,用于控制重试时间间隔逐渐增加
spring.rabbitmq.listener.simple.retry.multiplier=1
# 表示时间间隔的最大值,默认值为 10000 毫秒
spring.rabbitmq.listener.simple.retry.max-interval=1000
# 消息监听器是否启用无状态(stateless)重试 默认true
spring.rabbitmq.listener.simple.retry.stateless=false
# 控制当消息消费失败后,RabbitMQ 是否需要将消息重新入队。该参数的默认值为 true,即消息将被重新入队
spring.rabbitmq.listener.simple.default-requeue-rejected=true

以上参数,配置了消费端消费消息后的ack 机制为手动提交,并且设定了 批量预取条数 和每次批量消费的条数,以及消费失败的重试机制配置;

3.2 消费消息:
消费者监听某个或者几个队列,然后通过channel 获取要消费的消息:

package com.example.rabbitmqdemo.rabbitmq.consumer;

import com.alibaba.fastjson2.JSONObject;
import com.example.rabbitmqdemo.rabbitmq.msgDto.MsgDto;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.List;

@Slf4j
@Component
public class MessageConsumer {

    /**
     * 逐条/批量 消费
     *
     * @param messages
     */
//    @RabbitListener(queues = "my_queue_one")
    public void receiveMessage(List<Message> messages, Channel channel) throws IOException {
        log.debug("逐条消费消息:{}", messages);
        for (Message message : messages) {
            try {
//                // 处理消息
                log.debug("Received message: {}", message);
                String jsonMessage = new String(message.getBody(), "UTF-8");
                MsgDto body = JSONObject.parseObject(jsonMessage, MsgDto.class);
                // 数据处理

                // 手动发送 ack 消息
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception ex) {
                // 发生异常,手动发送 nack 消息
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

            }
        }

    }


    /**
     * 逐条消费--延时消息
     *
     * @param messages
     */
    @RabbitListener(queues = "my_deay_queue")
    public void receiveDelayMessage(List<Message> messages, Channel channel) throws IOException {
        for (Message message : messages) {
            try {
                // 处理消息
                log.debug("Received delay message: {}", message);
                String jsonMessage = new String(message.getBody(), "UTF-8");
                MsgDto body = JSONObject.parseObject(jsonMessage, MsgDto.class);

                // 手动发送 ack 消息
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception ex) {
                // 发生异常,手动发送 nack 消息
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

            }
        }

    }

    /**
     * 逐条消费--事务消息
     *
     * @param messages
     */
    @RabbitListener(queues = "my_tx_queue")
    public void receiveTxMessage(List<Message> messages, Channel channel) throws IOException {
        for (Message message : messages) {
            try {
                // 处理消息
                log.debug("Received delay message: {}", message);
                String jsonMessage = new String(message.getBody(), "UTF-8");
                MsgDto body = JSONObject.parseObject(jsonMessage, MsgDto.class);

                // 手动发送 ack 消息
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception ex) {
                // 发生异常,手动发送 nack 消息
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

            }
        }

    }

}

这里我们接收到消息后,然后通过"UTF-8"编码(生产者默认按照UTF-8 对数据编码后进行发送)将字节数据转换为字符串,然后通过阿里的json jar 完成java 对象的转换,进行业务处理,最后手动提交消息;

4 总结:

  • Rabbitmq 对于消息的发送依赖于交换机,通过routKey 绑定不同的queue 完成消息的路由工作;
  • Rabbitmq 发送消息可以为其配置ack确认机制,以及发送失败重试机制参数可以配合完成消息的发送;
  • Rabbitmq 发送消息可以进行批量发送,但是本质上会被合并到一条消息进行发送;
  • Rabbitmq 对于消息的消费,依赖于构建channel 管道 ,绑定queue 完成消息的消费;
  • Rabbitmq 消费消息,可以进行手动的ack 确认,并且可以设置消费重试参数,应便于消费失败的场景;

5 扩展:

5.1 rabbitmq 发送事务消息为什么要关闭 消息的确认回调?

在RabbitMQ中,如果发送事务消息,并且开启了确认模式,那么需要特别注意关闭消息的确认回调,以避免一些潜在的问题。
在RabbitMQ中,开启事务模式后,生产者发送消息时,RabbitMQ会将消息缓存在生产者端。在事务提交之前,不会直接将消息发送到队列。如果在事务未提交的情况下,RabbitMQ服务器异常中断或者连接被关闭,那么消息将会丢失。为了避免这种情况的发生,可以采用事务提交确认和确认模式,在确认之后才将消息发送到队列中。

然而,在发送事务消息时,开启确认模式后,需要关闭消息的确认回调。这是因为在事务提交之前,消息并没有发送到队列中,确认回调将在消息发送到队列后才触发。而在事务模式下,消息已经被缓存到生产者端,没有被发送到队列中,所以确认回调不应该被触发。文章来源地址https://www.toymoban.com/news/detail-724887.html

到了这里,关于Idea+maven+springboot项目搭建系列--2 整合Rabbitmq完成客户端&服务器端消息收发的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot整合RabbitMQ系列--绑定交换机与队列的方法

    原文网址:SpringBoot整合RabbitMQ系列--绑定交换机与队列的方法_IT利刃出鞘的博客-CSDN博客         本文用实例介绍SpringBoot中RabbitMQ如何绑定交换机(交换器)与队列。 交换机 下边两种方式等价。 队列 下边两种方式等价 绑定 下边两种方式等价 注意:第一种的参数并不是字符

    2023年04月09日
    浏览(57)
  • IDEA创建SpringBoot+maven项目

    1.新建file-new-project项目 2. 选择springboot项目,修改项目名,选择java8,type默认为maven,点击next 3.选择相关依赖,web中选择spring web,SQL中选择mysql driver,developer tools中选择lombok,点击finish 4.新建项目如图 5.设置maven的配置,file-settings 6.选择build,execution,deployment-build tools -maven 7.选择

    2024年02月15日
    浏览(74)
  • SpringBoot项目整合Redis,Rabbitmq发送、消费、存储邮件

    本文主要是【Rabbitmq】——SpringBoot项目整合Redis,Rabbitmq发送、消费、存储邮件的文章,如果有什么需要改进的地方还请大佬指出⛺️ 🎬作者简介:大家好,我是听风与他🥇 ☁️博客首页:CSDN主页听风与他 🌄每日一句:狠狠沉淀,顶峰相见 测试结果:此时指定邮箱已收到验

    2024年02月04日
    浏览(44)
  • idea导入springboot项目没有maven

            是因为项目识别pom文件失败了,需要我们手动添加maven的主pom文件。         在项目中双击shift按钮,进入文件查找的功能,然后搜索maven。 然后点击Add Maven Project添加maven主配置文件,然后等待编译就行。     参考链接: Idea导入SpringBoot项目,没有maven - 简书 (jian

    2024年02月15日
    浏览(61)
  • IDEA 搭建 Maven模块化项目

    目录 1.前言 2. 软硬件环境 3.项目搭建 3.1.创建 SpringBoot 父项目 3.2. 构建子项目centerdao 3.3. 构建子项目centerweb 4. 建立父子 Module 依赖 4.1 删除不必要文件 4.2.修改 Parent 项目 packaging 4.3.修改子项目pom.xml 信息 4.4.补充说明 5. 项目继承关系 6. 验证项目 7.异常处理  8.结语 鸣谢    

    2024年02月04日
    浏览(52)
  • 【电商项目实战】基于SpringBoot完成首页搭建

    🎉🎉欢迎来到我的CSDN主页!🎉🎉 🏅我是Java方文山,一个在CSDN分享笔记的博主。📚📚 🌟推荐给大家我的专栏《电商项目实战》。🎯🎯 👉点击这里,就可以查看我的主页啦!👇👇 Java方文山的个人主页 🎁如果感觉还不错的话请给我点赞吧!🎁🎁 💖期待你的加入,

    2024年02月04日
    浏览(45)
  • SpringBoot实战项目整合RabbitMQ+ElaticSearch实现SKU上下架功能

    😊 @ 作者: Eric 💖 @ 主页: https://blog.csdn.net/weixin_47316183?type=blog 🎉 @ 主题: SpringBoot实战项目整合RabbitMQ+ElaticSearch实现SKU上下架功能 ⏱️ @ 创作时间: 2023年07月03日 最终实现效果:针对SKU的上下架 上架效果: 1、后台选择 SKU ,点击上架,该 SKU 修改为 上架 状态 2、同时向

    2024年02月11日
    浏览(40)
  • idea导入SpringBoot项目,没有启动按钮,没有maven

    解决办法:(快捷键双击Shift,在搜索框中搜索maven,点击Add Maven Project,就 行了) 如果在idea出现下图这种,说明成功了

    2024年02月11日
    浏览(65)
  • 解决IDEA新建springboot项目时不能导入maven依赖(右边没有maven窗口)

    1.问题:今天在github上找了一个springboot项目,但是用idea打开后发现不能导入maven依赖,并且在IDEA右边也没有出现maven窗口,如下图  2.解决方法:右键点击pom.xml文件,然后点击“Add as Maven Project”即可  此时发现右边出现maven,然后在pom.xml中重新加载maven依赖即可  

    2024年02月14日
    浏览(69)
  • idea新建Springboot项目,设置默认maven和jdk版本

    问题: 由于每次新建Springboot项目,maven总是在c盘和jdk版本不是自己需要的版本。每次都需要自己重新配置。 解决: 为了解决这个问题,设置新建项目时指定默认配置。 一、设置新建项目时,默认指定的Maven版本 1.file–》Other Settings–》settinf for… 2.找到Maven配置。配置上即

    2024年02月16日
    浏览(68)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包