Flink本地checkpoint测试

这篇具有很好参考价值的文章主要介绍了Flink本地checkpoint测试。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、概述

在本地IDEA测试中,使用本地文件存储系统,作为checkpoint的存储系统,将备份数据存储到本地文件中,作业停止后,从本地备份数据启动Flink程序。

主要分为两步:

1)备份数据

2)从备份数据启动

二、备份数据

备份数据的配置,和使用HDFS文件体统类似,只不过路径填写成本地文件系统的路径,注意格式需要是 file:///******/******/,和HDFS文件系统的配置略有不同。文件具体存储的位置,在idea安装路径的根路径下。比如本人IDEA安装在D盘下,checkpoint地址配置为 file:///Users/flink/checkpoints/TestCheckPoint,那么生成的备份点数据在 D:\Users\flink\checkpoints\TestCheckPoint 目录下。

部分代码如下:

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 禁用全局任务链
        env.disableOperatorChaining();

        String brokers = "********.com:9092,********.com:9092,********.com:9092";
        String sourceTopic = "0000-checkpoint-test-source";
        String resultTopic = "0000-checkpoint-test-result";
        String groupId = "demo";
        
        String checkPointPath = "file:///Users/flink/checkpoints/TestCheckPoint";

        StateBackend backend = new EmbeddedRocksDBStateBackend(true);
        env.setStateBackend(backend);

        CheckpointConfig conf = env.getCheckpointConfig();
        // 任务流取消和故障应保留检查点
        conf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        conf.setCheckpointInterval(10000);//milliseconds
        conf.setCheckpointTimeout(10 * 60 * 1000);//milliseconds
        conf.setMinPauseBetweenCheckpoints(10 * 1000);//相邻两次checkpoint之间的时间间隔
        conf.setCheckpointStorage(checkPointPath);

生成的备份数据如下

flink 本地checkpoint,flink,flink,大数据

 

 

三、从备份数据启动

主要区别,在从备份点数据恢复运行程序。

如果是在yarn集群运行,在启动指令中加入 -s hdfs://ns1/flink/***/chk-*** \ 即可;而在本地运行,需要将备份点路径设置到运行环境中,可以通过启动指令设置,也可以在代码中直接设置。为了展示方便,这里直接在代码中设置。

比如,从备份点  D:\Users\flink\checkpoints\TestCheckPoint\3fc3902734d9316b3d8947508e95eabd\chk-9 恢复运行,需要将备份点设置到环境变量中,部分代码如下:

        Configuration configuration = new Configuration();
        configuration.setString("execution.savepoint.path", "file:///Users/flink/checkpoints/TestCheckPoint/3fc3902734d9316b3d8947508e95eabd/chk-9");

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 禁用全局任务链
        env.disableOperatorChaining();

        String brokers = "kafka-log1.test.xl.com:9092,kafka-log2.test.xl.com:9092,kafka-log3.test.xl.com:9092";
        String sourceTopic = "0000-checkpoint-test-source";
        String resultTopic = "0000-checkpoint-test-result";
        String groupId = "demo";

        String checkPointPath = "file:///Users/flink/checkpoints/TestCheckPoint";

        StateBackend backend = new EmbeddedRocksDBStateBackend(true);
        env.setStateBackend(backend);

        CheckpointConfig conf = env.getCheckpointConfig();
        // 任务流取消和故障应保留检查点
        conf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        conf.setCheckpointInterval(10000);//milliseconds
        conf.setCheckpointTimeout(10 * 60 * 1000);//milliseconds
        conf.setMinPauseBetweenCheckpoints(10 * 1000);//相邻两次checkpoint之间的时间间隔
        conf.setCheckpointStorage(checkPointPath);

启动程序后,将会从备份点读取状态数据,继续进行计算。文章来源地址https://www.toymoban.com/news/detail-612767.html

到了这里,关于Flink本地checkpoint测试的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink系列之:使用Flink CDC从数据库采集数据,设置checkpoint支持数据采集中断恢复,保证数据不丢失

    博主相关技术博客: Flink系列之:Debezium采集Mysql数据库表数据到Kafka Topic,同步kafka topic数据到StarRocks数据库 Flink系列之:使用Flink Mysql CDC基于Flink SQL同步mysql数据到StarRocks数据库

    2024年02月11日
    浏览(86)
  • 【大数据】Flink 架构(五):检查点 Checkpoint(看完即懂)

    《 Flink 架构 》系列(已完结),共包含以下 6 篇文章: Flink 架构(一):系统架构 Flink 架构(二):数据传输 Flink 架构(三):事件时间处理 Flink 架构(四):状态管理 Flink 架构(五):检查点 Checkpoint(看完即懂) Flink 架构(六):保存点 Savepoint 😊 如果您觉得这篇

    2024年02月19日
    浏览(47)
  • 源码解析Flink源节点数据读取是如何与checkpoint串行执行

    源码解析Flink源节点数据读取是如何与checkpoint串行执行 Flink版本:1.13.6 前置知识:源节点的Checkpoint是由Checkpointcoordinate触发,具体是通过RPC调用TaskManager中对应的Task的StreamTask类的performChecpoint方法执行Checkpoint。 本文思路:本文先分析checkpoint阶段,然后再分析数据读取阶段,

    2024年02月14日
    浏览(54)
  • Flink非对齐checkpoint原理(Flink Unaligned Checkpoint)

    为什么提出Unaligned Checkpoint(UC)? 因为反压严重时会导致Checkpoint失败,可能导致如下问题 恢复时间长-服务效率低 非幂等和非事务会导致数据重复 持续反压导致任务加入死循环(可能导致数据丢失,例如超过kafka的过期时间无法重置offset) UC的原理 UC有两个阶段(UC主要是

    2024年02月14日
    浏览(50)
  • Flink 学习八 Flink 容错机制 & checkpoint & savepoint

    https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/stateful-stream-processing/ 上一节讲述 状态后端 ;Flink是一个 带状态stateful 的数据处理系统,在处理数据的过程中,各个算子的记录的状态会随着算子处理的状态而改变 ; 状态后端 负责将状态保存在内存或外部持久化存储中 (内存

    2024年02月09日
    浏览(79)
  • 【Flink入门修炼】2-3 Flink Checkpoint 原理机制

    如果让你来做一个有状态流式应用的故障恢复,你会如何来做呢? 单机和多机会遇到什么不同的问题? Flink Checkpoint 是做什么用的?原理是什么? Checkpoint 是对当前运行状态的完整记录。程序重启后能从 Checkpoint 中恢复出输入数据读取到哪了,各个算子原来的状态是什么,并

    2024年04月25日
    浏览(42)
  • Flink: checkPoint

    依据1.17.1 最新版本的内容研究下期运作原理,总的来说其实就是设置一些参数,这些参数就会影响到如何存储checkpoint的问题.用起来没什么难的,参数配置的组合到是挺多cuiyaonan2000@163.com 参考资料: Checkpointing | Apache Flink State Backends | Apache Flink Flink 中的每个方法或算子都能够是 有

    2024年02月12日
    浏览(36)
  • Flink源码之Checkpoint执行流程

    Checkpoint完整流程如上图所示: JobMaster的CheckpointCoordinator向所有SourceTask发送RPC触发一次CheckPoint SourceTask向下游广播CheckpointBarrier SouceTask完成状态快照后向JobMaster发送快照结果 非SouceTask在Barrier对齐后完成状态快照向JobMaster发送快照结果 JobMaster保存SubTask快照结果 JobMaster收到所

    2024年02月11日
    浏览(39)
  • 如何排查 Flink Checkpoint 失败问题?

    这是 Flink 相关工作中最常出现的问题,值得大家搞明白。 1. 先找到超时的subtask序号 图有点问题,因为都是成功没失败的,尴尬了。 借图: 2. 找到对应的机器和任务 方法很多,这里看自己习惯和公司提供的系统。 3. 根据日志排查问题 netstat -nap| grep 端口号 就找到对应的p

    2024年02月10日
    浏览(39)
  • 【Flink】Flink 记录一个 checkpoint 检查点 越来越大的问题

    Flink SQL checkpoint越来越大咋么办,从2个G,现在4个G了,增量同步的,窗口是1小时,watermark是6小时,按道理来说,数据量不应该越来越大啊? 在窗口内执行了count(distinct )这些操作。设置了状态的ttl。后端状态存储用的rocksdb。 状态如下 设置了增量的检查点 代码设置不一定有

    2024年02月10日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包