rabbitmq+springboot实现幂等性操作

这篇具有很好参考价值的文章主要介绍了rabbitmq+springboot实现幂等性操作。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

文章目录

  • 1.场景描述
    • 1.1 场景1
    • 1.2 场景2
  • 2.原理
  • 3.实战开发
    • 3.1 建表
    • 3.2 集成mybatis-plus
    • 3.3 集成RabbitMq
      • 3.3.1 安装mq
      • 3.3.2 springBoot集成mq
    • 3.4 具体实现
      • 3.4.1 mq配置类
      • 3.4.2 生产者
      • 3.4.3 消费者

1.场景描述

消息中间件是分布式系统常用的组件,无论是异步化、解耦、削峰等都有广泛的应用价值。我们通常会认为,消息中间件是一个可靠的组件——这里所谓的可靠是指,只要我把消息成功投递到了消息中间件,消息就不会丢失,即消息肯定会至少保证消息能被消费者成功消费一次,这是消息中间件最基本的特性之一,也就是我们常说的“AT LEAST ONCE”,即消息至少会被“成功消费一遍”。

1.1 场景1

什么意思呢?举个例子:一个消息M发送到了消息中间件,消息投递到了消费程序A,A接受到了消息,然后进行消费,但在消费到一半的时候程序重启了,这时候这个消息并没有标记为消费成功,这个消息还会继续投递给这个消费者,直到其消费成功了,消息中间件才会停止投递。
这种情景就会出现消息可能被多次地投递。

1.2 场景2

还有一种场景是程序A接受到这个消息M并完成消费逻辑之后,正想通知消息中间件“我已经消费成功了”的时候,程序就重启了,那么对于消息中间件来说,这个消息并没有成功消费过,所以他还会继续投递。这时候对于应用程序A来说,看起来就是这个消息明明消费成功了,但是消息中间件还在重复投递。

以上两个场景对于消息队列来说就是同一个messageId的消息重复投递下来了。

我们利用消息id来判断消息是否已经消费过,如果该信息被消费过,那么消息表中已经 会有一条数据,由于消费时会先执行插入操作,此时会因为主键冲突无法重复插入,我们就利用这个原理来进行幂等的控制,消息内容可以用json格式来进行传输的。

3.实战开发

3.1 建表

DROP TABLE IF EXISTS `message_idempotent`;
CREATE TABLE `message_idempotent` (
  `message_id` varchar(50) NOT NULL COMMENT '消息ID',
  `message_content` varchar(2000) DEFAULT NULL COMMENT '消息内容',
  `status` int DEFAULT '0' COMMENT '消费状态(0-未消费成功;1-消费成功)',
  `retry_times` int DEFAULT '0' COMMENT '重试次数',
  `type` int DEFAULT '0' COMMENT '消费类型',
  PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

3.2 集成mybatis-plus

《springBoot集成mybatisPlus》

3.3 集成RabbitMq

3.3.1 安装mq

推荐使用docker安装rabbitmq,还未安装的可以参考以下信息:

  • docker安装

3.3.2 springBoot集成mq

  • 1.添加依赖
 <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

3.4 生产者具体实现

3.4.1 mq配置类

  • DirectRabbitConfig
    具体如何开启可以参考《rabbitMq实现死信队列》
import org.springframework.amqp.core.\*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitmqConfig {

    //正常交换机的名字
    public final static String  EXCHANGE\_NAME = "exchange\_name";
    //正常队列的名字
    public final static String QUEUE\_NAME="queue\_name";
    //死信交换机的名字
    public final static String  EXCHANGE\_DEAD = "exchange\_dead";
    //死信队列的名字
    public final static String QUEUE\_DEAD="queue\_dead";
    //死信路由key
    public final static String DEAD\_KEY="dead.key";




    //创建正常交换机
    @Bean(EXCHANGE\_NAME)
    public Exchange exchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE\_NAME)
                //持久化 mq重启后数据还在
                .durable(true)
                .build();
    }



    //创建正常队列
    @Bean(QUEUE\_NAME)
    public Queue queue(){
        //正常队列和死信进行绑定 转发到 死信队列,配置参数
        Map<String,Object>map=getMap();
        return new Queue(QUEUE\_NAME,true,false,false,map);
    }

    //正常队列绑定正常交换机 设置规则 执行绑定 定义路由规则 requestmaping映射
    @Bean
    public Binding binding(@Qualifier(QUEUE\_NAME) Queue queue,
                           @Qualifier(EXCHANGE\_NAME) Exchange exchange){
        return BindingBuilder.bind(queue)
                .to(exchange)
                //路由规则
                .with("app.#")
                .noargs();
    }

    //创建死信队列
    @Bean(QUEUE\_DEAD)
    public Queue queueDead(){
        return new Queue(QUEUE\_DEAD,true,false,false);
    }

    //创建死信交换机
    @Bean(EXCHANGE\_DEAD)
    public Exchange exchangeDead(){
        return ExchangeBuilder.topicExchange(EXCHANGE\_DEAD)
                .durable(true) //持久化 mq重启后数据还在
                .build();
    }


    //绑定死信队列和死信交换机
    @Bean
    public Binding deadBinding(){
        return BindingBuilder.bind(queueDead())
                .to(exchangeDead())
                //路由规则 正常路由key
                .with(DEAD\_KEY)
                .noargs();
    }

    /\*\*
      获取死信的配置信息
     \*
     \*\*/
    public Map<String,Object>getMap(){
        //3种方式 任选其一,选择其他方式之前,先把交换机和队列删除了,在启动项目,否则报错。
        //方式一
        Map<String,Object> map=new HashMap<>(16);
        //死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;
        map.put("x-dead-letter-exchange", EXCHANGE\_DEAD);
        //死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值
        map.put("x-dead-letter-routing-key", DEAD\_KEY);
        //方式二
        //消息的过期时间,单位:毫秒;达到时间 放入死信队列
        // map.put("x-message-ttl",5000);
        //方式三
        //队列最大长度,超过该最大值,则将从队列头部开始删除消息;放入死信队列一条数据
        // map.put("x-max-length",3);
        return map;
    }


}
  • 延迟队列配置
    具体如何开启可以参考《rabbitMq实现死信队列》

由于rabbitMq中不直接支持死信队列,需要我们利用插件rabbitmq_delayed_messgae_exchage进行开启

/**
 * 定义延迟交换机
 */
@Configuration
public class RabbitMQDelayedConfig {
    //队列
    private static final String DELAYQUEUE = "delayedqueue";
    //交换机
    private static final String DELAYEXCHANGE = "delayedExchange";
    @Bean
    public Queue delayqueue(){return new Queue(DELAYQUEUE);}
    //自定义延迟交换机
    @Bean
    public CustomExchange delayedExchange(){
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type","direct");
        /**
         * 1、交换机名称
         * 2、交换机类型
         * 3、是否需要持久化
         * 4、是否需要自动删除
         * 5、其他参数
         */
        return new CustomExchange(DELAYEXCHANGE,"x-delayed-message",true,false,arguments);
    }
    //绑定队列和延迟交换机
    @Bean
    public Binding delaybinding(){
        return BindingBuilder.bind(delayqueue()).to(delayedExchange()).with("sectest").noargs();
    }
}

3.4.2 生产者

  • 1.消费队列的生产者
import com.example.shop.config.RabbitmqConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

@Component
public class Sender_Direct {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * 用于消费订单
     *
     * @param orderId
     */
    public void send2Direct(String orderId) {
        //创建消费对象,并指定全局唯一ID(这里使用UUID,也可以根据业务规则生成,只要保证全局唯一即可)
        MessageProperties messageProperties = new MessageProperties();
        rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, RabbitmqConfig.ROUTING_KEY, "内容设置",  message -> {
            //设置消息的id为唯一
            messageProperties.setMessageId(UUID.randomUUID().toString());
            messageProperties.setContentType("text/plain");
            messageProperties.setContentEncoding("utf-8");
            message.getMessageProperties().setMessageId(orderId);
            return message;
        });
    }

}

3.4.3 消费者

1.开启手动ack配置

spring:
  application:
    name: shop
  rabbitmq:
    host: 192.168.1.102
    port: 5673
    virtual-host: /
    username: guest
    password: guest
    listener:
      simple:
        # 表示消费者消费成功消息以后需要手工的进行签收(ack确认),默认为 auto
        acknowledge-mode: manual

消费者要配置ack重试机制,具体参考前几篇文章,使用的是mysql消息ID的唯一性,有时候可能生成一样的订单,具体的没有进行实验,内容是json生成的,可以执行业务

import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.example.des.Bean.MessageIdempotent;
import com.example.des.Bean.Shop;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;


@Component
public class Receiver_Direct {
    private static final Integer delayTimes = 30;//延时消费时间,单位:秒

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = {"smsQueue"})
    public void receiveD(Message message, Channel channel) throws IOException {
        try {
            // 获取消息Id
            String messageId = message.getMessageProperties().getMessageId();
            String msg = new String(message.getBody());//获取消息
            //向数据库插入数据
            MessageIdempotent messageIdempotent = new MessageIdempotent();
            messageIdempotent.setMessageId(messageId);
            messageIdempotent.setMessageContent(msg);
            messageIdempotent.setRetryTimes(0);
            System.out.println(messageIdempotent.toString());
            Boolean save = true;   //设置保存成功,消息投递失败是在确认模式那里

            if (!save) {//说明属于重重复请求
                //1、处理消息内容的业务,解析json数据
                //2、创建订单,并保存
                Boolean flag = consumeOrder(new Shop());
                if (flag){
                    //投入延迟队列,如果30分钟订单还没有消费,就删除订单
                    rabbitTemplate.convertAndSend("delayedExchange","sectest",message,message1->{
                        //设置发送消息的延长时间 单位:ms,表示30分钟
                        message1.getMessageProperties().setDelay(1000*60*30);
                        return message1;
                    });
                    //更新消息状态,消费成功,
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                }else {
                    //延迟投入死信,进行重试
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
                }
            } else {
                //1、处理消息内容的业务,解析json数据
                //2、创建订单,并保存
                //投入死信队列
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            }
        }catch (Exception e){
            System.out.println("错误信息");
        }

    }

    private boolean consumeOrder(Shop shop) {
        return true;
    }

    @RabbitListener(queues = {" delay.queue.demo.delay.queue"})
    public void dead(String payload, Message message, Channel channel) throws IOException {
        System.out.println("死信队列:"+payload);
        //删除消息 将数据库状态更新为失败,更新邮件或者消息通知,有时候可以人工消费
        long deliveryTag=message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag,true);
    }

    @RabbitListener(queues = "delayedqueue")
    public void receivemsg(Message messages){
        //查询有没有被消费,也就是更新成功,有时候需要乐观锁
    }
}

至此mq的消息重复以及幂等的信息处理就很完美的解决了,当然本文以数据库为例进行实现,感兴趣的可以尝试使用redis来进行实现文章来源地址https://www.toymoban.com/news/detail-694486.html

到了这里,关于rabbitmq+springboot实现幂等性操作的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 消息队列-RabbitMQ:延迟队列、rabbitmq 插件方式实现延迟队列、整合SpringBoot

    1、延迟队列概念 延时队列内部是有序的 , 最重要的特性 就体现在它的 延时属性 上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说, 延时队列就是用来存放需要在指定时间被处理的元素的队列。 延迟队列使用场景: 订单在十分钟之内未支付则

    2024年02月22日
    浏览(53)
  • SpringBoot Redis 注解 拦截器来实现接口幂等性校验

    幂等性, 通俗的说就是一个接口, 多次发起同一个请求, 必须保证操作只能执行一次 比如:订单接口, 不能多次创建订单 支付接口, 重复支付同一笔订单只能扣一次钱 支付宝回调接口, 可能会多次回调, 必须处理重复回调 普通表单提交接口, 因为网络超时等原因多次点击提

    2024年01月19日
    浏览(61)
  • SpringBoot RabbitMQ 实现消息队列功能

    作者:禅与计算机程序设计艺术 在企业级应用中,为了提升系统性能、降低响应延迟、改善用户体验、增加系统的稳定性、提高资源利用率等方面所需的功能之一就是使用消息队列。RabbitMQ是一个开源的AMQP(Advanced Message Queuing Protocol)的实现消息队列,它是用Erlang语言开发的。

    2024年02月09日
    浏览(47)
  • [AIGC] 用幂等性解决重复消息问题

    在构建分布式系统时,开发人员经常会遇到重复消息问题。这可能是由于网络延迟、系统故障或其他原因导致的。无论如何,重复消息会导致系统出现错误和不一致状态。为了解决这个问题,我们可以使用幂等性来确保系统的可靠性和一致性。 在数学中,幂等性是指一个函数

    2024年02月19日
    浏览(33)
  • 【RabbitMQ教程】第八章 —— RabbitMQ - 幂等性、优先级、惰性

                                                                       💧 【 R a b b i t M Q 教程】第八章—— R a b b i t M Q − 幂等性、优先级、惰性 color{#FF1493}{【RabbitMQ教程】第八章 —— RabbitMQ - 幂等性、优先级、惰性} 【 R abbi tMQ 教程】第八章

    2024年02月09日
    浏览(39)
  • SpringBoot中接口幂等性实现方案-自定义注解+Redis+拦截器实现防止订单重复提交

    SpringBoot+Redis+自定义注解实现接口防刷(限制不同接口单位时间内最大请求次数): SpringBoot+Redis+自定义注解实现接口防刷(限制不同接口单位时间内最大请求次数)_redis防刷_霸道流氓气质的博客-CSDN博客 以下接口幂等性的实现方式与上面博客类似,可参考。 什么是幂等性? 幂等

    2024年02月15日
    浏览(54)
  • SpringBoot自定义注解+AOP+redis实现防接口幂等性重复提交,从概念到实战

    本文为千锋教育技术团独家创作,更多技术类知识干货,点个关注持续追更~ 接口幂等性是Web开发中非常重要的一个概念,它可以保证多次调用同一个接口不会对结果产生影响。如果你想了解更多关于接口幂等性的知识,那么本文就是一个不错的起点。 在Web开发中,我们经常

    2024年02月03日
    浏览(55)
  • Spark操作Hive表幂等性探索

    旁边的实习生一边敲着键盘一边很不开心的说:做数据开发真麻烦,数据bug排查太繁琐了,我今天数据跑的有问题,等我处理完问题重新跑了代码,发现报表的数据很多重复,准备全部删了重新跑。 我:你的数据操作具备幂等性吗? 实习生:啥是幂等性?数仓中的表还要考

    2024年02月13日
    浏览(39)
  • 如何保证用户重试操作的幂等性

    服务不稳定是一类常态,面对此类场景恰当的应对策略应该是什么?退一步说,即使我们能够确保第一方服务的稳定性,我们又应该如何面对网络延迟以及掌控以外的不确定性?这都是本篇文章会谈到的内容 本文是团队内部分享的文字版,敏感信息已经抹去或者重写。我们通

    2024年02月06日
    浏览(45)
  • Springboot 定时任务,分布式下幂等性如何解决

    在分布式环境下,定时任务的幂等性问题需要考虑多个节点之间的数据一致性和事务处理。 一种解决方法是使用分布式锁来保证同一时间只有一个节点能够执行该任务。具体实现可以使用Redis或Zookeeper等分布式协调工具提供的分布式锁功能。 另一种解决方法是使用消息队列来

    2024年02月11日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包