Flink|checkpoint 超时报错问题处理(FlinkRuntimeException)

这篇具有很好参考价值的文章主要介绍了Flink|checkpoint 超时报错问题处理(FlinkRuntimeException)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

问题出现场景

为了评估一个 Flink 程序的处理效果,我使用本地模式启动了 Flink 程序,并在上游表中一次性插入了大量数据(大概相当于线上单个并发 4 - 5 分钟的最大处理量),以触发计算。

但是,在本地计算中,一直无法计算完成,观察后发现任务在被重复计算,进而发现 Flink 在不断从 checkpoint 恢复。在日志中搜索 checkpoint,发现如下报错信息:

org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
	at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:98) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:67) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1934) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1906) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:96) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1990) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_391]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_391]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_391]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_391]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_391]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_391]
	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_391]

该报错大概每 10 分钟左右出现一次,具体如下:

  • 11:30:44 - 启动 Flink 程序
  • 11:41:17 - 第一次报错(在启动后 10 分钟 33 秒)
  • 11:51:48 - 第二次报错(在上次报错后 10 分钟 31 秒)
  • 12:02:19 - 第三次报错(在上次报错后 10 分钟 31 秒)
  • ……

问题原因

通过定位,发现是 checkpoint 超时报错。Flink 的 checkpoint 的超时时间时 600 秒,但是这个任务需要 11 分钟才能完成。本地之所以比线上慢,一方面是因为本地增加了一部分新的逻辑;另一方面也可能是因为线上运行时,对 MySQL 请求时走的是内网请求,而本地运行走的是外网请求。

Flink 的 checkpoint 超时时间默认值的源码位置如下:

// org.apache.flink.streaming.api.environment.CheckpointConfig

public class CheckpointConfig implements Serializable {
 public CheckpointConfig() {
     this.checkpointingMode = DEFAULT_MODE;
     this.checkpointInterval = -1L;
     this.checkpointTimeout = 600000L;
     this.minPauseBetweenCheckpoints = 0L;
     this.maxConcurrentCheckpoints = 1;
     this.alignmentTimeout = (Duration)ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT.defaultValue();
     this.failOnCheckpointingErrors = true;
     this.preferCheckpointForRecovery = false;
     this.tolerableCheckpointFailureNumber = -1;
 }

解决方法

在本地测试时,如需较大数据量测试,显式地设置 checkpoint 超时时间即可:文章来源地址https://www.toymoban.com/news/detail-860868.html

env.getCheckpointConfig().setCheckpointTimeout(1200000);

到了这里,关于Flink|checkpoint 超时报错问题处理(FlinkRuntimeException)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Flink】FlinkRuntimeException: Cannot read the binlog filename and position via ‘SHOW MASTER STATUS‘

    错误明细: Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation 访问被拒绝;此操作需要(至少一个)SUPER、REPLICATION CLIENT权限 权限不够呗: 查看官网文档: https://ververica.github.io/flink-cdc-connectors/master/content/about.html 创建 MySQL 用户: 向用户授予所

    2024年02月02日
    浏览(30)
  • Flink非对齐checkpoint原理(Flink Unaligned Checkpoint)

    为什么提出Unaligned Checkpoint(UC)? 因为反压严重时会导致Checkpoint失败,可能导致如下问题 恢复时间长-服务效率低 非幂等和非事务会导致数据重复 持续反压导致任务加入死循环(可能导致数据丢失,例如超过kafka的过期时间无法重置offset) UC的原理 UC有两个阶段(UC主要是

    2024年02月14日
    浏览(39)
  • 【flink】Checkpoint expired before completing. 使用flink同步数据出现错误Checkpoint expired before completing.

    任务超时了: 重新把任务配置参数,配置如下: 或者修改 flink的 配置文件flink-conf.yaml 

    2024年02月12日
    浏览(33)
  • Flink本地checkpoint测试

    在本地IDEA测试中,使用本地文件存储系统,作为checkpoint的存储系统,将备份数据存储到本地文件中,作业停止后,从本地备份数据启动Flink程序。 主要分为两步: 1)备份数据 2)从备份数据启动 备份数据的配置,和使用HDFS文件体统类似,只不过路径填写成本地文件系统的

    2024年02月15日
    浏览(34)
  • Flink: checkPoint

    依据1.17.1 最新版本的内容研究下期运作原理,总的来说其实就是设置一些参数,这些参数就会影响到如何存储checkpoint的问题.用起来没什么难的,参数配置的组合到是挺多cuiyaonan2000@163.com 参考资料: Checkpointing | Apache Flink State Backends | Apache Flink Flink 中的每个方法或算子都能够是 有

    2024年02月12日
    浏览(26)
  • Flink 学习八 Flink 容错机制 & checkpoint & savepoint

    https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/stateful-stream-processing/ 上一节讲述 状态后端 ;Flink是一个 带状态stateful 的数据处理系统,在处理数据的过程中,各个算子的记录的状态会随着算子处理的状态而改变 ; 状态后端 负责将状态保存在内存或外部持久化存储中 (内存

    2024年02月09日
    浏览(70)
  • 【Flink入门修炼】2-3 Flink Checkpoint 原理机制

    如果让你来做一个有状态流式应用的故障恢复,你会如何来做呢? 单机和多机会遇到什么不同的问题? Flink Checkpoint 是做什么用的?原理是什么? Checkpoint 是对当前运行状态的完整记录。程序重启后能从 Checkpoint 中恢复出输入数据读取到哪了,各个算子原来的状态是什么,并

    2024年04月25日
    浏览(33)
  • Flink源码之Checkpoint执行流程

    Checkpoint完整流程如上图所示: JobMaster的CheckpointCoordinator向所有SourceTask发送RPC触发一次CheckPoint SourceTask向下游广播CheckpointBarrier SouceTask完成状态快照后向JobMaster发送快照结果 非SouceTask在Barrier对齐后完成状态快照向JobMaster发送快照结果 JobMaster保存SubTask快照结果 JobMaster收到所

    2024年02月11日
    浏览(29)
  • 9、Flink四大基石之Checkpoint容错机制详解及示例(checkpoint配置、重启策略、手动恢复checkpoint和savepoint)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月04日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包