Flink: checkPoint

这篇具有很好参考价值的文章主要介绍了Flink: checkPoint。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

序言

依据1.17.1 最新版本的内容研究下期运作原理,总的来说其实就是设置一些参数,这些参数就会影响到如何存储checkpoint的问题.用起来没什么难的,参数配置的组合到是挺多cuiyaonan2000@163.com

参考资料:

  1. Checkpointing | Apache Flink
  2. State Backends | Apache Flink

Checkpointing #

Flink 中的每个方法或算子都能够是有状态的(阅读 working with state 了解更多)。 状态化的方法在处理单个 元素/事件 的时候存储数据,让状态成为使各个类型的算子更加精细的重要部分。 为了让状态容错,Flink 需要为状态添加 checkpoint(检查点)。Checkpoint 使得 Flink 能够恢复状态和在流中的位置,从而向应用提供和无故障执行时一样的语义。

容错文档 中介绍了 Flink 流计算容错机制内部的技术原理。

Flink 的 checkpoint 机制会和持久化存储进行交互,读写流与状态。一般需要:

  • 一个能够回放一段时间内数据的持久化数据源,例如持久化消息队列(例如 Apache Kafka、RabbitMQ、 Amazon Kinesis、 Google PubSub 等)或文件系统(例如 HDFS、 S3、 GFS、 NFS、 Ceph 等)。
  • 存放状态的持久化存储,通常为分布式文件系统(比如 HDFS、 S3、 GFS、 NFS、 Ceph 等)。

开启与配置 Checkpoint #

默认情况下 checkpoint 是禁用的。通过调用 StreamExecutionEnvironment 的 enableCheckpointing(n) 来启用 checkpoint,里面的 n 是进行 checkpoint 的间隔,单位毫秒。----这个就是checkpoint的间隔时间,没有最小时间好

另外一种是通过TableEnvironment 中的conf来开启checkpoint

          StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
          // TableEnvironment tableEnv = TableEnvironment.create(settings);
          Configuration configuration = tableEnv.getConfig().getConfiguration();
          configuration.setString("table.exec.mini-batch.enabled", "true");
          configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
          configuration.setString("table.exec.mini-batch.size", "50");
          configuration.setString("table.dml-sync", "false");
          configuration.setString("execution.checkpointing.interval", "3s");
这种方式可以设置更多的属性

Checkpoint 其他的属性包括:

  • 精确一次(exactly-once)对比至少一次(at-least-once):你可以选择向 enableCheckpointing(long interval, CheckpointingMode mode) 方法中传入一个模式来选择使用两种保证等级中的哪一种。 对于大多数应用来说,精确一次是较好的选择。至少一次可能与某些延迟超低(始终只有几毫秒)的应用的关联较大。

  • checkpoint 超时:如果 checkpoint 执行的时间超过了该配置的阈值,还在进行中的 checkpoint 操作就会被抛弃。---checkpoint超时就放弃上一个checkpoint,去执行下一个,这样子有可能永远都不会执行成功,因为每一个都超时cuiyaonan2000@163.com

  • checkpoints 之间的最小时间:该属性定义在 checkpoint 之间需要多久的时间,以确保流应用在 checkpoint 之间有足够的进展。如果值设置为了 5000, 无论 checkpoint 持续时间与间隔是多久,在前一个 checkpoint 完成时的至少五秒后会才开始下一个 checkpoint。

    往往使用“checkpoints 之间的最小时间”来配置应用会比 checkpoint 间隔容易很多,因为“checkpoints 之间的最小时间”在 checkpoint 的执行时间超过平均值时不会受到影响(例如如果目标的存储系统忽然变得很慢)。----最小间隔貌似不会让 checkpoint间隔失效,而是根据情况进行二选一,最小间隔不会关注checkpoin的执行时间,及时超时了,也不会被抛弃cuiyaonan2000@163.com

    注意这个值也意味着并发 checkpoint 的数目是

  • checkpoint 可容忍连续失败次数该属性定义可容忍多少次连续的 checkpoint 失败。超过这个阈值之后会触发作业错误 fail over。 默认次数为“0”,这意味着不容忍 checkpoint 失败,作业将在第一次 checkpoint 失败时fail over。 可容忍的checkpoint失败仅适用于下列情形:Job Manager的IOException,TaskManager做checkpoint时异步部分的失败, checkpoint超时等。TaskManager做checkpoint时同步部分的失败会直接触发作业fail over。其它的checkpoint失败(如一个checkpoint被另一个checkpoint包含)会被忽略掉。

  • 并发 checkpoint 的数目: 默认情况下,在上一个 checkpoint 未完成(失败或者成功)的情况下,系统不会触发另一个 checkpoint。这确保了拓扑不会在 checkpoint 上花费太多时间,从而影响正常的处理流程。 不过允许多个 checkpoint 并行进行是可行的,对于有确定的处理延迟(例如某方法所调用比较耗时的外部服务),但是仍然想进行频繁的 checkpoint 去最小化故障后重跑的 pipelines 来说,是有意义的。

    该选项不能和 “checkpoints 间的最小时间"同时使用。

  • externalized checkpoints: 你可以配置周期存储 checkpoint 到外部系统中。Externalized checkpoints 将他们的元数据写到持久化存储上并且在 job 失败的时候不会被自动删除。 这种方式下,如果你的 job 失败,你将会有一个现有的 checkpoint 去恢复。更多的细节请看 Externalized checkpoints 的部署文档。

设置如上的配置代码示例:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(1000);

// 高级选项:

// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig().setCheckpointTimeout(60000);

// 允许两个连续的 checkpoint 错误
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
        
// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// 开启实验性的 unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();

State Backend #

状态内部的存储格式、状态在 CheckPoint 时如何持久化以及持久化在哪里均取决于选择的 State Backend

Flink 的 checkpointing 机制 会将 timer 以及 stateful 的 operator 进行快照,然后存储下来, 包括连接器(connectors),窗口(windows)以及任何用户自定义的状态。 Checkpoint 存储在哪里取决于所配置的 State Backend(比如 JobManager memory、 file system、 database)。

默认情况下,状态是保持在 TaskManagers 的内存中checkpoint 保存在 JobManager 的内存中。为了合适地持久化大体量状态, Flink 支持各种各样的途径去存储 checkpoint 状态到其他的 state backends 上。通过 StreamExecutionEnvironment.setStateBackend(…) 来配置所选的 state backends。

内置的 State Backends #

Flink 内置了以下这些开箱即用的 state backends :

  • HashMapStateBackend
  • EmbeddedRocksDBStateBackend

如果不设置,默认使用 HashMapStateBackend

HashMapStateBackend #

HashMapStateBackend 是非常快的,因为每个状态的读取和算子对于 objects 的更新都是在 Java 的 heap 上

在 HashMapStateBackend 内部,数据以 Java 对象的形式存储在堆中。 Key/value 形式的状态和窗口算子会持有一个 hash table,其中存储着状态值、触发器。

HashMapStateBackend 的适用场景:

  • 有较大 state,较长 window 和较大 key/value 状态的 Job。
  • 所有的高可用场景。

建议同时将 managed memory 设为0,以保证将最大限度的内存分配给 JVM 上的用户代码。

与 EmbeddedRocksDBStateBackend 不同的是,由于 HashMapStateBackend 将数据以对象形式存储在堆中,因此重用这些对象数据是不安全的。

EmbeddedRocksDBStateBackend #

RocksDB 可以根据可用的 disk 空间扩展,并且只有它支持增量 snapshot。

EmbeddedRocksDBStateBackend 将正在运行中的状态数据保存在 RocksDB 数据库中,RocksDB 数据库默认将数据存储在 TaskManager 的数据目录。 不同于 HashMapStateBackend 中的 java 对象,数据被以序列化字节数组的方式存储,这种方式由序列化器决定,因此 key 之间的比较是以字节序的形式进行而不是使用 Java 的 hashCode 或 equals() 方法。

EmbeddedRocksDBStateBackend 的适用场景:

  • 状态非常大、窗口非常长、key/value 状态非常大的 Job。
  • 所有高可用的场景。

EmbeddedRocksDBStateBackend 是目前唯一支持增量 CheckPoint 的 State Backend (见 这里)。

EmbeddedRocksDBStateBackend 将会使应用程序的最大吞吐量降低。 所有的读写都必须序列化、反序列化操作,这个比基于堆内存的 state backend 的效率要低很多。 同时因为存在这些序列化、反序列化操作,重用放入 EmbeddedRocksDBStateBackend 的对象是安全的。

设置

你能在 flink-conf.yaml 中为所有 Job 设置其他默认的 State Backend。 

也可以使用如下代码为为每个任务设置State Backend;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());

如果你想在 IDE 中使用 EmbeddedRocksDBStateBackend,或者需要在作业中通过编程方式动态配置它,必须添加以下依赖到 Flink 项目中。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb</artifactId>
    <version>1.17.1</version>
    <scope>provided</scope>
</dependency>

flink-conf.yaml 相关配置

可选值包括:

  1.  jobmanager (HashMapStateBackend), 
  2. rocksdb (EmbeddedRocksDBStateBackend),
  3. 或使用实现了 state backend 工厂 StateBackendFactory 的类的全限定类名, 例如: EmbeddedRocksDBStateBackend 对应为 org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendFactory

state.checkpoints.dir 选项指定了所有 State Backend 写 CheckPoint 数据和写元数据文件的目录。 你能在 这里 找到关于 CheckPoint 目录结构的详细信息

# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
# state.backend: filesystem

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints

增量快照

RocksDB 支持增量快照。不同于产生一个包含所有数据的全量备份,增量快照中只包含自上一次快照完成之后被修改的记录,因此可以显著减少快照完成的耗时。

一个增量快照是基于(通常多个)前序快照构建的(相当于是现有1个全量快照,后面跟了几个增量快照来进行数据的回复cuiyaonan2000@163.com)

增量快照会造成重复的数据,因此checkpoin的文件会变大,会更多的占有网络,但是在计算恢复的时候会变快.

设置

虽然状态数据量很大时我们推荐使用增量快照,但这并不是默认的快照机制,您需要通过下述配置手动开启该功能:

  • 在 flink-conf.yaml 中设置:state.backend.incremental: true 或者
  • 在代码中按照右侧方式配置(来覆盖默认配置):EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);

需要注意的是,一旦启用了增量快照,网页上展示的 Checkpointed Data Size 只代表增量上传的数据量,而不是一次快照的完整数据量。文章来源地址https://www.toymoban.com/news/detail-522402.html

到了这里,关于Flink: checkPoint的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Databases and Big Data Technologies: Essential Knowledg

    作者:禅与计算机程序设计艺术 互联网正在改变着传统行业和新兴行业的结构,电子商务、社交网络、移动应用程序等新兴产业的迅速发展也催生了基于数据中心的数据库应用的需求,而这方面的知识技能是越来越重要。然而,除了数据库技术的基础知识和技术栈外,基于数

    2024年02月07日
    浏览(44)
  • IDEA 中使用 Big Data Tools 连接大数据组件

    简介 Big Data Tools 插件可用于 Intellij Idea 2019.2 及以后的版本。它提供了使用 Zeppelin,AWS S3,Spark,Google Cloud Storage,Minio,Linode,数字开放空间,Microsoft Azure 和 Hadoop 分布式文件系统(HDFS)来监视和处理数据的特定功能。 下面来看一下 Big Data Tools 的安装和使用,主要会配置

    2023年04月08日
    浏览(60)
  • Big Data Tools插件(详细讲解安装,连接,包教包会!!!)

    🐮博主syst1m 带你 acquire knowledge! ✨博客首页——syst1m的博客💘 😘《CTF专栏》超级详细的解析,宝宝级教学让你从蹒跚学步到健步如飞🙈 😎《大数据专栏》大数据从0到秃头👽,从分析到决策,无所不能❕ 🔥 《python面向对象(人狗大战)》突破百万的阅读量,上过各种各样

    2024年02月03日
    浏览(38)
  • Python Packages for Big Data Analysis and Visualization

    作者:禅与计算机程序设计艺术 Python第三方库主要分为两类:数据处理、可视化。下面是用于大数据分析与可视化的常用的Python第三方库列表(按推荐顺序排序): NumPy: NumPy 是用 Python 编写的一个科学计算库,其功能强大且全面,尤其适用于对大型多维数组和矩阵进行快速

    2024年02月07日
    浏览(48)
  • Building a big data platform system, architecture desig

    作者:禅与计算机程序设计艺术 Apache Hadoop是一个开源的分布式计算平台,它可以运行在廉价的商用硬件上,并提供可扩展性和高容错性。作为Hadoop框架的一部分,MapReduce是一种编程模型和执行引擎,用于对大数据集进行并行处理。但是,由于其复杂性和庞大的体系结构,开

    2024年02月05日
    浏览(58)
  • Apache Hadoop: Building a Big Data Distributed Environm

    作者:禅与计算机程序设计艺术 Apache Hadoop (以下简称HDFS)是一个开源的分布式文件系统,用来存储大量的数据集并进行计算处理。它可以处理超大数据集、实时数据分析、日志聚类等应用场景。HDFS被广泛应用于企业数据仓库、电子商务网站、搜索引擎、Hadoop生态系统中的大多

    2024年02月06日
    浏览(47)
  • Establishing a RealTime Big Data Platform for Transport

    作者:禅与计算机程序设计艺术 Apache Kafka是一个开源的分布式流处理平台,它最初由LinkedIn公司开发,用于实时数据管道及流动计算,随着时间的推移,Kafka已成为最流行的开源消息代理之一。同时,它还是一个快速、可靠的分布式存储系统,它可以作为消息队列来用。Mong

    2024年02月07日
    浏览(50)
  • How AI is changing Big Data and Business

    作者:禅与计算机程序设计艺术 随着人工智能的不断进步、计算机算力的不断提高,以及基于云计算平台的大数据产生的越来越多的数据,人工智能已成为经济界和产业界的一股重要力量。而人工智能究竟能给企业带来哪些新的机遇和变化,如何运用人工智能为企业提供更好

    2024年02月08日
    浏览(35)
  • An Introduction to Hadoop Streaming API in Big Data

    作者:禅与计算机程序设计艺术 Hadoop Streaming 是 Hadoop 的一个子项目,它可以让用户在 Hadoop 上运行离线批处理作业或实时流处理作业。其主要工作原理是从标准输入(stdin)读取数据,对其进行处理,然后输出到标准输出(stdout)。Hadoop Streaming 的计算模型是 MapReduce-like,每

    2024年02月08日
    浏览(46)
  • 大数据:HDFS操作的客户端big data tools和NFS

    2022找工作是学历、能力和运气的超强结合体,遇到寒冬,大厂不招人,可能很多算法学生都得去找开发,测开 测开的话,你就得学数据库,sql,oracle,尤其sql要学,当然,像很多金融企业、安全机构啥的,他们必须要用oracle数据库 这oracle比sql安全,强大多了,所以你需要学

    2024年02月09日
    浏览(53)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包