Flink|《Flink 官方文档 - DataStream API - 状态与容错 - 使用状态》学习笔记

这篇具有很好参考价值的文章主要介绍了Flink|《Flink 官方文档 - DataStream API - 状态与容错 - 使用状态》学习笔记。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

学习文档:Flink 官方文档 - DataStream API - 状态与容错 - 使用状态

相关文档:

  • 有状态流处理背后的概念:Flink|《Flink 官方文档 - 概念透析 - 有状态流处理》学习笔记
  • Redis 过期 key 的删除机制:Redis|过期 key 的删除机制

学习笔记如下:


键控流(Keyed DataStream)

如果要使用键控状态,则必须要为 DataStream 指定 key。这个主键将用于对数据流中的记录分区,同时也会用于状态分区。

可以使用 DataStream 中的 keyBy(KeySelector)(Java / Scala)或 key_by(KeySelector) 来指定 key,在指定 key 后,数据流将变成键控流(KeyedStream),并允许使用基于 Keyed state 的操作。

KeySelector 接受每条记录作为输入,并返回这条记录的 key。该 key 可以是任何类型,但它的计算产生方式必须是具有确定性的(详见 Flink|《Flink 官方文档 - 概念透析 - 有状态流处理》学习笔记)。例如:

// some ordinary POJO
public class WC {
  public String word;
  public int count;

  public String getWord() { return word; }
}
DataStream<WC> words = // [...]
KeyedStream<WC> keyed = words
  .keyBy(WC::getWord);

Flink 的数据类型并不基于 key - value 对,因此实际上将数据集在物理上封装为 key 和 value 是没有必要的。

键控状态(Keyed State)

以下 Keyed State 只能在 KeyedStream 上使用:

  • ValueState<T>:保存一个可以更新和检索的值;这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。
  • ListState<T>:保存一个元素的列表;可以通过 add(T) 或者 addAll(List<T>) 添加元素,通过 Iterable<T> get() 获取整个列表,通过 update(List<T>) 覆盖当前的列表。
  • ReducingState<T>:保存一个值,表示添加到状态的所有值聚合后的结果;使用 add(T) 添加元素,并使用提供的 reduceFunction 进行聚合。
  • AggregatingState<IN, OUT>:保存一个值,表示添加到状态的所有值聚合后的结果;使用 add(T) 添加元素,并使用提供的 AggregateFunction 进行聚合。与 ReducingState 不同的时,聚合类型可能与添加到状态的元素类型不同。
  • MapState<UK, UV>:保存一个映射列表;可以使用 put(UK, UV)putAll(Map<UK, UV>) 添加映射,使用 get(UK) 来检索特定的 key,使用 entires()keys()values() 分别检索映射、键和值的可迭代视图,使用 isEmpty() 判断是否包含任何键值对。

所有的类型状态还有一个 clear() 方法,用于清除当前 key 下的状态数据,也就是当前输入元素的 key。

需要注意的是:

  • 这些状态对象仅用于与状态交互。状态本身不一定存储在内存中,还可能在磁盘或其他位置。
  • 从状态中获取的值取决于输入元素所代表的 Key,在不同 key 上调用同一个接口,可能得到不同的值。

在使用中,必须创建一个 StateDescriptor,才能获得对应的状态句柄。在状态句柄中,记录了状态名称、状态所持有值的类型以及用户所指定的函数。根据不同的状态类型,可以创建 ValueStateDescriptorListStateDescriptorAggregatingStateDescriptorReducingStateDescriptorMapStateDescriptor

状态通过 RuntimeContext 进行访问,因此只能在 rich functions 中使用。

样例:计数窗口,这个 UDF 会计算每两个相邻的元素的平均值并发送到下游。

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(value -> value.f0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)
  • 定义 UDF 的 ValueState 类型的私有属性 sum,其值为一个元组,元组中第一个元素用于存储计数结果,第二个元素存储求和结果。
  • open() 方法中,定义了状态句柄 ValueStateDescriptor,定义了状态名称、状态类型和状态的初始值,并将其状态存储到属性 sum 中。
  • 使用 sum.value() 获取当前状态的值
  • 使用 sum.updaste() 更新当前状态的值
  • 使用 sum.clear() 清空当前状态的值
状态有效期(TTL)

任何类型的 Keyed State 都可以设置有效期(TTL)。如果配置了 TTL 且状态已过期,则会尽最大可能清除对应的值。

任何状态类型都支持单元素的 TTL。这意味着列表元素和映射元素将单独计算到期时间。

在使用 TTL 前,需要先构建 StateTtlConfig 配置对象,然后把配置传递到 State Descriptor 中启用 TTL 功能。

TTL 配置的选项
  • 数据的有效期:newBuilder() 的第一个参数,必选

  • 更新策略:setUpdateType() 的第一个参数,可选,默认为 onCreateAndWrite

    • StateTtlConfig.UpdateType.onCreateAndWrite:仅在创建和写入时更新
    • StateTtlConfig.UpdateType.onReadAndWrite:在读取时也更新
  • 数据在未被清理时的可见性配置:setStateVisibility() 的第一个参数,可选,默认为 NeverReturnedExpired

    • StateTtlConfig.StateVisibility.NeverReturnExpired - 不返回过期数据
    • StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 会返回过期但未清理的数据
  • 关门后台清理:disableCleanupInBackground(),可选,添加则关闭过期数据的后台清理

  • 开启全量快照时清理:cleanupFullSnapshot(),可选,添加则开启在全来那个快照时进行清理

  • 开启 Heap Backend 增量数据清理:cleanupIncrementally(),可选,添加则在访问和处理时进行检查过期数据并清理

  • 开启 RcoksDB Backend 压缩时数据清理:cleanupInRocksdbCompactFilter(),可选,添加在开启压缩时数据清理

需要注意的是:因为在开启 TTL 特性后,状态上次的修改时间会和数据一起保存在 state backend 中,所以开启这个特性会增加状态数据的存储。

TTL 的清理策略

默认情况下,过期数据会在读取的时候被删除,同时也会有后台进程定期清理。

在实现上,HeapStateBackend 依赖增量数据清理,RocksDBStateBackend 利用压缩过滤器进行后台清理。

  • 全量快照时进行清理:在全量快照时进行清理的策略,可以减少整体快照的大小。当前实现中不会清理本地状态,但从上次快照恢复时,不会恢复那些已经删除的过期数据。这种清理策略可以在任何时候通过 StateTtlConfig 启动或者关闭。
  • 增量数据清理:如果开启增量式清理状态数据,在会状态访问和处理时进行清理。对于开启了增量数据清理策略的状态,会在存储后端保留一个所有状态的惰性全局迭代器,每次出发增量清理时,从迭代器中选择已经过期的数据进行清理。该策略有两个参数,第一个表示每次清理时检查状态的条目数,第二个参数表示是否在处理每条记录时都触发清理。Heap backend 默认会检查 5 条状态,并且关闭在每条记录时触发清理。
  • 压缩时清理:RcoksDB 会周期性地对数据进行合并压缩从而减少存储空间,Flink 提供的 RocksDB 压缩过滤器会在压缩时过滤掉已经过期的数据。该策略有一个参数,该参数表示每处理多少条数据进行一次清理。

Flink 的状态清理策略与 Redis 的被动清理 + 主动清理有很多相似之处,详见 Redis|过期 key 的删除机制。

算子状态(Operator State)

算子状态是绑定到一个并行算子实例的状态。例如,Kafka Connector 是 Flink 中就使用了算子状态,Kafka consumer 的每个并行实例维护了 topic partitions 和偏移量的 map 作为它的算子状态。当并行度改变的时候,算子状态支持将状态重新分发给各并行算子实例。

算子状态通常用于实现 source / sink,以及一些没有 key 而无法对 state 进行分区的场景。

广播状态(Broadcast State)

广播状态时一种特殊的算子状态,用于将状态广播到所有下游任务。通过广播状态,可以保持所有子任务状态相同。

广播状态与其他算子状态的差异:

  • 它具有 map 格式
  • 仅在输入为一个广播数据流和一个非广播数据流的算子中可用
  • 可以拥有多个不同名称的广播状态

使用算子状态

通过实现 CheckpointedFunction 接口来使用算子状态。在 CheckpointedFunction 中提供了访问 non-keyed state 的方法,需要实现如下两个方法:

  • void snapshotState(FunctionSnapshotContext context) throws Exception:在进行 checkpoint 时调用
  • void initializeState(FunctionInitializationContext context) throws Exception:在 UDF 初始化时调用,这里的初始化包括第一次启动时的初始化,以及从 checkpoint 恢复的初始化。

当前算子状态会以 list 的形式存在,这些状态彼此独立,方便在改变并发后进行状态的重新分派。有如下几种重新分配的模式:

  • Even-split redistribution:每个算子都保存一个列表形式的状态集合,整个状态由所有的列表拼接而成。当作业恢复或重新分配的时候,整个状态会按照算子的并发度进行均匀分配。
  • Union redistribution:每个算子保存一个列表形式的状态集合,整个状态由所有的列表拼接而成。当作业恢复或重新分配的时候,每个算子都将得到所有的状态数据。如果状态的数量很大时不要使用这个特性,可能导致内存溢出的问题。

样例:SinkFunctionCheckpointedFunction 中进行数据缓存,然后统一发送到下游。

public class BufferingSink
        implements SinkFunction<Tuple2<String, Integer>>,
                   CheckpointedFunction {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() >= threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
  • initializeState():接受一个 FunctionInitializationContext 参数,并用来初始化 non-keyed state 的容器,这个容器是一个 ListState 类型的 checkpointedState吗,用于在 checkpoint 时保存 non-keyed state 对戏那个。与 keyed state 类似,在初始化时状态句柄 descriptor 时,也会包括状态名称、状态类型等信息。如果是从 checkpoint 中恢复(即 context.isRestored()),则将 checkpointedState 中的元素读取并添加到 bufferedElements 中。
  • snapshotState():在快照时,清空 checkpointedState 并将 bufferedElements 中缓存的元素全部添加到 checkpointedState 中。
  • invoke():将传入的数据添加到 bufferedElements 中进行缓存;当缓存数量达到阈值后统一写出并将缓存清空。

在调用 getOperatorStateStore() 后,调用不同的获取状态对象的接口,会使用不同的状态分配算法。例如调用 getUnionListState(descriptor) 会使用 union redistribution 算法,而调用 getListState(descriptor) 则会使用 even-split redistribution 算法。

使用带状态的 Source Function

样例:文章来源地址https://www.toymoban.com/news/detail-778736.html

public static class CounterSource
        extends RichParallelSourceFunction<Long>
        implements CheckpointedFunction {

    /**  current offset for exactly once semantics */
    private Long offset = 0L;

    /** flag for job cancellation */
    private volatile boolean isRunning = true;
    
    /** 存储 state 的变量. */
    private ListState<Long> state;
     
    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // output and state update are atomic
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        state = context.getOperatorStateStore().getListState(new ListStateDescriptor<>(
            "state",
            LongSerializer.INSTANCE));
            
        // 从我们已保存的状态中恢复 offset 到内存中,在进行任务恢复的时候也会调用此初始化状态的方法
        for (Long l : state.get()) {
            offset = l;
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        state.clear();
        state.add(offset);
    }
}
  • run():为了保证更新状态以及输出的原子性(用于实现 exactly-once 语义),需要在发送数据前获取数据源的全局锁。
  • snapshotState():在快照时,我们存储当前偏移量即可。
  • initializeState():在启动或恢复时,我们需要恢复偏移量。

到了这里,关于Flink|《Flink 官方文档 - DataStream API - 状态与容错 - 使用状态》学习笔记的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink学习——DataStream API

            一个flink程序,其实就是对DataStream的各种转换。具体可以分成以下几个部分: 获取执行环境(Execution Environment) 读取数据源(Source) 定义基于数据的转换操作(Transformations) 定义计算结果的输出位置(Sink) 触发程序执行(Execute)         flink 程序可以在各种上

    2024年02月05日
    浏览(30)
  • 《Flink学习笔记》——第五章 DataStream API

    一个Flink程序,其实就是对DataStream的各种转换,代码基本可以由以下几部分构成: 获取执行环境 读取数据源 定义对DataStream的转换操作 输出 触发程序执行 获取执行环境和触发程序执行都属于对执行环境的操作,那么其构成可以用下图表示: 其核心部分就是Transform,对数据

    2024年02月10日
    浏览(33)
  • 【Apache Flink】Flink DataStream API的基本使用

    Flink DataStream API的基本使用 Flink DataStream API主要用于处理无界和有界数据流 。 无界数据流 是一个持续生成数据的数据源,它没有明确的结束点,例如实时的交易数据或传感器数据。这种类型的数据流需要使用Apache Flink的实时处理功能来连续地处理和分析。 有界数据流 是一个

    2024年02月06日
    浏览(30)
  • 【Flink】DataStream API使用之源算子(Source)

    创建环境之后,就可以构建数据的业务处理逻辑了,Flink可以从各种来源获取数据,然后构建DataStream进项转换。一般将数据的输入来源称为数据源(data source),而读取数据的算子就叫做源算子(source operator)。所以,Source就是整个程序的输入端。 Flink中添加source的方式,是

    2024年02月10日
    浏览(29)
  • Flink|《Flink 官方文档 - 部署 - 概览》学习笔记

    学习文档:《Flink 官方文档 - 部署 - 概览》 学习笔记如下: 上图展示了 Flink 集群的各个构建(building blocks)。通常来说: 客户端获取 Flink 应用程序代码,将其转换为 JobGraph,并提交给 JobManager JobManager 将工作分配给 TaskManager,并在那里执行实际的算子操作 在部署 Flink 时,

    2024年01月19日
    浏览(41)
  • Flink|《Flink 官方文档 - 内幕 - 文件系统》学习笔记

    学习文档:内幕 - 文件系统 学习笔记如下: Flink 通过 org.apache.flink.core.fs.FileSystem 实现了文件系统的抽象。这种抽象提供了一组通用的操作,以支持使用各类文件系统。 为了支持众多的文件系统, FileSystem 的可用操作集非常有限。例如,不支持对现有文件进行追加或修改。

    2024年02月03日
    浏览(27)
  • Flink|《Flink 官方文档 - 概念透析 - Flink 架构》学习笔记

    学习文档:概念透析 - Flink 架构 学习笔记如下: 客户端(Client):准备数据流程序并发送给 JobManager(不是 Flink 执行程序的进程) JobManager:协调 Flink 应用程序的分布式执行 ResourceManager:负责 Flink 集群中的资源提供、回收、分配 Dispatcher:提供了用来提交 Flink 应用程序执行

    2024年01月19日
    浏览(39)
  • Flink|《Flink 官方文档 - 概念透析 - 及时流处理》学习笔记

    学习文档:概念透析 - 及时流处理 学习笔记如下: 及时流处理时有状态流处理的扩展,其中时间在计算中起着一定的作用。 及时流的应用场景: 时间序列分析 基于特定时间段进行聚合 对发生时间很重要的事件进行处理 处理时间(processing time) 处理时间的即数据到达各个

    2024年02月03日
    浏览(37)
  • Flink|《Flink 官方文档 - 部署 - 内存配置 - 配置 TaskManager 内存》学习笔记

    学习文档:Flink|《Flink 官方文档 - 部署 - 内存配置 - 配置 TaskManager 内存》学习笔记 学习笔记如下: Flink JVM 进程的进程总内存(Total Process Memory)包含了由 Flink 应用使用的内存(Flink 总内存)以及由运行 Flink 的 JVM 使用的内存。其中,Flink 总内存(Total Flink Memory)包括 JV

    2024年03月15日
    浏览(35)
  • Flink|《Flink 官方文档 - 部署 - 内存配置 - 网络缓冲调优》学习笔记

    学习文档:《Flink 官方文档 - 部署 - 内存配置 - 网络缓冲调优》 学习笔记如下: Flink 中每条消息都会被放到网络缓冲(network buffer) 中,并以此为最小单位发送到下一个 subtask。 Flink 在传输过程的输入端和输出端使用了网络缓冲队列,即每个 subtask 都有一个输入队列来接收

    2024年01月21日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包