springboot 整合rabbitMq保证消息一致性方案

这篇具有很好参考价值的文章主要介绍了springboot 整合rabbitMq保证消息一致性方案。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

rabbitMq介绍

RabbitMQ是一种开源的消息代理软件,它实现了高级消息队列协议(AMQP)标准,可用于在应用程序之间传递消息。RabbitMQ最初由LShift开发,现在由Pivotal Software维护。
RabbitMQ可以在多个平台上运行,包括Windows、Mac OS X和各种Linux发行版。它提供了多种编程语言的客户端库,如Java、Python、Ruby、.NET等等。RabbitMQ的主要特点包括:

  1. 可靠性:RabbitMQ使用多种机制确保消息传递的可靠性,例如消息确认、持久化和备份机制等。
  2. 灵活性:RabbitMQ支持多种消息传递模式,如点对点、发布/订阅、RPC等,可根据具体应用场景选择适合的模式。
  3. 可扩展性:RabbitMQ可以通过集群方式实现水平扩展,从而提高系统的吞吐量和可用性。
  4. 可插拔性:RabbitMQ支持多种插件,如消息传递追踪、消息转换、限流等,可以根据需要选择使用。
  5. 易用性:RabbitMQ提供了简单易用的管理界面,可用于监控和管理消息队列。同时,它还提供了丰富的文档和社区支持,方便用户学习和使用。

rabbitMq工作原理

springboot 整合rabbitMq保证消息一致性方案
RabbitMQ的工作原理主要包括生产者(Producer)、消息队列(Queue)和消费者(Consumer)三个部分。

  1. 生产者将消息发送到消息队列:生产者将消息发送到RabbitMQ的消息队列中,消息队列会暂时存储这些消息。
  2. 消费者从消息队列中获取消息:消费者可以从消息队列中获取消息,并对这些消息进行处理。
  3. 消费者处理完消息后发送确认:消费者在处理完消息后,向RabbitMQ发送确认信息,表示已经处理完该消息。
  4. RabbitMQ将已确认的消息从队列中移除:当RabbitMQ接收到消费者的确认信息后,会将已经确认的消息从队列中移除,这样其他消费者就不会再次获取到该消息。
    RabbitMQ使用交换机(Exchange)将消息发送到相应的消息队列中。交换机根据特定的路由规则(Routing Key)将消息发送到一个或多个消息队列中

工作模式

  1. 简单模式(Simple Mode):也称为点对点(Point-to-Point)模式,消息只能被一个消费者消费。生产者将消息发送到队列中,消费者从队列中获取消息并处理,消息处理完后从队列中删除。这种模式下的队列只能有一个消费者。
  2. 发布/订阅模式(Publish/Subscribe Mode):也称为广播模式(Broadcasting),消息可以被多个消费者消费。生产者将消息发送到交换机中,交换机将消息复制到多个队列中,每个队列对应一个消费者。消费者从队列中获取消息并处理。
  3. 路由模式(Routing Mode):消息可以根据路由规则(Routing Key)被不同的消费者消费。生产者将消息发送到交换机中,交换机根据消息的路由规则将消息发送到一个或多个队列中,每个队列对应一个消费者。消费者从队列中获取消息并处理。
  4. 主题模式(Topic Mode):也称为通配符模式(Wildcards),消息可以根据模糊匹配的路由规则被不同的消费者消费。生产者将消息发送到交换机中,交换机根据消息的路由规则和通配符规则将消息发送到一个或多个队列中,每个队列对应一个消费者。消费者从队列中获取消息并处理。

rabbitMq安装

安装步骤如下

  1. 下载 Erlang:RabbitMQ 是基于 Erlang 语言开发的,因此需要先安装 Erlang。Erlang 的下载地址为 https://www.erlang.org/downloads,根据系统类型和版本下载对应的安装程序,然后按照提示进行安装。
  2. 下载 RabbitMQ:RabbitMQ 的下载地址为 https://www.rabbitmq.com/download.html,根据系统类型和版本下载对应的安装程序,然后按照提示进行安装。
  3. 启动 RabbitMQ:在安装完成后,可以通过命令行或者图形化界面启动 RabbitMQ。在 Windows 系统中,可以在开始菜单中找到 RabbitMQ Server,然后选择启动 RabbitMQ Server。在 Linux 或者 Mac 系统中,可以在命令行中输入 rabbitmq-server start 命令来启动 RabbitMQ。
  4. 配置 RabbitMQ:启动 RabbitMQ 后,默认会监听 5672 端口,如果需要更改监听端口、配置虚拟主机、添加用户等操作,可以通过 RabbitMQ 的管理控制台或者命令行来进行配置。
  5. 使用 RabbitMQ:安装和配置完成后,就可以开始使用 RabbitMQ 进行消息传递了。可以选择使用 RabbitMQ 的客户端库或者 AMQP 协议来进行消息传递。

需要注意的是,安装 RabbitMQ 之前需要先安装 Erlang,而且版本要匹配。另外,如果在安装过程中出现问题,可以参考 RabbitMQ 的官方文档或者社区论坛来解决。
安装成功后 访问127.0.0.1:15672 出现登录页面安装成功。

springboot整合RabbitMQ

首先通过idea准备springboot的项目,添加rabbitMQ的依赖

<!-- SpringBoot web启动器 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- SpringBoot amqp启动器 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!-- SpringBoot 测试启动器 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <!--  数据库连接-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <!--  mybatis 连接-->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>

配置rabbitMq如下

spring:
  rabbitmq:   #配置文件
    host: #####  #ip
    port: 5672
    username: ###
    password: ####
    virtual-host: /
    connection-timeout: 15000
    publisher-confirm-type: correlated  #开启 confirms 回调  P →  Exchange
    publisher-returns: true  # 开启 returnedMessage 回调 Exchange →  Queue
    template:
      mandatory: true   # 抵达队列异步发送有效回调
    listener:
      simple:
        acknowledge-mode: manual  # 表示消费者消费成功消息以后需要手工的进行签收(ack),默认为auto
        concurrency: 5   #当前线线程数
        max-concurrency: 10  # 最大线程数
        prefetch: 10
        retry:
          enabled: true
          max-attempts: 5
          max-interval: 10000ms   # 重试最大间隔时间10s
          initial-interval: 2000ms  # 重试初始间隔时间2s
          multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间,重试时间依次是2s,4s,8s,10s
   
       #消息message
com.acrdpm.smart_topic_queue=smart.queue
com.acrdpm.smart_topic_exchange=smart.exchange
com.acrdpm.smart_topic_routingKey=smart.routing.key

#延迟队列
com.acrdpm.delayed_queue=delayed.queue
com.acrdpm.delayed_exchange=delayed.exchange
com.acrdpm.delayed_routingKey=delayed.routing.key

配置rabbitMq配置文件

@Configuration
@Slf4j
//启用rabbitmQ
@EnableRabbit
@Getter
public class RabbitConfig {

	
    private final RabbitTemplate rabbitTemplate;
	// 将配置文件封装成工具类
    private final RabbitPropertiesConfig rabbitPropertiesConfig;
    // 消息备份类
    private final MsgLogService msgLogService;



    public RabbitConfig(RabbitTemplate rabbitTemplate, RabbitPropertiesConfig rabbitPropertiesConfig, MsgLogService msgLogService) {
        this.rabbitTemplate = rabbitTemplate;
        this.rabbitPropertiesConfig = rabbitPropertiesConfig;
        this.msgLogService = msgLogService;
    }




    /**
     * 定义硬件需要的topic
     * @return
     */
    @Bean
    public Queue smartQueue() {
        return new Queue(rabbitPropertiesConfig.getSmart_topic_queue(), true);
    }

    @Bean
    public TopicExchange smartExchange() {
        return new TopicExchange(rabbitPropertiesConfig.getSmart_topic_exchange(), true, false);
    }

    @Bean
    public Binding smartBinding() {
        return BindingBuilder.bind( smartQueue()).to(smartExchange()).with(rabbitPropertiesConfig.getSmart_topic_routingKey());
    }


    /**
     * 定义延迟队列
     */
    @Bean
    public Queue delayedQueue(){
        return new Queue(rabbitPropertiesConfig.getDelayed_queue());
    }

    @Bean
    public CustomExchange delayedExchange(){
        Map<String, Object> args = new HashMap<>();
        //自定义交换机的类型
        args.put("x-delayed-type", "direct");
        return new CustomExchange(rabbitPropertiesConfig.getDelayed_exchange(), "x-delayed-message", true, false,
                args);
    }

    @Bean
    public Binding bindingDelayedQueue(){
        return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(rabbitPropertiesConfig.getDelayed_routingKey()).noargs();
    }




    /**
     * 定制RabbitTemplate
     * 1、服务收到消息就会回调
     * 1、spring.rabbitmq.publisher-confirms: true
     * 2、设置确认回调
     * 2、消息正确抵达队列就会进行回调
     * 1、spring.rabbitmq.publisher-returns: true
     * spring.rabbitmq.template.mandatory: true
     * 2、设置确认回调ReturnCallback
     * <p>
     * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
     * MyRabbitConfig对象创建完成以后,执行这个方法
     */
    @PostConstruct
    public void initRabbitTemplate() {

        /**
         * 1、只要消息抵达Broker就ack=true
         * correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
         * ack:消息是否成功收到
         * cause:失败的原因
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("confirm...correlationData[,{}",correlationData);
            log.info("ack是:{}",ack);
            log.info("case是:{}",cause);
            System.out.println("confirm...correlationData[" + correlationData + "]==>ack:[" + ack + "]==>cause:[" + cause + "]");
            if (ack) {
                log.info("消息成功发送到Exchange");
                String msgId = correlationData.getId();
                msgLogService.updateStatus(msgId, MsgLogStatus.DELIVER_SUCCESS);
            } else {
                log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
            }
        });

        // 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
        rabbitTemplate.setMandatory(true);
        /**
         * 只要消息没有投递给指定的队列,就触发这个失败回调
         * message:投递失败的消息详细信息
         * replyCode:回复的状态码
         * replyText:回复的文本内容
         * exchange:当时这个消息发给哪个交换机
         * routingKey:当时这个消息用哪个路邮键
         * 修改数据库状态
         */
        rabbitTemplate.setReturnsCallback((returnCallback) -> {
            Message message = returnCallback.getMessage();
            String exchange = returnCallback.getExchange();
            int replyCode = returnCallback.getReplyCode();
            String routingKey = returnCallback.getRoutingKey();
            String replyText = returnCallback.getReplyText();
            if(rabbitPropertiesConfig.getDelayed_exchange().equals(exchange)){
                /**
                 * 使用了x-delayed-message 延迟插件,结果每次都强制触发returnedMessage回调方法
                 * 因为发送方确实没有投递到队列上,只是在交换器上暂存,等过期时间到了 才会发往队列。
                 * 并非是BUG,而是有原因的,所以使用利用if去拦截这个异常,判断延迟队列交换机名称,然后break;
                 */
                log.info("如果是延迟队列那么break");
                return;
            }
            log.info("Fail Message[" + message + "]==>replyCode[" + replyCode + "]" +
                    "==>replyText[" + replyText + "]==>exchange[" + exchange + "]==>routingKey[" + routingKey + "]");

            log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange,
                    routingKey,replyCode,replyText,message);
            //todo 没有发送到指定的队列 数据暂存到数据库认定消费失败 再次重新上传
        });
    }

}


注意rabbitMq延迟队列需要安装插件,可参考官网

配置日志消息类

@Service
@Slf4j
public class MsgLogService {

    private final MsgLogMapper msgLogMapper;

    public MsgLogService(MsgLogMapper msgLogMapper) {
        this.msgLogMapper = msgLogMapper;
    }

    public void saveMsg(MsgLog msgLog){
        msgLogMapper.insert(msgLog);
    }

    public void updateStatus(String msgId, Integer status) {
        log.info("执行");
        msgLogMapper.updateStatus(msgId,status);
    }


    public MsgLog selectByMsgId(String msgId) {
        if (!ObjectUtils.isEmpty(msgId)){
            return msgLogMapper.seletMsgFormsgId(msgId);
        }
        return null;
    }


    public List<MsgLog> selectTimeoutMsg() {

        return msgLogMapper.selectTimeOutMsg();
    }


    public void updateTryCount(String msgId, Integer tryCount) {
        MsgLog msgLog = new MsgLog();
        msgLog.setMsgId(msgId);
        msgLog.setTryCount(tryCount);
        msgLogMapper.updateByMsgId(msgLog);
    }

}

@Data
@NoArgsConstructor
public class MsgLog {

    private static final long serialVersionUID = 4990197789742500403L;
    private String msgId;

    private JSONObject msg;

    private String exchange;

    private String routingKey;

    private Integer status;

    private Integer tryCount;

    private String nextTryTime;

    private String createTime;

    private String updateTime;

    private String msgCase;


}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.cvdmp.dao.MsgLogMapper">

  <insert id="insert" parameterType="com.cvdmp.domain.entity.MsgLog">

    insert into msg_log
    <trim prefix="(" suffix=")" suffixOverrides=",">
      <if test="msgId != null">
        msg_id,
      </if>
      <if test="exchange != null">
        exchange,
      </if>
      <if test="routingKey != null">
        routing_key,
      </if>
      <if test="status != null">
        status,
      </if>
      <if test="tryCount != null">
        try_count,
      </if>
      <if test="nextTryTime != null">
        next_try_time,
      </if>
      <if test="createTime != null">
        create_time,
      </if>
      <if test="updateTime != null">
        update_time,
      </if>
      <if test="msg != null">
        msg,
      </if>
        <if test="msgCase!=null">
          msg_case,
        </if>
    </trim>
    <trim prefix="values (" suffix=")" suffixOverrides=",">
      <if test="msgId != null">
        #{msgId,jdbcType=VARCHAR},
      </if>
      <if test="exchange != null">
        #{exchange,jdbcType=VARCHAR},
      </if>
      <if test="routingKey != null">
        #{routingKey,jdbcType=VARCHAR},
      </if>
      <if test="status != null">
        #{status,jdbcType=INTEGER},
      </if>
      <if test="tryCount != null">
        #{tryCount,jdbcType=INTEGER},
      </if>
      <if test="nextTryTime != null">
        #{nextTryTime},
      </if>
      <if test="createTime != null">
        #{createTime},
      </if>
      <if test="updateTime != null">
        #{updateTime},
      </if>
      <if test="msg != null">
        #{msg,typeHandler=com.cvdmp.service.handler.JsonObjectTypeHandler},
      </if>
        <if test="msgCase !=null">
          #{msgCase},
        </if>
    </trim>
  </insert>

  <update id="updateStatus" parameterType="map">
    update msg_log set status = #{status}, update_time = now()
    where msg_id = #{msgId}
  </update>

  <select id="selectTimeOutMsg" resultType="com.cvdmp.domain.entity.MsgLog">
    select
    *
    from msg_log
    where status = 0
    and next_try_time &lt;= now()
  </select>
  <select id="seletMsgFormsgId" parameterType="string" resultType="com.cvdmp.domain.entity.MsgLog">
    select
      *
    from msg_log
    where msg_id = #{msgId}
  </select>

  <update id="updateByMsgId" parameterType="com.cvdmp.domain.entity.MsgLog">

    update msg_log
    <set>
      <if test="exchange != null">
        exchange = #{exchange,jdbcType=VARCHAR},
      </if>
      <if test="routingKey != null">
        routing_key = #{routingKey,jdbcType=VARCHAR},
      </if>
      <if test="status != null">
        status = #{status,jdbcType=INTEGER},
      </if>
      <if test="tryCount != null">
        try_count = #{tryCount,jdbcType=INTEGER},
      </if>
      <if test="nextTryTime != null">
        next_try_time = #{nextTryTime},
      </if>
      <if test="createTime != null">
        create_time = #{createTime},
      </if>
      <if test="updateTime != null">
        update_time = #{updateTime},
      </if>
      <if test="msg != null">
        msg = #{msg,typeHandler=com.cvdmp.service.handler.JsonObjectTypeHandler},
      </if>
    </set>
    where msg_id = #{msgId,jdbcType=VARCHAR}
  </update>
</mapper>

最后我们定义生产者和消费者


/**
 * mq消息推送策略
 * 1、通过rabbitmq完成消息的推送保证消息推送成功
 * @author daizhihua
 * @time 2023/4/25
 */
@Component(value = "mqStrategy")
public class MqStrategyService {

    private final RabbitConfig rabbitConfig;

    private final MsgLogService msgLogService;

    public MqStrategyService(RabbitConfig rabbitConfig, MsgLogService msgLogService) {
        this.rabbitConfig = rabbitConfig;
        this.msgLogService = msgLogService;
   }


    public void sendMessage(JSONObject map, HttpServletRequest request) {
        RabbitPropertiesConfig rabbitPropertiesConfig = rabbitConfig.getRabbitPropertiesConfig();
        String msgId = RandomUtil.getRandomNumber(32);
        //设置消息id
        map.put("msgId",msgId);
        MsgLog msgLog = new MsgLog();
        msgLog.setMsgId(msgId);
        msgLog.setMsg(map);
        msgLog.setExchange(rabbitPropertiesConfig.getSmart_topic_exchange());
        msgLog.setRoutingKey(rabbitPropertiesConfig.getSmart_topic_routingKey());
        msgLog.setNextTryTime(DateUtil.getNow());
        msgLogService.saveMsg(msgLog);
        //生成消息的唯一id
        CorrelationData correlationData = new CorrelationData(msgId);
        RabbitTemplate rabbitTemplate = rabbitConfig.getRabbitTemplate();
        // 发送消息
        rabbitTemplate.convertAndSend(rabbitPropertiesConfig.getSmart_topic_exchange(),
                rabbitPropertiesConfig.getSmart_topic_routingKey(), map, correlationData);



    }

    /**
     * 发送延迟队列消息
     * @param map
     * @param delayTime
     */
    public void sendMessageDelay(JSONObject map,int delayTime){
        RabbitPropertiesConfig rabbitPropertiesConfig = rabbitConfig.getRabbitPropertiesConfig();
        String msgId = RandomUtil.getRandomNumber(32);
        //设置消息id
        map.put("msgId",msgId);
        MsgLog msgLog = new MsgLog();
        msgLog.setMsgId(msgId);
        msgLog.setMsg(map);
        msgLog.setExchange(rabbitPropertiesConfig.getDelayed_exchange());
        msgLog.setRoutingKey(rabbitPropertiesConfig.getDelayed_routingKey());
        msgLog.setNextTryTime(DateUtil.getNow());
        //生成消息的唯一id
        CorrelationData correlationData = new CorrelationData(msgId);
        RabbitTemplate rabbitTemplate = rabbitConfig.getRabbitTemplate();
        rabbitTemplate.convertAndSend(rabbitPropertiesConfig.getDelayed_exchange(),
                rabbitPropertiesConfig.getDelayed_routingKey(), map, message -> {
                    message.getMessageProperties().setDelay(delayTime);
                    return message;},correlationData);
    }

}

延迟队列的消费

@Slf4j
@Component
@RabbitListener(queues = "${com.acrdpm.delayed_queue}")
public class MessageConsumer {

    private final MqStrategyService mqStrategyService;

    public MessageConsumer(MqStrategyService mqStrategyService) {
        this.mqStrategyService = mqStrategyService;
    }

    @RabbitHandler
    public void consume(Message message, JSONObject map, Channel channel) throws IOException {
        System.out.println("First Queue received msg : " );
        log.info("数据是:{}",map);
        System.out.println(message);
        System.out.println(channel);
        long tag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(tag, false);

    }

}

订阅消息的消费者

@Slf4j
@Component
@RabbitListener(queues = {"${com.acrdpm.smart_topic_queue}"})
public class SmartConsumer {

    private MsgLogService msgLogService;


    @RabbitHandler
    public void consume(Message message, JSONObject mail, Channel channel) throws IOException {
        log.info("接收到消息了");
        log.info("消息 {}",message);
        log.info("收到的消息是:{}",mail);
        long tag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(tag, false);

        String msgId = mail.getMsgId();
        MsgLog msgLog = msgLogService.selectByMsgId(msgId);
        if (null == msgLog || msgLog.getStatus().equals(MsgLogStatus.CONSUMED_SUCCESS)) {
            // 消费幂等性:确定不是重复的消息:及消费完成的消息
            log.info("重复消费, msgId: {}", msgId);
            return;
        }
         //获取投送标签
        long tag = message.getMessageProperties().getDeliveryTag();
//        boolean success = false;
//        if (success) {
//            log.info("成功发送消息");
//            msgLogService.updateStatus(msgId, MsgLogStatus.CONSUMED_SUCCESS);
//            // 消费确认手动ack
//            channel.basicAck(tag, false);
//        } else {
//            channel.basicAck(tag, false);
//        }
//        try {
            boolean success = EmailUtil.sendEmail(mail);
//
//        } catch (EmailException e) {
//            log.error("email 发送异常" , e);
//        } catch (IOException e) {
//            log.error("消息处理异常" , e);
//        }

    }
}

在发送消息的过程中,肯定会出现网络异常等情况所以我们定义了发送消息的持久化,为了保证一致性,可参考如下时序图
springboot 整合rabbitMq保证消息一致性方案文章来源地址https://www.toymoban.com/news/detail-431738.html

到了这里,关于springboot 整合rabbitMq保证消息一致性方案的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • ZooKeeper是如何保证数据一致性的?

             目录 一、分布式一致性原理 二、ZooKeeper架构         2.1 ZAB 协议操作顺序性         2.2 领导者选举         成员身份         成员状态         领导者选举 三、总结         在分布式系统里的多台服务器要对数据状态达成一致,其实是一件很有难度和挑

    2024年04月11日
    浏览(36)
  • HBase的事务处理与一致性保证

    HBase是一个分布式、可扩展、高性能的列式存储系统,基于Google的Bigtable设计。它是Hadoop生态系统的一部分,可以与HDFS、MapReduce、ZooKeeper等组件集成。HBase具有高可靠性、高性能和高可扩展性等特点,适用于大规模数据存储和实时数据处理。 在现实应用中,事务处理和一致性

    2024年02月20日
    浏览(38)
  • 聊聊 Kafka:Kafka 如何保证一致性

    在如今的分布式环境时代,任何一款中间件产品,大多都有一套机制去保证一致性的,Kafka 作为一个商业级消息中间件,消息一致性的重要性可想而知,那 Kafka 如何保证一致性的呢?本文从高水位更新机制、副本同步机制以及 Leader Epoch 几个方面去介绍 Kafka 是如何保证一致性

    2024年02月02日
    浏览(26)
  • MySQL是如何保证数据一致性的?

    通过上文《MySQL是如何保证数据不丢失的?》可以了解DML的操作流程以及数据的持久化机制。对于一个数据库而言,除了数据的持久性、不丢失之外,一致性也是非常重要的,不然这个数据是没有任何意义的。在使用MySQL时,数据不一致的情况也可能出现,所以,本文就来看看

    2024年02月03日
    浏览(45)
  • 使用双异步后,如何保证数据一致性?

    大家好,我是哪吒。 在上一篇文章中,我们 通过双异步的方式导入了10万行的Excel ,有个小伙伴在评论区问我, 如何保证插入后数据的一致性呢? 很简单,通过对比Excel文件行数和入库数量是否相等即可。 那么,如何获取异步线程的返回值呢? 我们可以通过给异步方法添加

    2024年01月23日
    浏览(49)
  • MySQL和Redis如何保证数据一致性

    MySQL与Redis都是常用的数据存储和缓存系统。为了提高应用程序的性能和可伸缩性,很多应用程序将MySQL和Redis一起使用,其中MySQL作为主要的持久存储,而Redis作为主要的缓存。在这种情况下,应用程序需要确保MySQL和Redis中的数据是同步的,以确保数据的一致性。 “数据一致

    2024年02月12日
    浏览(45)
  • MySQL和Redis如何保证数据一致性?

    由于缓存的高并发和高性能已经在各种项目中被广泛使用,在读取缓存这方面基本都是一致的,大概都是按照下图的流程进行操作: 但是在更新缓存方面,是更新完数据库再更新缓存还是直接删除缓存呢?又或者是先删除缓存再更新数据库?在这一点上就值得探讨了。 在实

    2024年02月01日
    浏览(43)
  • 如何在微服务下保证事务的一致性

    作者:京东科技 苗元 随着业务的快速发展、业务复杂度越来越高,传统单体应用逐渐暴露出了一些问题,例如开发效率低、可维护性差、架构扩展性差、部署不灵活、健壮性差等等。而微服务架构是将单个服务拆分成一系列小服务,且这些小服务都拥有独立的进程,彼此独

    2023年04月27日
    浏览(39)
  • 如何保证缓存和数据库的数据一致性

    若数据库更新成功,删除缓存操作失败,则此后读到的都是缓存中过期的数据,造成不一致问题。 同删除缓存策略一样,若数据库更新成功缓存更新失败则会造成数据不一致问题。 若缓存更新成功数据库更新失败, 则此后读到的都是未持久化的数据。因为缓存中的数据是易

    2023年04月19日
    浏览(42)
  • 怎么保证缓存与数据库的最终一致性?

    目录 零.读数据的标准操作 一.Cache aside Patten--旁路模式 二.Read/Write Through Pattern--读写穿透 三.Write Back Pattern--写回 四.运用canal监听mysql的binlog实现缓存同步 这里想说的是不管哪种模式读操作都是一样的,这是一种统一的规范: 但写操作和同步策略却有不同。 这个是最常见的

    2024年04月08日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包