Flink之Task重启策略

这篇具有很好参考价值的文章主要介绍了Flink之Task重启策略。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Task重启策略

1 策略API
  • noRestart

    无参数,task失败后不重启,整个job同时失败,默认策略.

    代码示例文章来源地址https://www.toymoban.com/news/detail-777452.html

    RestartStrategies.noRestart();
    
  • fixedDelayRestart

    参数 注释
    restartAttempts 最大重启次数
    delayBetweenAttempts 重启时间间隔

    代码示例

    // 最多重启5次,每次任务失败后间隔1s重启
    RestartStrategies.fixedDelayRestart(5, 1000);
    
  • exponentialDelayRestart

    参数 注释
    initialBackoff 重启间隔惩罚时长初始值(重启延迟时间)
    maxBackoff 重启间隔最大惩罚时长
    backoffMultiplier 重启间隔时长的惩罚倍数
    resetBackoffThreshold 重置惩罚时长的平稳运行时长(平稳运行时长达到这个阈值后,再次发生故障则重启延迟时间恢复到初始值)
    jitterFactor 取一个随机数,来加在重启时间点上,已让每次重启的时间呈现一定随机性(避免某一时刻集群中有大量的task同时重启,如果task重启时间是规律性的就可能发生这种情况)

    代码示例

    // 第一次失败后间隔1s重启任务,如果稳定运行时长没有达到120s就发生task失败,则重启间隔时长=上一次重启间隔时长*1.2,如果稳定运行时长超过120秒则重启间隔时长恢复到1s
    RestartStrategies.exponentialDelayRestart(Time.seconds(1), Time.seconds(30), 1.2, Time.seconds(120), 0.56);
    
  • failureRateRestart

    参数 注释
    failureRate 指定时间范围内的最大Task任务失败率(次数)
    failureInterval 指定时间范围
    delayInterval 重启时间间隔

    代码示例

    // task失败重启间隔为1s,只要在30分钟内task失败重启次数没超过3次就可以一直执行这个策略,如果超过则job停止
    RestartStrategies.failureRateRestart(3, Time.minutes(30), Time.seconds(1));
    
  • fallBackRestart

    无参数,常用于自定义的RestartStrategy,即用户自定义了重启策略,且将其配置在了flink-conf.yaml文件中,也就是说调用这个方法时会读取集群的配置文件,根据配置文件的内容调整策略

    代码示例

    RestartStrategies.fallBackRestart();
    
2 代码详情
public class FlinkCheckpoint {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 开启并设置checkpoint的时间间隔
        env.enableCheckpointing(3000);
        // 设置checkpoint的存储位置
        env.getCheckpointConfig().setCheckpointStorage(new Path("hdfs://lx01:8020/flink-ck"));
        // 允许checkpoint失败的最大次数
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
        // checkpoint的算法模式,是否需要对其(EXACTLY_ONCE)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // job取消是否保留checkpoint数据
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 设置checkpoint对齐的超时时间
        env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofMillis(10000));
        // 两次checkpoint的最小时间间隔
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000);
        // 并行最大的checkpoint数
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(3);
        // 选择后端状态(默认HashMapStateBackend)
        env.setStateBackend(new EmbeddedRocksDBStateBackend());

        // TODO Task重启策略
        RestartStrategies.RestartStrategyConfiguration restartStrategy = null;
        // 第一次失败后间隔1s重启任务,如果稳定运行时长没有达到120s就发生task失败,则重启间隔时长=上一次重启间隔时长*1.2,如果稳定运行时长超过120秒则重启间隔时长恢复到1s
        restartStrategy = RestartStrategies.exponentialDelayRestart(Time.seconds(1), Time.seconds(30), 1.2, Time.seconds(120), 0.56);

        // 配置Task重启策略
        env.setRestartStrategy(restartStrategy);
      
        // ...业务代码
      
        env.execute();
    }
}

到了这里,关于Flink之Task重启策略的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 深入理解 Flink(八)Flink Task 部署初始化和启动详解

    核心入口: 部署 Task 链条:JobMaster -- DefaultScheduler -- SchedulingStrategy -- ExecutionVertex -- Execution -- RPC请求 -- TaskExecutor JobMaster 向 TaskExecutor 发送 submitTask() 的 RPC 请求,用来部署 StreamTask 运行。TaskExecutor 接收到 JobMaster 的部署 Task 运行的 RPC 请求的时候,就封装了一个 Task 抽象,然

    2024年01月17日
    浏览(68)
  • Flink源码-Task执行

    上一节我们分析到了Execution的生成,然后调用taskManagerGateway.submitTask方法提交task,提交的时候会将executionVertex封装成TaskDeploymentDescriptor,task的提交与执行涉及到了flink多个组件的配合,之前没有详细讲过,可能有的小伙伴有点不太清楚,这里我们花点时间介绍一下。 1.JobManager

    2024年02月03日
    浏览(29)
  • Flink之Task解析

      对Flink的Task进行解析前,我们首先要清楚几个角色 TaskManager 、 Slot 、 Task 、 Subtask 、 TaskChain 分别是什么 角色 注释 TaskManager 在Flink中TaskManager就是一个管理task的进程,每个节点只有一个TaskManager Slot Slot就是TaskManager中的槽位,一个TaskManager中可以存在多个槽位,取决于服务器资

    2024年02月12日
    浏览(33)
  • Flink Task退出流程与Failover机制

    Task.doRun() 引导Task初始化并执行其相关代码的核心方法, 构造并实例化Task的可执行对象: AbstractInvokable invokable。 调用 AbstractInvokable.invoke() 开始启动Task包含的计算逻辑。 当AbstractInvokable.invoke()执行退出后,根据退出类型执行相应操作: 正常执行完毕退出:输出ResultPartition缓冲

    2024年02月22日
    浏览(35)
  • 【Flink】Flink 的八种分区策略(源码解读)

    Flink 包含 8 种分区策略,这 8 种分区策略(分区器)分别如下面所示,本文将从源码的角度解读每个分区器的实现方式。 GlobalPartitioner ShufflePartitioner RebalancePartitioner RescalePartitioner BroadcastPartitioner ForwardPartitioner KeyGroupStreamPartitioner CustomPartitionerWrapper 该分区器会将所有的数据都

    2024年04月10日
    浏览(29)
  • 如何处理 Flink 作业频繁重启问题?

    Flink 实现了多种重启策略 固定延迟重启策略(Fixed Delay Restart Strategy) 故障率重启策略(Failure Rate Restart Strategy) 没有重启策略(No Restart Strategy) Fallback重启策略(Fallback Restart Strategy) Flink支持不同的重启策略,以在故障发生时控制作业如何重启 默认的重启策略:如果没有

    2024年02月10日
    浏览(27)
  • (增加细粒度资源管理)深入理解flink的task slot相关概念

    之前对flink的task slot的理解太浅了,重新捋一下相关知识点 我们知道,flink中每个TaskManager都是一个 JVM 进程,可以在单独的线程中执行一个或多个 subtask(线程)。 但是TaskManager 的计算资源是有限的,并不是所有任务都可以放在同一个 TaskManager 上并行执行。并行的任务越多

    2024年03月11日
    浏览(34)
  • 【flink】状态清理策略(TTL)

    flink的keyed state是有有效期(TTL)的,使用和说明在官网描述的篇幅也比较多,对于三种清理策略没有进行横向对比得很清晰。 全量快照清理(FULL_STATE_SCAN_SNAPSHOT) 增量清理(INCREMENTAL_CLEANUP) rocksdb压缩清理(ROCKSDB_COMPACTION_FILTER) 注意, 三种状态清理策略不是互斥的,并不是三选一的

    2024年01月20日
    浏览(30)
  • Flink 启动就报错,但exception没提示。其中一个task failure 该怎么办?

    最近我在生产又遇到一个问题,就是消费着一段时间之后,忽然就不再消费了,但也不报错。观察了几次,我发现时间基本是停留在上下班高峰期数据量最大的时候。我主观猜测可能是同时间进来的数据过多,处理不来导致的。但这个问题我还没来的及思考怎么处理,因此我

    2024年02月16日
    浏览(44)
  • Flink 消费Kafka每日不定时积压(非重启不能解决)问题排查解决

    1. 背景         接手了一个问题排查的工作,有个Flink任务每天不定时会出现数据积压,无论是白天还是数据量很少的夜里,且积压的数据量会越来越多,得不到缓解,只能每日在积压告警后重启,重启之后消费能力一点毛病没有,积压迅速缓解,然而,问题会周而复始的

    2024年02月09日
    浏览(27)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包