springKafka 重试解决分布式事务

这篇具有很好参考价值的文章主要介绍了springKafka 重试解决分布式事务。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

1.背景

1.1 名词解释、

1.2 业务场景

1.3 kafka消息的优点和缺点

1.4 kafka客户端重试框架

2.使用

2.1 引入pom依赖

2.2 定义重试消息,死信队列

2.3 业务执行异常处理

3.代码分析

3.1 服务启动扫描配置

3.2 消费消息并重新投递

3.3 控制消息重试频率及死信队列

3.4 控制消息的重试时间


1.背景

1.1 名词解释、

名词

概念

事务

在后端应用中,是访问和更新数据库的程序执行单元,会把所有的命令作为一个整体,即这一组执行命令要么都执行成功,要么都不执行。

分布式事务

分布式事务是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。

幂等

指对一个业务操作,多次执行,带来的效果一样。

死信队列

无法被消费的消息或者重试消费执行都失败被放在一个指定的队列

1.2 业务场景

近期项目需要解决分布式事务的问题,当时讨论了多次,决定采用事务的最终一致性,但是我们由于限制mq只有mqtt和kafka。所以使用kafka消息重试去解决问题,

   以下是当时的方案 图

kafka重试队列,分布式,kafka,java

1.3 kafka消息的优点和缺点

       kafka擅长解决大批量的消息推送,高吞吐量,比较高的数据可靠性(一定策略下可以保证消息不丢失问题)。

但是很遗憾,kafka中间件并不如市面上擅长处理业务的消息中间件如rocketmq,nsq一样可以控制消息的频率,重试次数等功能。

那么我们只需要解决消息重试机制(重试次数,重试间隔,重试策略,异常通知)等问题

1.4 kafka客户端重试框架

      当我准备参考了以前公司2018年中间件团队的代码准备写一个重试机制的kafka客户端框架的时候,发现并不那么简单。当时的代码只是捕获到异常后丢到了原来的topic里面,只有简单的重试次数功能,

缺点

1.丢到原来的topic,可能导致重试topic阻塞到原来的业务执行。

2.无法精确控制到重试时间,超过重试次数后的死信队列没有

3.控制次数放到了内存里面,系统重启将失去重试机会(对于重要的业务失去重试机会有可能代表这整条业务单据无法使用,甚至造成重大资金损失)

偶然的机遇发现springkafka客户端在2021年的时候居然实现了这个功能,于是看了看官方的@retryTopick框架代码,发现spring为了写这个也是下了大的功夫,这个模块代码量也是惊人,代码复杂度自己断点都看晕了。

2.使用

2.1 引入pom依赖

<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2.2 定义重试消息,死信队列

/**
 * @author zhongxingtang
 * @desc kafka消息接受工具
 */
@Slf4j
@Component
public class KafkaMQListener {

    @KafkaListener(topics = {"BMOS_TOP_TEST"}, groupId = "${spring.application.name}")
    @RetryableTopic(
            attempts = "4",
            backoff = @Backoff(delay = 5000, multiplier = 2),
            fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC
    )
    /**
     * attempts:重试次数,默认为3。
     *
     * @Backoff delay:消费延迟时间,单位为毫秒。
     *
     * @Backoff multiplier:延迟时间系数,此例中 attempts = 4, delay = 5000, multiplier = 2 ,则间隔时间依次为5s、10s、20s、40s,最大延迟时间受 maxDelay 限制。
     *
     * fixedDelayTopicStrategy:可选策略包括:SINGLE_TOPIC 、MULTIPLE_TOPICS
     */
    public void listener(ConsumerRecord<String, String> record) {
        // 获取消息体
        String message = record.value();
        log.info("收到测试消息,message = {}, record={}", message, JSONObject.toJSONString(record));
        // todo  写重试业务消息
    }

    @DltHandler
    public void dltHandler(ConsumerRecord<String, String> record) {
        log.info("进入死信队列 record={}", JSONObject.toJSONString(record));
        // todo  写重试多次失败后的业务,例如短信,报警,存数据库等。
    }

2.3 业务执行异常处理

    public ResponseInfo<Boolean> startFormulaWorkFlow(@Validated @RequestBody WorkflowStartDTO dto) {
        try {
            // 执行可重试业务异常
            workflowService.startFormulaWorkflow(dto);
        } catch (Exception e) {
            log.warn("计划执行-发起流程失败重试,dto = {},  e = {}", JsonUtils.toJsonString(dto), e);
            //捕捉异常,丢到kafka消息里面,进行重试
            kafkaMQProducer.send(WORKFLOW_FORMULA_WORKFLOW, JsonUtils.toJsonString(dto));
        }
        return ResponseInfo.SUCCESS();
    }

总结:简单,无脑,粗暴,好用。

3.代码分析

客户端消息如何重试?是自己封装的消息队列重试还是丢到了kafka服务端重试呢

如何控制消息的重试时间和频率,进入死信队列

(kafka不会犯这么低级的错误,简单的只维护一个本地延时队列来跑,否则服务断开,消息全丢失了)

3.1 服务启动扫描配置

3.1.1 后置处理器扫描注解

KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization

后置处理器扫描到带有@KafkaListener、, @RetryableTopic注解

kafka重试队列,分布式,kafka,java

kafka重试队列,分布式,kafka,java

3.1.2 解析重试参数,并放入上下文中

org.springframework.kafka.retrytopic.RetryTopicConfigurer

kafka重试队列,分布式,kafka,java

3.2 消费消息并重新投递

链路太长,只贴重要的图

org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer#run

org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer#pollAndInvoke

KafkaMessageListenerContainer.ListenerConsumer#invokeIfHaveRecords

KafkaMessageListenerContainer.ListenerConsumer#doInvokeWithRecords

KafkaMessageListenerContainer.ListenerConsumer#doInvokeRecordListener

3.2.1执行消息消费,捕获异常

kafka重试队列,分布式,kafka,java

org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer#invokeErrorHandler

org.springframework.kafka.listener.DefaultErrorHandler#handleRemaining

org.springframework.kafka.listener.FailedRecordProcessor#getRecoveryStrategy

org.springframework.kafka.listener.FailedRecordTracker#recovered

org.springframework.kafka.listener.FailedRecordTracker#attemptRecovery

org.springframework.kafka.listener.DeadLetterPublishingRecoverer#accept

3.2.2 捕获异常,重新投递消息。

kafka重试队列,分布式,kafka,java

3.2.3 组装第二次投递消息参数,send消息

kafka重试队列,分布式,kafka,java

3.3 控制消息重试频率及死信队列

3.3.1控制重试次数配置

消息重试次数配置到了@RetryableTopic,attempts = "5"里面

kafka重试队列,分布式,kafka,java

3.3.2 重试次数初始化信息

按照正常思维,一般人都是考虑一次一次重试,记录次数直到到达最大次数进入死信队列

可以这个地方断点了无数次,也没找到在哪判断,最终在启动容器的时候发现提前创建了一个链表结构,

topic100->topic200->topic500 .....>top-dtl(死信队列)

(spring为了提升执行速度快真是拼了命了,有个大胆的想法,写个1000万重试次数,内存都要爆炸了吧)

典型的极致空间换时间

初始化context,记录每一个节点信息,通过直到next的节点为空代表重试次数结束(或者不再抛异常中断)

kafka重试队列,分布式,kafka,java

3.3.3 获取下一次重试节点

org.springframework.kafka.retrytopic.DefaultDestinationTopicResolver#resolveDestinationTopic

org.springframework.kafka.retrytopic.DefaultDestinationTopicResolver#resolveRetryDestination

retrytopic.DefaultDestinationTopicResolver.DestinationTopicHolder#getNextDestination

kafka重试队列,分布式,kafka,java

链表结构

kafka重试队列,分布式,kafka,java

节点具体信息

kafka重试队列,分布式,kafka,java

3.3.4 组装下一次重试的topic信息

每次在trycatch 捕获异常后,根据当前链表获取下一个节点的信息。放入下一个重试消息的topic_head里面做标记,

kafka重试队列,分布式,kafka,java

3.3.4 dlt 死信队列

消息消费-dlt 的topic,执行死信队列的自定义业务

kafka重试队列,分布式,kafka,java

3.4 控制消息的重试时间

kafka服务起是顺序存储到磁盘里面,不会带提供阻塞功能,很明显对号称史上吞吐量最大的消息服务器,服务器怎么可能提供这个功能来牺牲自己的性能呢?

kafka通过不断的拉取最新分区的消息,当判断当前执行时间<想要执行的时间判断,不满足的情况下,记录信息,并抛出异常中断。

3.4.1 设置下一个topic的预期执行时间

org.springframework.kafka.retrytopic.DeadLetterPublishingRecovererFactory#create

org.springframework.kafka.retrytopic.DeadLetterPublishingRecovererFactory#addHeaders

kafka.retrytopic.DeadLetterPublishingRecovererFactory#getNextExecutionTimestamp

kafka重试队列,分布式,kafka,java

3.4.2 将预期执行时间设置到head里面

org.springframework.kafka.listener.DeadLetterPublishingRecoverer#sendOrThrow

kafka重试队列,分布式,kafka,java

3.4.3 循环消费消息,判断是否到达预期时间

org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer#run

kafka重试队列,分布式,kafka,java

从head里面获取预期执行时间,判断是否到达预期执行时间

org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter#onMessage

kafka重试队列,分布式,kafka,java

3.4.4 未到达预期消费时间,打断消费。

当判断当前执行时间<想要执行的时间判断,不满足的情况下,记录信息,并抛出异常中断。

org.springframework.kafka.listener.PartitionPausingBackoffManager#backOffIfNecessary

kafka重试队列,分布式,kafka,java

3.4.5 持续消费消息并判断,直到满足预期消费时间

循环下一次,直到当前执行时间>=想要执行的时间,放行执行业务逻辑

listener.adapter.KafkaBackoffAwareMessageListenerAdapter#invokeDelegateOnMessage

org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter#onMessage

org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter#invokeHandler

org.springframework.kafka.listener.adapter.HandlerAdapter#invoke

com.isyscore.bmos.formula.mq.KafkaMQListener#listener

kafka重试队列,分布式,kafka,java文章来源地址https://www.toymoban.com/news/detail-758999.html

到了这里,关于springKafka 重试解决分布式事务的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Apache Seata -- 一款开源的分布式事务解决方案

    1.1 分布式事务简介 (1)事务 事务可以看做是一次大的活动,它由不同的小活动组成,这些活动要么全部成功,要么全部失败。 (2)本地事务 ​ 在计算机系统中,更多的是通过关系型数据库来控制事务,这是利用数据库本身的事务特性来实现的,因此叫数据库事务,由于

    2024年03月17日
    浏览(46)
  • 分布式事务解决方案Seata(1.6.1)下载与基础配置

    seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。 基于支持本地 ACID 事务的关系型数据库。 Java 应用,通过 JDBC 访问数据库。 两阶段提交协议的

    2023年04月12日
    浏览(39)
  • OpenSource - 分布式重试平台

    在当前广泛流行的分布式系统中,确保系统数据的一致性和正确性是一项重大挑战。为了解决分布式事务问题,涌现了许多理论和业务实践,其中BASE理论是目前业界广泛接受的分布式一致性理论。 基于BASE理论,采用柔性事务并优先保障系统的可用性和数据的最终一致性已逐

    2024年02月14日
    浏览(28)
  • springcloud3 分布式事务解决方案seata之AT模式5

    1.XA模式一阶段不提交事务,锁定资源;AT模式一阶段直接提交,不锁定资源; 2.XA模式依赖数据库机制实现回滚;AT模式利用数据库快照实现数据回滚 3.XA模式强一致;AT模式最终一致。 1.2 AT模式原理 一阶段: 1.TM发起并注册全局事务到TC; 2.TM调用分支事务; 3.RM进行注册分支

    2024年02月07日
    浏览(38)
  • RabbitMQ——解决分布式事务问题,RabbitMQ的重要作用之一!!!通过可靠生产和可靠消费来完美解决!

    分布式事务是指涉及多个独立的计算机系统(也称为节点或参与者)之间的事务处理。在分布式系统中,每个节点可能各自拥有自己的数据存储和事务管理机制。分布式事务的目标是保证在跨多个节点执行的一系列操作可以以一致和可靠的方式执行和提交,即使在面对故障或

    2024年04月23日
    浏览(38)
  • 【项目亮点】大厂中分布式事务的最佳实践 问题产生->难点与权衡(偏爱Saga)->解决方案

    不断有同学问我大厂中实践分布式事务的问题,这里从 分布式事务的产生 ,到 强弱一致性与性能的权衡 ,再到最终 落地的解决方案 ,再到 实际的代码实现 ,再到我工作中实际 使用SAGA模式的应用案例 ,一篇文章讲清楚. 83.7%分布式事务的产生都是因为拆分微服务导致 的: 一句话概

    2024年04月27日
    浏览(33)
  • SpringCloud Alibaba - Seata 四种分布式事务解决方案(XA、AT)+ 实践部署(上)

    目录 一、Seata 分布式事务解决方案 1.1、XA 模式 1.1.1、XA模式理论 第一阶段: 第二阶段: 1.1.2、Seata 框架中的 XA 模式 第一阶段: 第二阶段: 1.1.3、XA 模式的优缺点 1.2.4、实现Seata 的 XA 模式 a)修改 application 文件(每一个参与事务的微服务) b)给发起全局事务中的入口方法

    2024年04月14日
    浏览(35)
  • 分布式重试服务平台 Easy-Retry

      在介绍这款开源产品前先给大家介绍一个开源组织:aizuda–爱组搭   可以看到Easy-Retry就是爱组搭的开源项目之一。   在分布式系统大行其道的当前,系统数据的准确性和正确性是重大的挑战,基于CAP理论,采用柔性事务,保障系统可用性以及数据的最终一致性成为

    2024年02月09日
    浏览(32)
  • 分布式:一文吃透分布式事务和seata事务

    什么是事务 事务是并发控制的单位,是用户定义的一个操作序列。 事务特性 原子性(Atomicity): 事务是数据库的逻辑工作单位,事务中包括的诸操作要么全做,要么全不做。 一致性(Consistency): 事务执行的结果必须是使数据库从一个一致性状态变到另一个一致性状态。一致性

    2024年02月07日
    浏览(48)
  • 【分布式】分布式事务:2PC

    分布式事务的问题可以分为两部分: 并发控制 concurrency control 原子提交 atomic commit 分布式事务问题的产生场景:一份数据被分片存在多台服务器上,那么每次事务处理都涉及到了多台机器。 可序列化(并发控制): 定义了事务执行的正确性 真正地并行执行事务,获得真正的

    2024年02月09日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包