序言
依据1.17.1 最新版本的内容研究下期运作原理,总的来说其实就是设置一些参数,这些参数就会影响到如何存储checkpoint的问题.用起来没什么难的,参数配置的组合到是挺多cuiyaonan2000@163.com
参考资料:
- Checkpointing | Apache Flink
- State Backends | Apache Flink
Checkpointing #
Flink 中的每个方法或算子都能够是有状态的(阅读 working with state 了解更多)。 状态化的方法在处理单个 元素/事件 的时候存储数据,让状态成为使各个类型的算子更加精细的重要部分。 为了让状态容错,Flink 需要为状态添加 checkpoint(检查点)。Checkpoint 使得 Flink 能够恢复状态和在流中的位置,从而向应用提供和无故障执行时一样的语义。
容错文档 中介绍了 Flink 流计算容错机制内部的技术原理。
Flink 的 checkpoint 机制会和持久化存储进行交互,读写流与状态。一般需要:
- 一个能够回放一段时间内数据的持久化数据源,例如持久化消息队列(例如 Apache Kafka、RabbitMQ、 Amazon Kinesis、 Google PubSub 等)或文件系统(例如 HDFS、 S3、 GFS、 NFS、 Ceph 等)。
- 存放状态的持久化存储,通常为分布式文件系统(比如 HDFS、 S3、 GFS、 NFS、 Ceph 等)。
开启与配置 Checkpoint #
默认情况下 checkpoint 是禁用的。通过调用 StreamExecutionEnvironment
的 enableCheckpointing(n)
来启用 checkpoint,里面的 n 是进行 checkpoint 的间隔,单位毫秒。----这个就是checkpoint的间隔时间,没有最小时间好
另外一种是通过TableEnvironment 中的conf来开启checkpoint
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// TableEnvironment tableEnv = TableEnvironment.create(settings);
Configuration configuration = tableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "50");
configuration.setString("table.dml-sync", "false");
configuration.setString("execution.checkpointing.interval", "3s");
这种方式可以设置更多的属性
Checkpoint 其他的属性包括:
-
精确一次(exactly-once)对比至少一次(at-least-once):你可以选择向
enableCheckpointing(long interval, CheckpointingMode mode)
方法中传入一个模式来选择使用两种保证等级中的哪一种。 对于大多数应用来说,精确一次是较好的选择。至少一次可能与某些延迟超低(始终只有几毫秒)的应用的关联较大。 -
checkpoint 超时:如果 checkpoint 执行的时间超过了该配置的阈值,还在进行中的 checkpoint 操作就会被抛弃。---checkpoint超时就放弃上一个checkpoint,去执行下一个,这样子有可能永远都不会执行成功,因为每一个都超时cuiyaonan2000@163.com
-
checkpoints 之间的最小时间:该属性定义在 checkpoint 之间需要多久的时间,以确保流应用在 checkpoint 之间有足够的进展。如果值设置为了 5000, 无论 checkpoint 持续时间与间隔是多久,在前一个 checkpoint 完成时的至少五秒后会才开始下一个 checkpoint。
往往使用“checkpoints 之间的最小时间”来配置应用会比 checkpoint 间隔容易很多,因为“checkpoints 之间的最小时间”在 checkpoint 的执行时间超过平均值时不会受到影响(例如如果目标的存储系统忽然变得很慢)。----最小间隔貌似不会让 checkpoint间隔失效,而是根据情况进行二选一,最小间隔不会关注checkpoin的执行时间,及时超时了,也不会被抛弃cuiyaonan2000@163.com
注意这个值也意味着并发 checkpoint 的数目是一。
-
checkpoint 可容忍连续失败次数:该属性定义可容忍多少次连续的 checkpoint 失败。超过这个阈值之后会触发作业错误 fail over。 默认次数为“0”,这意味着不容忍 checkpoint 失败,作业将在第一次 checkpoint 失败时fail over。 可容忍的checkpoint失败仅适用于下列情形:Job Manager的IOException,TaskManager做checkpoint时异步部分的失败, checkpoint超时等。TaskManager做checkpoint时同步部分的失败会直接触发作业fail over。其它的checkpoint失败(如一个checkpoint被另一个checkpoint包含)会被忽略掉。
-
并发 checkpoint 的数目: 默认情况下,在上一个 checkpoint 未完成(失败或者成功)的情况下,系统不会触发另一个 checkpoint。这确保了拓扑不会在 checkpoint 上花费太多时间,从而影响正常的处理流程。 不过允许多个 checkpoint 并行进行是可行的,对于有确定的处理延迟(例如某方法所调用比较耗时的外部服务),但是仍然想进行频繁的 checkpoint 去最小化故障后重跑的 pipelines 来说,是有意义的。
该选项不能和 “checkpoints 间的最小时间"同时使用。
-
externalized checkpoints: 你可以配置周期存储 checkpoint 到外部系统中。Externalized checkpoints 将他们的元数据写到持久化存储上并且在 job 失败的时候不会被自动删除。 这种方式下,如果你的 job 失败,你将会有一个现有的 checkpoint 去恢复。更多的细节请看 Externalized checkpoints 的部署文档。
设置如上的配置代码示例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(1000);
// 高级选项:
// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 允许两个连续的 checkpoint 错误
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 开启实验性的 unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();
State Backend #
状态内部的存储格式、状态在 CheckPoint 时如何持久化以及持久化在哪里均取决于选择的 State Backend。
Flink 的 checkpointing 机制 会将 timer 以及 stateful 的 operator 进行快照,然后存储下来, 包括连接器(connectors),窗口(windows)以及任何用户自定义的状态。 Checkpoint 存储在哪里取决于所配置的 State Backend(比如 JobManager memory、 file system、 database)。
默认情况下,状态是保持在 TaskManagers 的内存中,checkpoint 保存在 JobManager 的内存中。为了合适地持久化大体量状态, Flink 支持各种各样的途径去存储 checkpoint 状态到其他的 state backends 上。通过 StreamExecutionEnvironment.setStateBackend(…)
来配置所选的 state backends。
内置的 State Backends #
Flink 内置了以下这些开箱即用的 state backends :
- HashMapStateBackend
- EmbeddedRocksDBStateBackend
如果不设置,默认使用 HashMapStateBackend
HashMapStateBackend #
HashMapStateBackend
是非常快的,因为每个状态的读取和算子对于 objects 的更新都是在 Java 的 heap 上
在 HashMapStateBackend 内部,数据以 Java 对象的形式存储在堆中。 Key/value 形式的状态和窗口算子会持有一个 hash table,其中存储着状态值、触发器。
HashMapStateBackend 的适用场景:
- 有较大 state,较长 window 和较大 key/value 状态的 Job。
- 所有的高可用场景。
建议同时将 managed memory 设为0,以保证将最大限度的内存分配给 JVM 上的用户代码。
与 EmbeddedRocksDBStateBackend 不同的是,由于 HashMapStateBackend 将数据以对象形式存储在堆中,因此重用这些对象数据是不安全的。
EmbeddedRocksDBStateBackend #
RocksDB
可以根据可用的 disk 空间扩展,并且只有它支持增量 snapshot。
EmbeddedRocksDBStateBackend 将正在运行中的状态数据保存在 RocksDB 数据库中,RocksDB 数据库默认将数据存储在 TaskManager 的数据目录。 不同于 HashMapStateBackend
中的 java 对象,数据被以序列化字节数组的方式存储,这种方式由序列化器决定,因此 key 之间的比较是以字节序的形式进行而不是使用 Java 的 hashCode
或 equals()
方法。
EmbeddedRocksDBStateBackend 的适用场景:
- 状态非常大、窗口非常长、key/value 状态非常大的 Job。
- 所有高可用的场景。
EmbeddedRocksDBStateBackend 是目前唯一支持增量 CheckPoint 的 State Backend (见 这里)。
EmbeddedRocksDBStateBackend 将会使应用程序的最大吞吐量降低。 所有的读写都必须序列化、反序列化操作,这个比基于堆内存的 state backend 的效率要低很多。 同时因为存在这些序列化、反序列化操作,重用放入 EmbeddedRocksDBStateBackend 的对象是安全的。
设置
你能在 flink-conf.yaml 中为所有 Job 设置其他默认的 State Backend。
也可以使用如下代码为为每个任务设置State Backend;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
如果你想在 IDE 中使用 EmbeddedRocksDBStateBackend
,或者需要在作业中通过编程方式动态配置它,必须添加以下依赖到 Flink 项目中。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>1.17.1</version>
<scope>provided</scope>
</dependency>
flink-conf.yaml
相关配置
可选值包括:
- jobmanager (HashMapStateBackend),
- rocksdb (EmbeddedRocksDBStateBackend),
- 或使用实现了 state backend 工厂 StateBackendFactory 的类的全限定类名, 例如: EmbeddedRocksDBStateBackend 对应为
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendFactory
。
state.checkpoints.dir
选项指定了所有 State Backend 写 CheckPoint 数据和写元数据文件的目录。 你能在 这里 找到关于 CheckPoint 目录结构的详细信息
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
# state.backend: filesystem
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
增量快照
RocksDB 支持增量快照。不同于产生一个包含所有数据的全量备份,增量快照中只包含自上一次快照完成之后被修改的记录,因此可以显著减少快照完成的耗时。
一个增量快照是基于(通常多个)前序快照构建的(相当于是现有1个全量快照,后面跟了几个增量快照来进行数据的回复cuiyaonan2000@163.com)
增量快照会造成重复的数据,因此checkpoin的文件会变大,会更多的占有网络,但是在计算恢复的时候会变快.
设置
虽然状态数据量很大时我们推荐使用增量快照,但这并不是默认的快照机制,您需要通过下述配置手动开启该功能:文章来源:https://www.toymoban.com/news/detail-522402.html
- 在
flink-conf.yaml
中设置:state.backend.incremental: true
或者 - 在代码中按照右侧方式配置(来覆盖默认配置):
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
需要注意的是,一旦启用了增量快照,网页上展示的 Checkpointed Data Size
只代表增量上传的数据量,而不是一次快照的完整数据量。文章来源地址https://www.toymoban.com/news/detail-522402.html
到了这里,关于Flink: checkPoint的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!