【状态管理|概述】Flink的状态管理:为什么需要state、怎么保存state、对于state过大怎么处理

这篇具有很好参考价值的文章主要介绍了【状态管理|概述】Flink的状态管理:为什么需要state、怎么保存state、对于state过大怎么处理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一. state相关

1. state种类

按照数据的划分和扩张方式,Flink中大致分为2类:

  • Keyed States:记录每个Key对应的状态值

因为一个任务的并行度有多少,就会有多少个子任务,当key的范围大于并行度时,就会出现一个subTask上可能包含多个Key(),但不同Task上不会出现相同的Key(解决了shuffle的问题?)
 
常用的 MapState、ValueState。

  • Operator States:记录每个Task对应的状态值数据类型。

 

2. State的存在形式

Keyed State 和 Operator State 存在两种形式:managed (托管状态)和 raw(原始状态)。

  • 托管状态是由Flink框架管理的状态,原始状态是由用户自行管理状态的具体数据结构。
  • 通常所有的 datastream functions 都可以使用托管状态,但是原始状态接口仅仅能够在实现 operators的时候使用。
  • 推荐使用 managed state 而不是使用 raw state,因为使用托管状态的时候 Flink 可以在 parallelism 发生改变的情况下能够动态重新分配状态,而且还能更好的进行内存管理。

 

3. state在哪产生

没有状态的操作

从概念上讲, 源表从来不会在状态中被完全保存。 形如 SELECT … FROM … WHERE
这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。

诸如 join、 聚合或去重操作需要在 Flink 抽象的容错存储内保存中间结果。看下sum的状态操作

@Internal
public class StreamGroupedReduceOperator<IN>
        extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
        implements OneInputStreamOperator<IN, IN> {

    private static final long serialVersionUID = 1L;

    private static final String STATE_NAME = "_op_state";

    private transient ValueState<IN> values;

    private final TypeSerializer<IN> serializer;

    public StreamGroupedReduceOperator(ReduceFunction<IN> reducer, TypeSerializer<IN> serializer) {
        super(reducer);
        this.serializer = serializer;
    }

    @Override
    public void open() throws Exception {
        super.open();
        ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer);
        //获得value state
        values = getPartitionedState(stateId);
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        IN value = element.getValue();
        IN currentValue = values.value();
        //如果currentValue不为null,则说明不是第一次启动,也就是在hdfs上已经存储了中间状态 
        if (currentValue != null) {
            //先做一个聚合,然后再更新,之后输出到下游
            IN reduced = userFunction.reduce(currentValue, value);
            values.update(reduced);
            output.collect(element.replace(reduced));
        } else {
            //第一次启动直接更新数据,之后输出到下游
            values.update(value);
            output.collect(element.replace(value));
        }
    }
}

 

4. state 内存设置

从 Flink1.10 开始,Flink 默认将 state 内存大小配置为每个 task slot 的托管内存。

调试内存性能的问题主要是通过调整配置项,来提高Flink的托管内存:

taskmanager.memory.managed.size 
//推荐使用比例计算
taskmanager.memory.managed.fraction 

具体调优案例分析可见:Flink on yarn双流join问题分析+性能调优思路

 
 

二. state backend

Flink状态后端主要负责两件事:本地的状态管理、将检查点(checkpoint)状态写入远程存储。

flink state可以存储在java堆内存内或者内存之外。

默认情况下,使用MemoryStateBackend,Flink的state会保存在taskManager的内存中,而checkpoint会保存在jobManager的内存中。

 

1. 三种状态后端

flink提供三种开箱即用的State Backend:

状态后端 数据存储 容量限制 场景
MemoryStateBackend
State:TaskManager 内存中
Checkpoint:存储在jobManager 内存
单个State maxStateSize默认为5M
maxStateSize <= akka.frame.size默认10M
Checkpoint总大小不能超过JobMananger的内存
本地测试
状态比较少的作业
不推荐生产环境中使用
FsStateBackend
State:TaskManager 内存
Checkpoint:外部文件系统(本地或HDFS)
单个TaskManager上State总量不能超过TM内存
总数据大小不超过文件系统容量
窗口时间比较长,如分钟级别窗口聚合,Join等
需要开启HA的作业
可在生产环境中使用
RocksDBStateBackend
将所有的状态序列化之后, 存入本地的 RocksDB 数据库中.(一种 NoSql 数 据库, KV 形式存储)
State: TaskManager 中的KV数据库(实际使用内存+磁盘)
Checkpoint:外部文件系统(本地或HDFS)
单TaskManager 上 State总量不超过其内存+磁盘大小,单 Key最大容量2G
总大小不超过配置的文件系统容量
超大状态作业
需要开启HA的
作业生产环境可用

 

2. 如何在hdfs中存储?

Keyed States 和 Operator States 会存储在一个带有编号的 chk 目录中,比如说一个 flink 任务的 Keyed States 的 subTask 个数是4,Operator States 对应的 subTask 也是 4,那么 chk 会存一个元数据文件 _metadata ,四个 Keyed States 文件,四个 Operator States 的文件

也就是说 Keyed States 和 Operator States 会分别存储 subTask 总数个状态文件。

 

3. 设置checkpoint

一般需求,我们的 Checkpoint 时间间隔可以设置为分钟级别(1-5 分钟)。

3.1. 大状态下设置checkpoint

对于状态很大的任务每次 Checkpoint 访问 HDFS 比较耗时,可以设置为 5~10 分钟一次Checkpoint,并且调大两次 Checkpoint 之间的暂停间隔,例如设置两次 Checkpoint 之 间至少暂停 4 或 8 分钟。

具体案例分析可见:Flink on yarn双流join问题分析+性能调优思路

 

3.2. EXACTLY_ONCE下设置分析checkpoint

如果 Checkpoint 语义配置为 EXACTLY_ONCE,那么在 Checkpoint 过程中还会存在 barrier 对齐的过程,可以通过 Flink Web UI 的 Checkpoint 选项卡来查看 Checkpoint 过程中各阶段的耗时情况,从而确定到底是哪个阶段导致 Checkpoint 时间过长然后针对性的解决问题。

 
 

三. State设置过期时间

使用 flink 进行实时计算中,会遇到一些状态数不断累积,导致状态量越来越大的情形。例如,作业中定义了超长的时间窗口,或者在动态表上应用了无限范围的 GROUP BY 语句,以及执行了没有时间窗口限制的双流 JOIN 等等操作。

对于这些情况,经常导致堆内存出现 OOM,或者堆外内存(RocksDB)用量持续增长导致超出容器的配额上限,造成作业的频繁崩溃。

从 Flink 1.6 版本开始引入了 State TTL 特性,该特性可以允许对作业中定义的 Keyed 状态进行超时自动清理,对于Table API 和 SQL 模块引入了空闲状态保留时间(Idle State Retention Time)进行状态管理。

 

1. datastream的TTL

要使用 State TTL 功能,首先要定义一个 StateTtlConfig 对象。State TTL功能所指定的过期时间并不是全局生效的,而是和某个具体的算子状态所绑定。

以下描述了state的构建、配置:过期时间、状态时间戳的更新,对过期数据的处理等内容。

 
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1)) //过期时间:上次访问的时间 +TTL 超过了当前时间,则表明状态过期了。
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) //状态时间戳更新的时间
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) //已过期但是还未处理的状态怎么处理,NeverReturnExpired:一旦状态过期,则永远不会被返回给调用方
    //清理策略:
    .cleanupFullSnapshot() //对过期状态不主动处理。默认情况下,过期值只有在显式读出时才会被删除,例如通过调用 ValueState.value() 方法。
    .cleanupIncrementally(1024,true)//增量清理,可配置读取若干条记录就执行一次清理,并可指定每次清理多少条失效记录。
    .build();
    
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

TTL配置不是check/savepoints的一部分,而是Flink在当前运行的作业中如何处理它的一种方式。

 
小结:

state TTL 机制,应对通用的状态暴增特别有效。然而,机制不能保证一定可以及时清理掉失效的状态,以及目前仅支持 Processing Time 时间模式等等。

 
 

2.Table API和SQL的状态管理

针对 Table API 和 SQL 模块的持续查询/聚合语句,Flink 还提供了另一项失效状态清理机制,这就是 Idle State Retention Time。

2.1. 问题描述与分析

如下,官网的例子一个持续查询的分组语句,没有时间窗口的定义,理论上会无限地计算下去,但这里会出现一个问题:随着时间的推移,内存的状态会积累很多,直到状态达到了存储系统的极限,作业崩溃。

SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;

针对上面的问题,Flink 提出了空闲状态保留时间(Idle State Retention Time)的概念,如下描述:

通过为每个状态设置Timer,如果这个状态中途被访问过,则重新设置Timer;否则(如果状态一直没有被访问)Timer到期时做状态清理。

这样就可以确保每个状态能够被及时的清理。

 

2.2. 状态设置

streamTableEnvironment.getConfig().setIdleStateRetentionTime(
					Time.minutes(idleStateRetentionTime),
                    Time.of(idleStateRetentionTime * 60 + 5, TimeUnit.MINUTES));

注意:

旧版本 Flink 允许只指定一个参数,表示最早和最晚清理周期相同,但是这样可能会导致同一时间段有很多状态都到期,从而造成瞬间的处理压力。
 
新版本(1.11)的 Flink 要求两个参数之间的差距至少要达到 5 分钟,从而避免大量状态瞬间到期,对系统造成的冲击

 

2.3. 实现逻辑与源码分析

使用CleanupState 来表示idle state retention time

//状态空闲时间timer的注册
public interface CleanupState {
    default void registerProcessingCleanupTimer(
            ValueState<Long> cleanupTimeState, //通过ValueState来维护状态清理时间
            long currentTime,
            long minRetentionTime,
            long maxRetentionTime,
            TimerService timerService)
            throws Exception {
        //最近一次要清理状态的时间
        Long curCleanupTime = cleanupTimeState.value();
 
        //如果curCleanupTime为空 或 维护的时间+最小的状态空闲时间大于curCleanupTime 
        if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) {
           //重新注册一个timer,
           //此时要注意:如果maxRetentionTime和minRetentionTime的间隔过小,就会频繁的产生timer与更新valuestate,维护timer的成本将会变大。
            long cleanupTime = currentTime + maxRetentionTime;
            timerService.registerProcessingTimeTimer(cleanupTime);
            //如果之前有timer则删除
            if (curCleanupTime != null) {
                timerService.deleteProcessingTimeTimer(curCleanupTime);
            }
            //并更新清理时间,用于触发下一次清理
            cleanupTimeState.update(cleanupTime);
        }
    }
}

当数据第一次出现,或者curTime+minRetentionTime超过了最近的清理时间,就用curTime+maxRetentionTime,创建新的Timer,用于触发下一次清理,如果有了过期的timer就删除。
所以如果maxRetentionTime和minRetentionTime的间隔过小,就会频繁的产生timer与更新valuestate,维护timer的成本将会变大。

 
 
 
参考:
Flink 状态管理详解(State TTL、Operator state、Keyed state)文章来源地址https://www.toymoban.com/news/detail-429376.html

到了这里,关于【状态管理|概述】Flink的状态管理:为什么需要state、怎么保存state、对于state过大怎么处理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 企业数字化转型:为什么需要做 ModelOps 模型全生命周期管理

    现如今,以大数据、云计算、人工智能、工业互联网为代表的数字科技正飞速发展,带领技术与产业向数字化、智能化的方向展开变革——数字科技正逐渐成为推动世界经济高质量发展的核心驱动力,数字经济应运而生。而对于企业来说, 数字化转型则是发展数字经济的必由

    2024年02月04日
    浏览(66)
  • Linux drm内存管理(一) 浅谈TTM与GEM,为什么我们需要TTM和GEM?

    @[TOC](Linux drm内存管理(一) 为什么我们需要TTM和GEM?) 系列文章(更新中): Linux drm内存管理(二) TTM内存管理基础概念   目前Kernel中DRM中GPU的VRAM(GPU片上显存)的管理框架是有GEM和TTM,其中TTM早于GEM出现,GEM的出现是为了解决TTM复杂的使用方法,将大部分的VRAM管理实现逻辑交由

    2023年04月20日
    浏览(50)
  • Flink State 状态管理

    状态在Flink中叫做State,用来保存中间计算结果或者缓存数据。要做到比较好的状态管理,需要考虑以下几点内容: 状态数据的存储和访问 在Task内部,如何高效地保存状态数据和使用状态数据。 状态数据的备份和恢复 作业失败是无法避免的,那么就要考虑如何高效地将状态

    2024年01月17日
    浏览(47)
  • Flink状态详解:什么是时状态(state)?状态描述(StateDescriptor)及其重要性

    了解Flink中的状态概念及其在流处理中的重要性。探讨状态在有状态计算中的作用,状态描述符(StateDescriptor)的基本概念和用法。理解状态在Flink任务中的维护、恢复和与算子的关联。

    2024年02月08日
    浏览(46)
  • 为什么需要数据仓库

    为什么不在OLTP环境下分析?  OLTP环境也会存储历史数据,但这些历史数据并不是业务运行所需的,这些历史数据需要经常归档到数据仓库,并且在OLTP数据库中删除。 相比之下,事务环境适用于连续处理事务,通常应用于订单录入以及财务和零售事务。它们并不依赖历史数据

    2024年01月25日
    浏览(66)
  • 为什么需要超时控制

    本文将介绍为什么需要超时控制,然后详细介绍Go语言中实现超时控制的方法。其中,我们将讨论 time 包和 context 包实现超时控制的具体方式,并说明两者的适用场景,以便在程序中以更合适的方式来实现超时控制,提高程序的稳定性和可靠性。 超时控制可以帮助我们避免程

    2024年02月03日
    浏览(56)
  • 为什么需要websocket?

    前端和后端的交互模式最常见的就是前端发数据请求,从后端拿到数据后展示到页面中。如果前端不做操作,后端不能主动向前端推送数据,这也是http协议的缺陷。        因此,一种新的通信协议应运而生---websocket,他最大的特点就是服务端可以主动向客户端推送消息,客

    2024年02月12日
    浏览(59)
  • 为什么需要单元测试?

    为什么需要单元测试? 从产品角度而言,常规的功能测试、系统测试都是站在产品局部或全局功能进行测试,能够很好地与用户的需要相结合,但是缺乏了对产品研发细节(特别是代码细节的理解)。 从测试人员角度而言,功能测试和系统测试以及其他性能测试等等对测试

    2024年02月12日
    浏览(69)
  • 为什么需要uboot?

    bootROM: 一种固化在芯片内部的只读存储器(ROM),用于启动和初始化系统。BootROM 中通常包含了一些预先编写好的代码,用于完成系统启动前的基本初始化和配置, 例如初始化时钟、GPIO控制器、中断控制器、存储设备(SD卡、NAND Flash、SPicy Flash)等硬件资源, 检测启动设备

    2023年04月23日
    浏览(63)
  • 为什么需要对相机标定?

    以下内容来自系统教程如何搞定单目/鱼眼/双目/阵列 相机标定? 点击领取相机标定资料和代码 为什么需要对相机标定? 我们所处的世界是三维的,而相机拍摄的照片却是二维的,丢失了其中距离/深度的信息。从数学上可以简单理解为,相机本身类似一个映射函数,其将输

    2024年02月06日
    浏览(55)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包