SpringBoot —— 整合RabbitMQ常见问题及解决方案

这篇具有很好参考价值的文章主要介绍了SpringBoot —— 整合RabbitMQ常见问题及解决方案。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

企业中最常用的消息中间件既不是RocketMQ,也不是Kafka,而是RabbitMQ。

RocketMQ很强大,但主要是阿里推广自己的云产品而开源出来的一款消息队列,其实中小企业用RocketMQ的没有想象中那么多。

至于Kafka,主要还是用在大数据和日志采集方面,除了一些公司有特定的需求会使用外,对消息收发准确率要求较高的公司依然是以RabbitMQ作为企业级消息队列的首选


一、使用步骤

1.引入依赖

<!--AMQP依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-annotations</artifactId>
</dependency>

2.环境配置

这里需要创建2个springboot项目,一个 provider (生产者),一个consumer(消费者)

生产者application.yml
SpringBoot —— 整合RabbitMQ常见问题及解决方案
消费者application.yml
SpringBoot —— 整合RabbitMQ常见问题及解决方案

3.生产者处理消息队列

创建消息队列

package com.local.springboot.springbootcommon.config.amqp;

import com.local.springboot.springbootcommon.constant.RabbitMQConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.Queue;


@Configuration
public class RabbitMQConfig {

    /**
     * 创建队列说明
     * durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
     * exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
     * autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
     * 一般设置一下队列的持久化就好,其余两个就是默认false
     * @return
     */
    @Bean
    public Queue goodsEventQueue() {
        return new Queue(RabbitMQConstant.QUEUE_GOODS_EVENT, true, false, false, null);
    }

    /**
     * 创建交换机
     */
    @Bean
    public DirectExchange goodsEventExchange() {
        return new DirectExchange(RabbitMQConstant.EXCHANGE_GOODS_EXCHANGE, true, false);
    }

    /**
     * 将队列绑定到交换机上
     */
    @Bean
    public Binding goodsQueueToGoodsExchange() {
        return BindingBuilder
                .bind(goodsEventQueue())
                .to(goodsEventExchange())
                .with(RabbitMQConstant.ROUTING_KEY_GOODS_EVENT);
    }
}

启动生产者服务,浏览器打开http://127.0.0.1:15672/,可以看见消息队列创建
SpringBoot —— 整合RabbitMQ常见问题及解决方案
发送消息
在业务需要的地方,发生消息至消息队列

@Resource
private RabbitTemplate rabbitTemplate;

@Override
public ApiResponse saveItem(ItemEntity itemEntity) {
    if (itemEntity != null) {
        String id = itemEntity.getSkuId();
        if (StringUtils.isNotBlank(id)) {
            ItemEntity entity = getById(id);
            if (entity != null) {
                BeanUtils.copyProperties(itemEntity, entity);
                updateById(entity);
            }
        } else {
            EntityUtil.initEntity(itemEntity);
            itemEntity.setSkuId(IdWorker.get32UUID());
            save(itemEntity);
        }
    }
    // 同步商品信息
    rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_GOODS_EXCHANGE, RabbitMQConstant.ROUTING_KEY_GOODS_EVENT, itemEntity);
    return ApiResponse.ok();
}

4.消费者监听队列

package com.local.springboot.springbootservice.listener;

import com.local.springboot.springbootdao.entity.ItemEntity;
import com.local.springboot.springbootservice.service.search.ElasticSearchService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;

@Component
@Slf4j
public class GoodsEventQueueListener {

    @Resource
    private ElasticSearchService elasticSearchService;

    @RabbitListener(queues = "goodsEventQueue")
    public void onGoodsEvent(ItemEntity itemEntity, Channel channel
            , @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        // 同步商品至es
        try {
            log.info("同步商品事件队列接收参数:{}", itemEntity);
            // 业务处理
            elasticSearchService.addGoods(itemEntity);
        } catch (Exception e) {
            log.error("同步商品事件异常:{}", e.getMessage());
            e.printStackTrace();
        } finally {
            // 手动签收消息
            channel.basicAck(tag, false);
        }
    }
}

5.运行结果

上述业务是在添加商品时,向消息队列发送消息,消费者接收消息之后对商品进行相应的处理,实现业务上的解耦。

同时运行两个服务,生产者调用添加商品接口
SpringBoot —— 整合RabbitMQ常见问题及解决方案
查看日志,消费者接收到消息之后做相应处理
SpringBoot —— 整合RabbitMQ常见问题及解决方案
至此,SpringBoot 简单整合RabbitMQ成功结束。

二、问题及解决

1.消息丢失

消息丢失可能的原因

①消息发出后,中途网络故障,服务器没收到
②消息发出后,服务器收到了,还没持久化,服务器宕机
③消息发出后,服务器收到了,服务挂了,消息自动签收,消费方还未处理业务逻辑。

在说解决方案之前,我们需要明白两个概念:消息确认机制消息签收机制

1.消息确认机制

主要是生产者使用的机制,用来确认消息是否被成功消费。

添加配置如下:

publisher-returns: true #确认消息已发送到队列(Queue)
publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
@Component
@Slf4j
public class RabbitMQProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    

    /**
     * 发送消息
     *
     * @param exchange
     * @param routingKey
     * @param source
     */
    public void sendMessage(String exchange, String routingKey, ItemEntity source) {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
        rabbitTemplate.convertAndSend(exchange, routingKey, source);
    }

    /**
     * 成功接收后的回调
     *
     * @param correlationData
     * @param b
     * @param s
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        // 成功后的处理
    }

    /**
     * 失败后的回调
     *
     * @param returnedMessage
     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        // 失败后的处理
    }
}

实现RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback这两个接口的方法后,可以对失败或者成功之后进行相应处理,之后进一步做消息补偿。

但是这种方法并不推荐,因为这种机制十分降低MQ的性能,一般采用后台管理实现人工补偿,两种方法只是性能与运维成本之间的一种抉择

2.消息签收机制

一般RabbitMQ的消息是自动签收的,你可以理解为快递签收了,那么这个快递的状态就从发送变为已签收,唯一的区别是快递公司会对物流轨迹有记录,而MQ签收后就从队列中删除了。

在开发中,我们一般都采用手动签收的方式,这样可以有效避免消息的丢失。
SpringBoot —— 整合RabbitMQ常见问题及解决方案

3.解决方案

上述两个概念搞清楚之后,再回过头来看消息丢失的原因

①和②是由于生产方未开启消息确认机制导致
③是由于消费方未开启手动签收机制导致。

解决方案

①生产方发送消息时,要try…catch,在catch中捕获异常,并将MQ发送的关键内容记录到日志表中,日志表中要有消息发送状态,若发送失败,由定时任务定期扫描重发并更新状态;
②生产方publisher必须要加入确认回调机制,确认成功发送并签收的消息,如果进入失败回调方法,就修改数据库消息的状态,等待定时任务重发;
③消费方要开启手动签收ACK机制,消费成功才将消息移除,失败或因异常情况而尚未处理,就重新入队。

2.消息积压

1.出现原因

消息积压出现的场景一般有两种:

①消费方的服务挂掉,导致一直无法消费消息;
②消费方的服务节点太少,导致消费能力不足,从而出现积压,这种情况极可能就是生产方的流量过大导致。

2.解决方案

①既然消费能力不足,那就扩展更多消费节点,提升消费能力;
②建立专门的队列消费服务,将消息批量取出并持久化,之后再慢慢消费。

①就是最直接的方式,也是消息积压最常用的解决方案,但有些企业考虑到服务器成本压力,会选择第②种方案进行迂回,先通过一个独立服务把要消费的消息存起来,比如存到数据库,之后再慢慢处理这些消息即可。

2.消息重复

1.出现原因

消息重复大体上有两种情况会出现:

①消息消费成功,事务已提交,签收时结果服务器宕机或网络原因导致签收失败,消息状态会由unack转变为ready,重新发送给其他消费方;
②消息消费失败,由于retry重试机制,重新入队又将消息发送出去。

2.解决方案

网上大体上能搜罗到的方法有三种:

①消费方业务接口做好幂等;
②消息日志表保存MQ发送时的唯一消息ID,消费方可以根据这个唯一ID进行判断避免消息重复;
③消费方的Message对象有个getRedelivered()方法返回Boolean,为TRUE就表示重复发送过来的。

这里只推荐第一种,业务方法幂等这是最直接有效的方式,②还要和数据库产生交互,③有可能导致第一次消费失败但第二次消费成功的情况被砍掉。
ps:
幂等性:就是一条命令执行任意多次所产生的影响和执行一次的影响相同

(这里分布式锁应该能解决这个问题)

最简单的方案就是,在数据库中建一个消息日志表,这个表记录消息ID和消息执行状态。这个我们消费消息的逻辑变为:在消息日志中增加一个消息记录,再根据消息记录,执行业务。我们每次都会在插入之前检查该消息是否已存在。这样就不会出现一条消息被多次执行的情况。这里的数据库也可以使用redis/memcache来实现唯一约束方案。

2.小结

消息丢失、消息重复、消息积压三个问题中,实际上主要解决的还是消息丢失,而消息丢失的最常见企业级方案之一就是定时任务补偿。

其实MQ只是一个做为辅助的中间件,使用MQ的目的就是解耦和转发,不用做多余的事情,保证MQ本身是流畅的、职责单一的即可

总结

本文主要简单讲述了SpringBoot整合RabbitMQ的过程,以及RabbitMQ的三种常见问题及解决方案

其实RabbitMQ本身的性能还是很强大的,总结以下三点:

①消息100%投递会增加运维成本,中小企业视情况使用,非必要不使用;
②消息确认机制影响性能,非必要不使用;
③消费者先保证消息能签收,业务处理失败可以人工补偿。

此外消息中间件的问题其实还有很多,比如

  • 序列化、传输协议,以及内存管理等问题?
  • 为什么消息队列能实现高吞吐?
  • 消息中间件中的队列模型与发布订阅模型的区别?
  • 如何选型消息中间件?

参考文章 https://baijiahao.baidu.com/s?id=1737713844357727373&wfr=spider&for=pc

« 上一章:SpringBoot —— 简单多模块构建

创作不易,关注💖、点赞👍、收藏🎉就是对作者最大的鼓励👏,欢迎在下方评论留言🧐文章来源地址https://www.toymoban.com/news/detail-411139.html

到了这里,关于SpringBoot —— 整合RabbitMQ常见问题及解决方案的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • JDK9~17+Springboot3 @Resource常见问题和解决方案

    因为JDK版本升级的改动,在Jdk9~17环境下,搭建Springboot项目,会出现原有@Resource(javax.annotation.Resource)不存在的问题,导致项目从Jdk8迁移到高版本时遇到的问题 原因 你可能会问,为什么javax.annotation.Resource注解不存在呢?  从Jdk9开始,JavaEE从Jdk中分离,jdk就移除掉了javax.a

    2024年02月04日
    浏览(40)
  • RabbitMQ常见问题以及实际问题解决

    ** ** 消息可靠性问题: 消息从生产者发送到Exchange,再到queue,再到消费者,有哪些导致消息丢失的可能性? 发送时丢失: - 生产者发送的消息为送达exchange - 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机 ①生产者消息确认 RabbitMQ提供

    2024年02月16日
    浏览(31)
  • Rabbitmq 常见问题处理

    Rabbitmq queue NaN status code 如下图: 参考文章 原因分析: Queue在mear数据库中存,但在队列列表中并不存在,所以才会存在该问题,并且是在RabbitMQ做了镜像集群的时候才会出现这样的情况。 解决 删除队列再重建。或者重启镜像机器服务。

    2024年02月09日
    浏览(26)
  • RabbitMQ常见的应用问题

    在实际生产环境中,可能会由于网络问题导致消息接收异常产生某种影响,基于这种情况我们需要保障消息的可靠性。 RabbitMQ中的消息可靠性也称为消息补偿,如下图所示,可以保证消息的可靠性。 分为9种种步骤实现消息补偿 1、生产者处理业务逻辑,将数据写入到数据库。

    2024年02月11日
    浏览(33)
  • RabbitMQ常见问题之延迟消息

    当一个队列中的消息满足下列情况之一时,可以成为死信( dead letter ): 消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false 消息是一个过期消息,超时无人消费 要投递的队列消息堆积满了,最早的消息可能成为死信 如果该队列配置了 dead

    2024年01月18日
    浏览(47)
  • RabbitMQ常见问题之消息堆积

    当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最 早接收到的消息,可能就会成为死信,会被丢弃,这就是消息堆积问题。 解决消息堆积有三种种思路: 增加 更多消费者 ,提高消费速度 在消费者内开启 线程

    2024年01月18日
    浏览(29)
  • 正则常见问题及解决方案

    使用正则处理问题的基本思路。有一些方法比较固定,比如将问题分解成多个小问题,每个小问题见招拆招:某个位置上可能有多个字符的话,就⽤字符组。某个位置上有多个字符串的话,就⽤多选结构。出现的次数不确定的话,就⽤量词。对出现的位置有要求的话,就⽤锚

    2024年02月10日
    浏览(47)
  • RabbitMQ常见问题之高可用

    RabbitMQ 的是基于 Erlang 语言编写,而 Erlang 又是一个面向并发的语言,天然支持集群模式。 RabbitMQ 的集群有两 种模式: 普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。 镜像集群:是一种主从集群,普通集群的基础上,添加了主从备

    2024年01月18日
    浏览(30)
  • RocketMQ常见问题及解决方案

    RocketMQ FAQ 可能原因 1)消费端处理消息发生异常没有捕获或是因为其他原因,没有返回消费状态 解决方案: 消费端捕获异常, 如果需要重试,返回ConsumeConcurrentlyStatus. RECONSUME_LATER , 如果不需要重试,返回ConsumeConcurrentlyStatus. RECONSUME_SUCCESS 可以在消费端增加重试次数判断,

    2023年04月08日
    浏览(34)
  • RabbitMQ常见问题之消息可靠性

    MQ 的消息可靠性,将从以下四个方面展开并实践: 生产者消息确认 消息持久化 消费者消息确认 消费失败重试机制 对于 publisher ,如果 message 到达 exchange 与否, rabbitmq 提供 publiser-comfirm 机制,如果 message 达到 exchange 但是是否到达 queue , rabbitmq 提供 publisher-return 机制。这两

    2024年01月18日
    浏览(29)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包