Flink 源码剖析|键控状态的 API 层

这篇具有很好参考价值的文章主要介绍了Flink 源码剖析|键控状态的 API 层。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1 键控状态

在 Flink 中有如下 5 种键控状态(Keyed State),这些状态仅能在键控数据流(Keyed Stream)的算子(operator)上使用。键控流使用键(key)对数据流中的记录进行分区,同时也会对状态进行分区。要创建键控流,只需要在 DataStream 上使用 keyBy() 方法指定键即可。

具体地,这 5 种键控状态如下:

  • ValueState<T>:保存一个值;这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。
  • MapState<UK, UV>:保存一个映射列表;可以使用 put(UK, UV)putAll(Map<UK, UV>) 添加映射,使用 get(UK) 来检索特定的 key,使用 entires()keys()values() 分别检索映射、键和值的可迭代视图,使用 isEmpty() 判断是否包含任何键值对。
  • ListState<T>:保存一个元素的列表;可以通过 add(T) 或者 addAll(List<T>) 添加元素,通过 Iterable<T> get() 获取整个列表,通过 update(List<T>) 覆盖当前的列表。
  • ReducingState<T>:保存一个值,表示添加到状态的所有值聚合后的结果;使用 add(T) 添加元素,并使用提供的 reduceFunction 进行聚合。
  • AggregatingState<IN, OUT>:保存一个值,表示添加到状态的所有值聚合后的结果;使用 add(T) 添加元素,并使用提供的 AggregateFunction 进行聚合。与 ReducingState 不同的时,聚合类型可能与添加到状态的元素类型不同。

所有的类型状态还有一个 clear() 方法,用于清除当前键的状态数据。

键控状态具有如下特性:

  • 实现以上 5 个接口的状态对象仅用于与状态交互,状态本身不一定存储在内存中,还可能在磁盘或其他位置。
  • 从状态中获取的值取决于输入元素所代表的 Key,在不同 key 上调用同一个接口,可能得到不同的值。

2 键控状态的源码

Flink 源码剖析|键控状态的 API 层,Flink,flink,state,状态

2.1 State

State 接口是所有状态接口的基类,其中只定义了一个 clear() 方法,用于移除当前 key 下的状态。

源码|Github|org.apache.flink.api.common.state.State

@PublicEvolving
public interface State {

    /** Removes the value mapped under the current key. */
    void clear();
}

2.2 ValueState:每个 key 存储一个值

ValueState<T> 接口是用于保存一个值的状态,其中定义了 value() 方法和 update(T value) 方法用于查询和更新当前 key 的状态;泛型 V 是存储的值的类型。

源码|Github|org.apache.flink.api.common.state.ValueState

@PublicEvolving
public interface ValueState<T> extends State {

    T value() throws IOException;

    void update(T value) throws IOException;
}

2.3 MapState:每个 key 存储一个映射

MapState<UK, UV> 接口用于保存一个映射,泛型 UK 是映射的键的类型,泛型 UV 是映射的值的类型。其方法与 Java 的 Map 接口类似,具体包含如下方法:

  • get(UK key):获取状态中 key 对应的值
  • put(UK key, UV value):将键值对 key / value 写入到状态中
  • putAll(Map<UK, UV> map):将 map 中的所有键值对写入到状态中
  • remove(UK key):移除状态中 key 及其对应的值
  • contains(UK key):查询状态中是否包含 key
  • entries() / iterator():遍历状态中的所有键值对
  • keys():遍历状态中的所有键
  • values():遍历状态中的所有值
  • isEmpty():查询状态的映射是否为空

源码|Github|org.apache.flink.api.common.state.MapState

@PublicEvolving
public interface MapState<UK, UV> extends State {

    UV get(UK key) throws Exception;

    void put(UK key, UV value) throws Exception;

    void putAll(Map<UK, UV> map) throws Exception;

    void remove(UK key) throws Exception;

    boolean contains(UK key) throws Exception;

    Iterable<Map.Entry<UK, UV>> entries() throws Exception;

    Iterable<UK> keys() throws Exception;

    Iterable<UV> values() throws Exception;

    Iterator<Map.Entry<UK, UV>> iterator() throws Exception;

    boolean isEmpty() throws Exception;
}

2.4 AppendingState 及其子类:每个 key 存储一个累加状态

AppendingState<IN, OUT> 接口定义了一个具有类似累加器性质的状态,其泛型 IN 为每次添加元素的类型,OUT 为结果的类型。其中包含 2 个方法,add(IN value) 方法用于向累加器添加元素,get() 方法用于获取当前累加器的值。

源码|Github|org.apache.flink.api.common.state.AppendingState

@PublicEvolving
public interface AppendingState<IN, OUT> extends State {

    OUT get() throws Exception;

    void add(IN value) throws Exception;
}

MergingState<IN, OUT> 接口继承了 AppendingState<IN, OUT> 接口,要求在实现时支持类似累加器的合并运算,即将两个 MergingState 实例合并为包含 2 个实例信息的 1 个 MergingState 实例。

源码|Github|org.apache.flink.api.common.state.MergingState

@PublicEvolving
public interface MergingState<IN, OUT> extends AppendingState<IN, OUT> {}

ListState<T>ReducingState<T>AggregatingState<IN, OUT> 均继承了 MergingState<IN, OUT>

2.4.1 ListState:输入元素,输出可迭代对象

ListState<T>add(T value) 接受一个 T 类型的元素,get() 方法返回一个 Iterable<T>。同时,额外定义了 2 个方法:

  • update(List<T> values):使用 values 替换到当前状态中的列表
  • addAll(List<T> values):将 values 添加到当前状态的列表中

源码|Github|org.apache.flink.api.common.state.ListState

@PublicEvolving
public interface ListState<T> extends MergingState<T, Iterable<T>> {

    void update(List<T> values) throws Exception;

    void addAll(List<T> values) throws Exception;
}
2.4.2 ReducingState:输入和输出类型一致

ReducingState<T> 接口并没有定义新的方法,但是调整了 MergingState 接口的泛型类型,add(T value) 接受一个 T 类型的元素,get() 方法返回一个 T 类型的对象。

源码|Github|org.apache.flink.api.common.state.ReducingState

@PublicEvolving
public interface ReducingState<T> extends MergingState<T, T> {}
2.4.3 AggregatingState:在每个输入元素后直接聚合

AggregatingState<IN, OUT> 接口并没有定义额外的逻辑,add(IN value) 接受一个 IN 类型的元素,get() 方法返回一个 OUT 类型的对象。但是规定在每个元素输入后,都需要直接将该元素聚合到当前的最终结果上。

源码|Github|org.apache.flink.api.common.state.AggregatingState

@PublicEvolving
public interface AggregatingState<IN, OUT> extends MergingState<IN, OUT> {}

3 使用键控状态

在使用键控状态时,必须创建一个 StateDescriptor,并在 UDF 的 open() 方法中,并从 RuntimeContext 中获取该状态的状态句柄(state handle)。在状态句柄中,记录了状态名称、状态所持有值的类型以及用户所指定的函数。根据不同的状态类型,可以创建 ValueStateDescriptorListStateDescriptorAggregatingStateDescriptorReducingStateDescriptorMapStateDescriptor

样例|获取 Tuple2<Long, Long> 类型 ValueState 的状态句柄

private transient ValueState<Tuple2<Long, Long>> sum;  // 定义状态句柄

@Override
public void open(Configuration config) {
    ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
            new ValueStateDescriptor<>(
                    "average", // 状态名称
                    TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // 状态的类型信息
                    Tuple2.of(0L, 0L)); // 状态的默认值
    sum = getRuntimeContext().getState(descriptor);
}

在使用状态时,直接使用状态句柄中的方法即可。

样例|使用 Tuple2<Long, Long> 类型的 ValueState

// 访问 ValueState 类型状态的值
Tuple2<Long, Long> currentSum = sum.value();
// 更新 ValueState 类型状态的值
sum.update(currentSum);

因为状态句柄需要通过 RuntimeContext 获取,因此只能在富函数中使用。富函数的详细介绍见 “3 UDF 接口与富函数”。

下面来看一个 Flink 官方文档中的状态使用样例。

样例|使用 ValueState 计算每 2 个相邻元素的平均值发往下游的 UDF 函数的样例

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

 /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(value -> value.f0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)

定义 UDF 的 ValueState 类型的对象属性 sum,其值为一个元组,元组中第一个元素用于存储计数结果,第二个元素存储求和结果。

open() 方法中,创建 ValueStateDescriptor 并定义了状态名称、状态类型和状态的初始值,在获取到 ValueState 类型的状态句柄后,将其状态存储到实例属性 sum 中。

使用了 ValueState 接口的 value()update()clear() 方法获取、更新、清除当前的值。文章来源地址https://www.toymoban.com/news/detail-831094.html

到了这里,关于Flink 源码剖析|键控状态的 API 层的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【状态管理|概述】Flink的状态管理:为什么需要state、怎么保存state、对于state过大怎么处理

    按照数据的划分和扩张方式,Flink中大致分为2类: Keyed States:记录每个Key对应的状态值 因为一个任务的并行度有多少,就会有多少个子任务,当key的范围大于并行度时,就会出现一个subTask上可能包含多个Key(),但不同Task上不会出现相同的Key(解决了shuffle的问题?)   常

    2024年02月01日
    浏览(55)
  • Flink状态详解:什么是时状态(state)?状态描述(StateDescriptor)及其重要性

    了解Flink中的状态概念及其在流处理中的重要性。探讨状态在有状态计算中的作用,状态描述符(StateDescriptor)的基本概念和用法。理解状态在Flink任务中的维护、恢复和与算子的关联。

    2024年02月08日
    浏览(44)
  • Flink理论—容错之状态后端(State Backends)

    Flink 使用流重放 和 检查点 的组合来实现容错。检查点标记每个输入流中的特定点以及每个运算符的相应状态。通过恢复运算符的状态并从检查点点重放记录,可以从检查点恢复流数据流,同时保持一致性 容错机制不断地绘制分布式流数据流的快照。对于小状态的流式应用程

    2024年02月20日
    浏览(37)
  • flink state原理,TTL,状态后端,数据倾斜一文全

    拿五个字做比喻:“铁锅炖大鹅”,铁锅是状态后端,大鹅是状态,Checkpoint 是炖的动作。 状态 :本质来说就是数据,在 Flink 中,其实就是 Flink 提供给用户的状态编程接口。比如 flink 中的 MapState,ValueState,ListState。 状态后端 :Flink 提供的用于管理状态的组件,状态后端决

    2024年02月22日
    浏览(49)
  • Flink源码之State创建流程

    StreamOperatorStateHandler 在StreamTask启动初始化时通过StreamTaskStateInitializerImpl::streamOperatorStateContext会为每个StreamOperator 创建keyedStatedBackend和operatorStateBackend,在AbstractStreamOperator中有个StreamOperatorStateHandler成员变量,调用AbstractStreamOperator::initializeState方法中会初始化StreamOperatorStateH

    2024年02月12日
    浏览(88)
  • Flink流批一体计算(19):PyFlink DataStream API之State

    目录 keyed state Keyed DataStream 使用 Keyed State 实现了一个简单的计数窗口 状态有效期 (TTL) 过期数据的清理 全量快照时进行清理 增量数据清理 在 RocksDB 压缩时清理 Operator State算子状态 Broadcast State广播状态 keyed state Keyed DataStream 使用 keyed state,首先需要为DataStream指定 key(主键)

    2024年02月10日
    浏览(37)
  • 深入理解 Flink(五)Flink Standalone 集群启动源码剖析

    深入理解 Flink 系列文章已完结,总共八篇文章,直达链接: 深入理解 Flink (一)Flink 架构设计原理 深入理解 Flink (二)Flink StateBackend 和 Checkpoint 容错深入分析 深入理解 Flink (三)Flink 内核基础设施源码级原理详解 深入理解 Flink (四)Flink Time+WaterMark+Window 深入分析 深入

    2024年02月02日
    浏览(47)
  • Flink|《Flink 官方文档 - DataStream API - 状态与容错 - 使用状态》学习笔记

    学习文档:Flink 官方文档 - DataStream API - 状态与容错 - 使用状态 相关文档: 有状态流处理背后的概念:Flink|《Flink 官方文档 - 概念透析 - 有状态流处理》学习笔记 Redis 过期 key 的删除机制:Redis|过期 key 的删除机制 学习笔记如下: 如果要使用键控状态,则必须要为 DataS

    2024年02月03日
    浏览(42)
  • Flink 源码剖析|累加器

    累加器是实现了 加法运算 功能和 合并运算 (合并多个累加器的结果)功能的一种数据结构,在作业结束后,可以获取所有部分(各个 operator 的各个 subtask)合并后的最终结果并发送到客户端。 Flink 的累加器均实现了 Accumulator 接口,包括如下 2 个方法用于支持加法运算和合

    2024年02月21日
    浏览(49)
  • Flink 源码剖析|RuntimeContext 接口

    每个并行的实例都会包含一个 RuntimeContext 。 RuntimeContext 接口包含函数执行的上下文信息,提供了如下功能: 访问静态上下文信息(例如当前并行度) 添加及访问累加器 访问外部资源信息 访问广播变量和分布式缓存 访问并编辑状态 下面,我们逐类介绍 RuntimeContext 接口的方

    2024年02月22日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包