Flink 检查点配置

这篇具有很好参考价值的文章主要介绍了Flink 检查点配置。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

启用检查点

开启自动保存快照 (默认:关闭) :

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 每隔 1 秒启动一次检查点保存
env.enableCheckpointing(1000);

间隔调整 :

  • 对性能的影响更小,就调大间隔时间
  • 为了更好的容错性,就以调小间隔时间

检查点存储

检查点存储 (CheckpointStorage) : 持久化存储位置

  • JobManager 的堆内存 (JobManagerCheckpointStorage) : 默认
  • 文件系统 (FileSystemCheckpointStorage) : 常用 , (HDFS , S3)
// 配置存储检查点到 JobManager 堆内存
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());

// 配置存储检查点到文件系统
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoints"));

通用增量

Rocksdb 状态后端 : 启用增量 checkpoint

  • Flink 1.15 后 , HashMap , Rocksdb 都能开启通用的增量 checkpoint
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);

增量 checkpoint 过程 :

  1. 带状态的算子任务 , 将状态更改 , 写入变更日志(记录状态)
  2. 状态物化:状态表定期保存,独立于检查点
  3. 状态物化完成后,状态变更日志 , 就截断到相应的点

注意点 :

  • HDFS : 文件数变多
  • 上传变更日志 : IO 宽带较大
  • 序列化状态变更 : CPU 消耗较大
  • 缓存状态变更 : TaskManager 内存消耗较大
  • Checkpint 最大并发 = 1
  • Flink 1.15 , Memory 测试阶段
  • 不支持 NO_ClAIM 模式

配置文件 :

state.backend.changelog.enabled: true
state.backend.changelog.storage: filesystem 

# 存储 changelog 数据
dstl.dfs.base-path: hdfs://hadoop102:8020/changelog 
execution.checkpointing.max-concurrent-checkpoints: 1
execution.savepoint-restore-mode: CLAIM

代码配置 :

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-changelog</artifactId>
    <version>${flink.version}</version>
    <scope>runtime</scope>
</dependency>
// 开启changelog:
env.enableChangelogStateBackend(true);

最终检查点

当有界数据 , 部分Task 完成 , Flink 1.14 后 , 它们依然能进行检查点

禁用 (Flink 1.15 后, 默认启用) :文章来源地址https://www.toymoban.com/news/detail-495734.html

Configuration config = new Configuration();
// 禁用最终检查点
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

配置建议

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 获取所有配置
CheckpointConfig checkpointConfig = env.getCheckpointConfig();

// 检查点模式 (CheckpointingMode) : 
//   精确一次 : exactly-once (默认)
//   至少一次 : at-least-once (效率更高)
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// 最大并发检查点数量(maxConcurrentCheckpoints): 
//  检查点最多有多少个
checkpointConfig.setMaxConcurrentCheckpoints(1)

// 启用非对齐的检查点保存
// 限制: CheckpointingMode= exctly-once , 并发的检查点 = 1
checkpointConfig.enableUnalignedCheckpoints();

// 默认: 0: 用非对齐的检查点
// > 0: 用 对齐的检查点(barrier对齐)
// 当对齐时间 > 阈值, 为: 非对齐检查点(barrier非对齐)
checkpointConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(1));

// 超时时间 (checkpointTimeout) : 
//  检查点保存的超时时间,当超时就丢弃
//  单位 : 长整型毫秒数
checkpointConfig.setCheckpointTimeout(60000)

//最小间隔时间 (minPauseBetweenCheckpoints): 
//   上个 checkpoint 完成后, 最快多久触发另个 checkpoint
checkpointConfig.setMinPauseBetweenCheckpoints(500)

// 开启检查点的外部持久化
//   DELETE_ON_CANCELLATION: 作业取消时, 自动删除外部检查点,但作业失败退出,就保留检查点
//   RETAIN_ON_CANCELLATION:作业取消时, 也保留外部检查点
checkpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)


// 检查点异常时, 是否整个任务失败
//   true : 失败提出
//   false: 丢弃, 并继续运行
checkpointConfig.setFailOnCheckpointingErrors(true)

到了这里,关于Flink 检查点配置的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 怎么理解flink的异步检查点机制

    flink的checkpoint监控页面那里有两个指标Sync Duration 和Async Duration,一个是开始进行同步checkpoint所需的时间,一个是异步checkpoint过程所需的时间,你是否也有过疑惑,是否只是同步过程中的时间才会阻塞正常的数据处理,而异步checkpoint的时间不会影响正常的数据处理流程? 这

    2024年02月09日
    浏览(61)
  • Flink系列之:背压下的检查点

    通常情况下,对齐 Checkpoint 的时长主要受 Checkpointing 过程中的同步和异步两个部分的影响。 然而,当 Flink 作业正运行在严重的背压下时,Checkpoint 端到端延迟的主要影响因子将会是传递 Checkpoint Barrier 到 所有的算子/子任务的时间。这在 checkpointing process) 的概述中有说明原因

    2024年02月04日
    浏览(45)
  • 【大数据】Flink 架构(五):检查点 Checkpoint(看完即懂)

    《 Flink 架构 》系列(已完结),共包含以下 6 篇文章: Flink 架构(一):系统架构 Flink 架构(二):数据传输 Flink 架构(三):事件时间处理 Flink 架构(四):状态管理 Flink 架构(五):检查点 Checkpoint(看完即懂) Flink 架构(六):保存点 Savepoint 😊 如果您觉得这篇

    2024年02月19日
    浏览(47)
  • 【Flink】Flink 记录一个 checkpoint 检查点 越来越大的问题

    Flink SQL checkpoint越来越大咋么办,从2个G,现在4个G了,增量同步的,窗口是1小时,watermark是6小时,按道理来说,数据量不应该越来越大啊? 在窗口内执行了count(distinct )这些操作。设置了状态的ttl。后端状态存储用的rocksdb。 状态如下 设置了增量的检查点 代码设置不一定有

    2024年02月10日
    浏览(50)
  • Flink任务失败,检查点失效:Exceeded checkpoint tolerable failure threshold.

    最近实时平台flink任务频繁失败,报检查点方面的错误,最近集群的hdfs也经常报警:运行状况不良,不知道是否和该情况有关,我的状态后端位置是hdfs,废话不多说,干货搞起来~ 日志中报错如下: 在报 Exceeded checkpoint tolerable failure threshold. 错误的之前,是先报的是 Checkpoi

    2024年02月07日
    浏览(84)
  • 209.Flink(四):状态,按键分区,算子状态,状态后端。容错机制,检查点,保存点。状态一致性。flink与kafka整合

    算子任务可以分为有状态、无状态两种。 无状态:filter,map这种,每次都是独立事件 有状态:sum这种,每次处理数据需要额外一个状态值来辅助。这个额外的值就叫“状态” (1)托管状态(Managed State)和原始状态(Raw State) 托管状态 就是由Flink统一管理的,状态的存储访问

    2024年02月06日
    浏览(53)
  • Spark 检查点(checkpoint)

    Checkpointing可以将RDD从其依赖关系中抽出来,保存到可靠的存储系统(例如HDFS,S3等), 即它可以将数据和元数据保存到检查指向目录中。 因此,在程序发生崩溃的时候,Spark可以恢复此数据,并从停止的任何地方开始。 Checkpointing分为两类: 高可用checkpointing,容错性优先。这

    2024年04月27日
    浏览(43)
  • loadrunner入门教程(14)--检查点

    检查点函数原理:回放脚本时搜索特定的文本或者字符串,从而验证服务器相应的正确性;验证请求是否成功,可以添加检查点。以检查从服务器返回的内容是否正确。本任务针对脚本开发–检查点进行介绍 掌握基于loadrunner性能测试脚本开发——检查点 1.单击Design→Insert

    2024年02月05日
    浏览(67)
  • SPARK--cache(缓存)和checkpoint检查点机制

    rdd的特性 缓存和checkpoint 作用都是进行容错 rdd在计算是会有多个依赖,为了避免计算错误是从头开始计算,可以将中间* 依赖rdd进行缓存或checkpoint 缓存或checkpoint也叫作rdd的持久化 一般对某个计算特别复杂的rdd进行持久化 缓存使用 缓存是将数据存储在内存或者磁盘上,缓存

    2024年01月16日
    浏览(58)
  • Spark基础学习笔记----RDD检查点与共享变量

    了解RDD容错机制 理解RDD检查点机制的特点与用处 理解共享变量的类别、特点与使用 当Spark集群中的某一个节点由于宕机导致数据丢失,则可以通过Spark中的RDD进行容错恢复已经丢失的数据。RDD提供了两种故障恢复的方式,分别是 血统(Lineage)方式 和 设置检查点(checkpoint)

    2024年02月06日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包