Kafka重复消费、Dubbo重复调用问题排查

这篇具有很好参考价值的文章主要介绍了Kafka重复消费、Dubbo重复调用问题排查。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、业务流程简述:

        本业务为车机流量充值业务,大致流程为:收到微信、支付宝端用户支付成功回调后,将用户订单信息发送至kafka中;消费者接收到kafka中信息后进行解析,处理用户订单信息,为用户订购相关流量包(调用电信相关接口),订购成功/失败后会通过MQTT发送订购成功/失败消息至车机端,若订购失败则为用户退款。

 二、线上问题

8/22 收到400反馈称用户于 8/21购买的流量包未到账

 三、问题排查

  1. 首先排查用户订单记录及订单状态

org.springframework.kafka.kafkaexception: seek to current after exception; n,kafka,分布式,dubbo

        可以看到用户在同一时间有两条处理失败的订单,查看代码后发现此订单记录有两处场景会创建:

           1、调用电信套餐订购接口时会根据调用结果创建对应订单记录;

           2、调用自身套餐订购服务失败后会创建对应订单记录。

        看了下订单记录生成逻辑后,我有点蚌埠住了:在自身套餐订购服务内容内调用了电信套餐订购接口并根据订购结果存储对应订单记录后,还要在外层自身订购服务失败后存储一个失败订单,如果订单处理失败的话岂不是最少都有两条失败订单……

        但是我仔细看了下发现这两处创建的订单参数有些许不同,查看数据后发现果然没有这么简单……(竟然有三条订单记录,由于第二种创建的失败订单会有部分参数缺失,页面上不会查询到这类订单)

org.springframework.kafka.kafkaexception: seek to current after exception; n,kafka,分布式,dubbo

        其实此时根据订单记录和订单生成逻辑大致可以确定肯定套餐订购服务走了两次,我们查看下服务日志:

org.springframework.kafka.kafkaexception: seek to current after exception; n,kafka,分布式,dubbo

        org.springframework.kafka.kafkaexception: seek to current after exception; n,kafka,分布式,dubbo

        确实流量订购失败了,但是二者时间间隔竟然长达40s,而这个项目是用的dubbo服务,查看项目配置后发现设置的调用超时时间为30s,重试次数为2;显然已经超过这个超时时间导致了套餐订购服务重复调用;解决方法也很简单:注解中不允许重复调用即可

@Service(version = "${provider.service.version}", retries = 0)

        到此,可以解决套餐重复订购问题。但是排查服务log时发现这条数据被消费了两次只是因为代码中处理了payLog的状态,而使得第二次消费时没有执行其他逻辑

        kafka消费log

org.springframework.kafka.kafkaexception: seek to current after exception; n,kafka,分布式,dubbo

        纳尼,本着对前同事代码极其相信的想法首先想到的是:是不是这款订单微信端回调时调用了两次导致往kafka中发了两条重复数据呢?Ok继续排查发送往kafka中发消息的相关日志:显然支付回调这块逻辑没有问题,只提交了一条消息。

        kafka生产者log

org.springframework.kafka.kafkaexception: seek to current after exception; n,kafka,分布式,dubbo        现在可以确定问题为消费者重复消费,进一步查看订单处理逻辑发现流量订购失败时会有退款操作,但目前用户流量未到账且未退款,猜测应该是充值失败后服务某处出现异常导致消费者没有正确提交位移而引起重复消费;

org.springframework.kafka.kafkaexception: seek to current after exception; n,kafka,分布式,dubbo

        查看项目错误日志发现以下信息:

2023-08-21 18:34:46,264 ERROR [org.springframework.kafka.KafkaListenerEndpointContainer#3-0-C-1] o.s.k.l.KafkaMessageListenerContainer [LogAccessor.java : 149] Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.coocaa.client.web.kafka.KafkaConsumer.orderNotify(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>) throws java.lang.Exception' threw exception; nested exception is java.net.SocketTimeoutException: connect timed out; nested exception is java.net.SocketTimeoutException: connect timed out
        at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:208)
        at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:135)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2707)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2588)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2457)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2335)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2006)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1375)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1366)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1257)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:834)

org.springframework.kafka.kafkaexception: seek to current after exception; n,kafka,分布式,dubbo

         果不其然,项目中有超时信息具体在MQTT推送消息中,有两处推送服务是因为百度MQTT的新、旧两个版本(此时大致可以确定是之前新老版本同时推送,现在百度不再维护老版本的mqtt引起服务异常)

   mnoIoTPushService.pushSinglePayMessageAction(payLog.getVin(), findOneTimelyResult);
   mnoIoTPushServiceV2.pushSinglePayMessageAction(payLog.getVin(), findOneTimelyResult);

        进一步查看推送服务代码,老版本是使用OkHttpClient(默认超时时间10_000)建立连接,log中是于2023-08-21 18:34:35,656 上报流量订购失败  正常应立马向车机端推送MQTT消息,此处于2023-08-21 18:34:46,264上报连接超时,与OkHttp的10s超时时间基本一致。

org.springframework.kafka.kafkaexception: seek to current after exception; n,kafka,分布式,dubbo

 至此问题排查完毕:文章来源地址https://www.toymoban.com/news/detail-842909.html

  1. 代码逻辑bug,所有订购失败的流量包都会重复生成失败订单;
  2. dubbo设置问题,调用外部接口超过项目设置的dubbo超时时间导致服务重复调用;
  3. 旧代码未及时迭代导致kafka消费者消费异常中断,没有正确提交位移引起重复消费

到了这里,关于Kafka重复消费、Dubbo重复调用问题排查的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink 消费Kafka每日不定时积压(非重启不能解决)问题排查解决

    1. 背景         接手了一个问题排查的工作,有个Flink任务每天不定时会出现数据积压,无论是白天还是数据量很少的夜里,且积压的数据量会越来越多,得不到缓解,只能每日在积压告警后重启,重启之后消费能力一点毛病没有,积压迅速缓解,然而,问题会周而复始的

    2024年02月09日
    浏览(19)
  • JAVA面试题分享一百六十二:Kafka消息重复消费问题?

    消息重复消费的根本原因都在于:已经消费了数据,但是offset没有成功提交。 其中很大一部分原因在于发生了再均衡。 1)消费者宕机、重启等。导致消息已经消费但是没有提交offset。 2)消费者使用自动提交offset,但当还没有提交的时候,有新的消费者加入或者移除,发生

    2024年02月03日
    浏览(27)
  • Kafka如何保证消息的消费顺序【全局有序、局部有序】、Kafka如何保证消息不被重复消费、Kafka为什么这么快?【重点】、Kafka常见问题汇总【史上最全】

    目录 Kafka消息生产 一个Topic对应一个Partition 一个Topic对应多个Partition Kafka消息的顺序性保证(Producer、Consumer) 全局有序 局部有序  max.in.flight.requests.per.connection参数详解 Kafka的多副本机制 Kafka的follower从leader同步数据的流程 Kafka的follower为什么不能用于消息消费 Kafka的多分区

    2024年04月11日
    浏览(26)
  • Dubbo之消费端服务RPC调用

    在消费端服务是基于接口调用Provider端提供的服务,所以在消费端并没有服务公共接口的实现类。 利用注解@DubboReference将目标接口CountryService作为CountryController类的字段属性,在解析类CountryController时获取全部字段属性并单独关注解析存在注解@DubboReference的字段属性。 通过步骤

    2024年03月12日
    浏览(26)
  • dubbo泛化调用之消费者传递JavaBean

    一、泛化调用概念 泛化调用是指在调用方没有服务方提供的 API(SDK)的情况下,对服务方进行调用,并且可以正常拿到调用结果。 二、使用场景 泛化调用主要用于实现一个通用的远程服务 Mock 框架,可通过实现 GenericService 接口处理所有服务请求。比如如下场景: 网关服务

    2024年02月09日
    浏览(28)
  • kafka怎么避免重复消费

    首先,Kafka Broker上存储的消息都有一个Offset的标记,然后Kafka的消费者是通过Offset这个标记来维护当前已经消费的一个数据的。消费者每消费一批数据,Kafka Broker就会更新OffSet的一个值,避免重复消费的一个问题。 默认情况下,消息消费完成以后,会自动提交Offset这样一个值

    2024年04月15日
    浏览(22)
  • kafka如何避免消息重复消费

    Kafka 避免消息重复消费通常依赖于以下策略和机制: Kafka使用Consumer Group ID来跟踪每个消费者所读取的消息。确保每个消费者都具有唯一的Group ID。如果多个消费者属于同一个Group ID,那么它们将共享消息,但每个分区的消息只能由一个消费者处理。 Kafka会记录每个消费者组消

    2024年01月15日
    浏览(21)
  • 从源码全面解析 dubbo 消费端服务调用的来龙去脉

    👏作者简介:大家好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,阿里云专家博主 📕系列专栏:Java设计模式、Spring源码系列、Netty源码系列、Kafka源码系列、JUC源码系列、duubo源码系列 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦

    2024年02月09日
    浏览(19)
  • Kafka未触发消费异常排查实录

        最近生产环境系统发现一个疑难杂症,看了很久的问题但是始终无法定位到问题并处理,然后查阅了相关资料也是定位不到问题,不过资料查阅却给了个新的思路,以此为跳板最终解决了问题。     功能介绍: “主计划拆分子计划”是APS系统很常见的功能,功能大概

    2024年02月05日
    浏览(29)
  • 记一次Kafka重复消费解决过程

            起因:车联网项目开发,车辆发生故障需要给三个系统推送消息,故障上报较为频繁,所以为了不阻塞主流程,采用了使用kafka。消费方负责推送并保存推送记录,但在一次压测中发现,实际只发生了10次故障,但是推送记录却有30多条。         问题排查,发现

    2024年02月13日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包