Flink State 状态管理

这篇具有很好参考价值的文章主要介绍了Flink State 状态管理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


前言

状态在Flink中叫做State,用来保存中间计算结果或者缓存数据。要做到比较好的状态管理,需要考虑以下几点内容:

  • 状态数据的存储和访问
    在Task内部,如何高效地保存状态数据和使用状态数据。
  • 状态数据的备份和恢复
    作业失败是无法避免的,那么就要考虑如何高效地将状态数据保存下来,避免状态备份降低集群的吞吐量,并且在Failover时恢复作业到失败前的状态。
  • 状态数据的划分和动态扩容
    作业在集群内并行执行那么就要思考对于作业的Task而言如何使用统一的方式对状态数据进行切分,在作业修改并行度导致Task数据改变的时候,如何确保正确地恢复。

一、状态分类

State按照是否有Key划分KeyedState和OperatorState两种。按照数据结构不同,flink定义了多种state,分别应用于不同的场景,具体实现如下:ValueState、ListState、MapState、ReducingState、AggregatingState。

  • ValueState: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。

  • ListState: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List) 进行添加元素,通过 Iterable get() 获得整个列表。还可以通过 update(List) 覆盖当前的列表。

  • ReducingState: 保存一个单值,表示添加到状态的所有值的聚合。接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。

  • AggregatingState<IN, OUT>: 保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。

  • MapState<UK, UV>: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries(),keys() 和 values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包含任何键值对。

二、keyed代码示例

更多代码示例请下载Flink State体系剖析以及案例实践

ListState

代码如下:文章来源地址https://www.toymoban.com/news/detail-796627.html


import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * 需求:当接收到的相同 key 的元素个数等于 3个,就计算这些元素的 value 的平均值。
 * 计算keyed stream中每3个元素的 value 的平均值
 */
public class TestKeyedStateMain {
    public static void main(String[] args) throws  Exception{
        //获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度
        env.setParallelism(12);
        //获取数据源
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
                env.fromElements(
                        Tuple2.of(1L, 3L),
                        Tuple2.of(1L, 7L),
                        Tuple2.of(2L, 4L),
                        Tuple2.of(1L, 5L),
                        Tuple2.of(2L, 2L),
                        Tuple2.of(2L, 6L));
        /**
         * 1L, 3L
         * 1L, 7L
         * 1L, 5L
         *
         * 1L,5.0 double
         *
         * 2L, 4L
         * 2L, 2L
         * 2L, 6L
         *
         * 2L,4.0 double
         *
         *
         */


        // 输出:
        //(1,5.0)
        //(2,4.0)
        dataStreamSource
                .keyBy(tuple -> tuple.f0) //分组
                .flatMap(new CountAverageWithListState())
                .print();


        env.execute("TestStatefulApi");
    }
}

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.util.Collector;

import java.util.Collections;
import java.util.List;

/**
 *  ListState<T> :这个状态为每一个 key 保存集合的值
 *      get() 获取状态值
 *      add() / addAll() 更新状态值,将数据放到状态中
 *      clear() 清除状态
 */
public class CountAverageWithListState
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
    // managed keyed state
    /**
     * ValueState : 里面只能存一条元素
     * ListState : 里面可以存很多数据
     */
    private ListState<Tuple2<Long, Long>> elementsByKey;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 注册状态
        ListStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ListStateDescriptor<Tuple2<Long, Long>>(
                        "average",  // 状态的名字
                        Types.TUPLE(Types.LONG, Types.LONG)); // 状态存储的数据类型
        elementsByKey = getRuntimeContext().getListState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, Long> element,
                        Collector<Tuple2<Long, Double>> out) throws Exception {
        // 拿到当前的 key 的状态值
        Iterable<Tuple2<Long, Long>> currentState = elementsByKey.get();

        // 如果状态值还没有初始化,则初始化
        if (currentState == null) {
            elementsByKey.addAll(Collections.emptyList());
        }

        // 更新状态
        elementsByKey.add(element);


        // 判断,如果当前的 key 出现了 3 次,则需要计算平均值,并且输出
        List<Tuple2<Long, Long>> allElements = Lists.newArrayList(elementsByKey.get());

        if (allElements.size() == 3) {
            long count = 0;
            long sum = 0;
            for (Tuple2<Long, Long> ele : allElements) {
                count++;
                sum += ele.f1;
            }
            double avg = (double) sum / count;
            out.collect(Tuple2.of(element.f0, avg));
            // 清除状态
            elementsByKey.clear();
        }
    }
}

MapState

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.UUID;

/**
 *  MapState<K, V> :这个状态为每一个 key 保存一个 Map 集合
 *      put() 将对应的 key 的键值对放到状态中
 *      values() 拿到 MapState 中所有的 value
 *      clear() 清除状态
 */
public class CountAverageWithMapState
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
    // managed keyed state
    //1. MapState :key 是一个唯一的值,value 是接收到的相同的 key 对应的 value 的值
    /**
     * MapState:
     *      Map集合的特点,相同key,会覆盖数据。
     */
    private MapState<String, Long> mapState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 注册状态
        MapStateDescriptor<String, Long> descriptor =
                new MapStateDescriptor<String, Long>(
                        "average",  // 状态的名字
                        String.class, Long.class); // 状态存储的数据类型
        mapState = getRuntimeContext().getMapState(descriptor);
    }

    /**
     *
     * @param element
     * @param out
     * @throws Exception
     */
    @Override
    public void flatMap(Tuple2<Long, Long> element,
                        Collector<Tuple2<Long, Double>> out) throws Exception {

        mapState.put(UUID.randomUUID().toString(), element.f1); //list

        // 判断,如果当前的 key 出现了 3 次,则需要计算平均值,并且输出
        List<Long> allElements = Lists.newArrayList(mapState.values());

        if (allElements.size() == 3) {
            long count = 0;
            long sum = 0;
            for (Long ele : allElements) {
                count++;
                sum += ele;
            }
            double avg = (double) sum / count;
            //
            out.collect(Tuple2.of(element.f0, avg));
            // 清除状态
            mapState.clear();

        }
    }
}

总结

  1. 是否存在当前处理的 key(current key):operator state 是没有当前 key 的概念,而 keyed
    state 的数值总是与一个 current key 对应。
  2. 存储对象是否 on heap: 目前 operator state backend 仅有一种 on-heap 的实现;而 keyed state
    backend 有 on-heap 和 off-heap(RocksDB)的多种实现。
  3. 是否需要手动声明快照(snapshot)和恢复 (restore) 方法:operator state 需要手动实现
    snapshot 和 restore 方法;而 keyed state 则由 backend 自行实现,对用户透明。
  4. 数据大小:一般而言,我们认为 operator state 的数据规模是比较小的;认为 keyed state 规模是
    相对比较大的。需要注意的是,这是一个经验判断,不是一个绝对的判断区分标准。
    更多内容和代码示例请下载Flink State体系剖析以及案例实践

到了这里,关于Flink State 状态管理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • Flink State backend状态后端

    Flink在v1.12到v1.14的改进当中,其状态后端也发生了变化。老版本的状态后端有三个,分别是MemoryStateBackend、FsStateBackend、RocksDBStateBackend,在flink1.14中,这些状态已经被废弃了,新版本的状态后端是 HashMapStateBackend、EmbeddedRocksDBStateBackend。 有状态流应用中的检查点(checkpoint),

    2024年01月25日
    浏览(14)
  • Flink理论—容错之状态后端(State Backends)

    Flink理论—容错之状态后端(State Backends)

    Flink 使用流重放 和 检查点 的组合来实现容错。检查点标记每个输入流中的特定点以及每个运算符的相应状态。通过恢复运算符的状态并从检查点点重放记录,可以从检查点恢复流数据流,同时保持一致性 容错机制不断地绘制分布式流数据流的快照。对于小状态的流式应用程

    2024年02月20日
    浏览(12)
  • Flink状态详解:什么是时状态(state)?状态描述(StateDescriptor)及其重要性

    Flink状态详解:什么是时状态(state)?状态描述(StateDescriptor)及其重要性

    了解Flink中的状态概念及其在流处理中的重要性。探讨状态在有状态计算中的作用,状态描述符(StateDescriptor)的基本概念和用法。理解状态在Flink任务中的维护、恢复和与算子的关联。

    2024年02月08日
    浏览(11)
  • 【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(2) - operator state

    【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(2) - operator state

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年01月22日
    浏览(12)
  • 【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(1) - Keyed State

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年01月17日
    浏览(11)
  • 【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例 - 完整版

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月03日
    浏览(13)
  • flink 的 State

    flink 的 State

    目录 一、前言 二、什么是State 2.1:什么时候需要历史数据 2.2:为什么要容错,以及checkpoint如何进行容错 2.3:state basckend 又是什么 三、有哪些常见的是 State 四、 State的使用 五、State backend 5.1  MemoryStateBackend: 5.2  FsStatebackend: 5.3  RocksDBStateBackend: 六、Checkpoint 七、 Deep

    2023年04月18日
    浏览(7)
  • flink学习之state

    state作用 保留当前key的历史状态。 state用法 ListStateInteger vipList = getRuntimeContext().getListState(new ListStateDescriptorInteger(\\\"vipList\\\", TypeInformation.of(Integer.class))); 有valueState listState mapstate 。冒失没有setstate state案例 比如起点的小说不能被下载。别人只能通过截屏,提取文字的方式盗版小

    2024年02月09日
    浏览(7)
  • Flink_state 的优化与 remote_state 的探索

    Flink_state 的优化与 remote_state 的探索

    摘要:本文整理自 bilibili 资深开发工程师张杨,在 Flink Forward Asia 2022 核心技术专场的分享。本篇内容主要分为四个部分: 相关背景 state 压缩优化 Remote state 探索 未来规划 点击查看原文视频 演讲PPT 1.1 业务概况 从业务规模来讲,B 站目前大约是 4000+的 Flink 任务,其中 95%是

    2024年02月11日
    浏览(6)
  • Flink源码之State创建流程

    Flink源码之State创建流程

    StreamOperatorStateHandler 在StreamTask启动初始化时通过StreamTaskStateInitializerImpl::streamOperatorStateContext会为每个StreamOperator 创建keyedStatedBackend和operatorStateBackend,在AbstractStreamOperator中有个StreamOperatorStateHandler成员变量,调用AbstractStreamOperator::initializeState方法中会初始化StreamOperatorStateH

    2024年02月12日
    浏览(8)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包