Flink 算子状态

这篇具有很好参考价值的文章主要介绍了Flink 算子状态。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

算子状态 (Operator State) : 一个算子并行实例上定义的状态,作用范围 : 当前算子任务

算子状态支持三种结构类型 : ListState、UnionListState、BroadcastState

列表状态

ListState : 将状态表示为列表 , 当前并行子任务上的所有状态项的集合

  • 当算子并行度调整时,会把算子的列表状态都收集起来,再均匀分配 (轮询) 给所有并行任务

例子 : map 计算数据的个数

public class OperatorListStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        env.socketTextStream("hadoop102", 7777)
            .map(new MyCountMapFunction())
            .print();

        env.execute();
    }

    
    // 1.实现 CheckpointedFunction 接口
    public static class MyCountMapFunction implements MapFunction<String, Long>, CheckpointedFunction {
        private Long count = 0L;
        private ListState<Long> state;

        @Override
        public Long map(String value) throws Exception {
            return ++count;
        }

		// 2.本地变量持久化:将本地变量, 拷贝到算子状态中, 开启 checkpoint 时, 才会调用
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            System.out.println("snapshotState...");
            
            // 2.1 清空算子状态
            state.clear();
            // 2.2 将 本地变量 添加到 算子状态 中
            state.add(count);
        }

        // 3.初始化本地变量:程序启动和恢复时, 从状态中, 把数据添加到本地变量,每个子任务调用一次
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            System.out.println("initializeState...");
            
            // 3.1 从上下文初始化算子状态
            state = context.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<Long>("state", Types.LONG));

            // 3.2 从算子状态中, 把数据拷贝到本地变量
            if (context.isRestored()) {
                for (Long c : state.get()) {
                    count += c;
                }
            }
        }
    }
}

联合列表状态

联合列表状态 (UnionListState) : 将状态表示为一个列表

  • 并行度调整时 : 常规列表状态是轮询分配状态项 ; 联合列表状态算子会直接广播状态的完整列表
  • 注意 : 当列表大 , 会有效率问题
state = context.getOperatorStateStore()
    .getUnionListState(new ListStateDescriptor<Long>("union-state", Types.LONG));

广播状态

广播状态 (BroadcastState) : 当所有分区的所有数据都访问同个状态

  • 当并行度调整时 , 复制到新并行子任务 ; 删除多余并行子任务

例子 : 水位 > 阈值, 就发送告警文章来源地址https://www.toymoban.com/news/detail-496671.html

public class OperatorBroadcastStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // 数据流
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction());

        // 配置流(用来广播配置)
        DataStreamSource<String> configDS = env.socketTextStream("hadoop102", 8888);

        // 1. 将 配置流 广播
        MapStateDescriptor<String, Integer> broadcastMapState = new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT);
        BroadcastStream<String> configBS = configDS.broadcast(broadcastMapState);

        //2.把 数据流 和 广播后的配置流 connect
        BroadcastConnectedStream<WaterSensor, String> sensorBCS = sensorDS.connect(configBS);

        // 3.调用 process
        sensorBCS.process(
            new BroadcastProcessFunction<WaterSensor, String, String>() {

                // 数据流的处理方法: 数据流 只能 读取 广播状态,不能修改
                @Override
                public void processElement(WaterSensor value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                    // 5.通过上下文获取广播状态,取出里面的值(只读,不能修改)
                    ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);
                    Integer threshold = broadcastState.get("threshold");
                    
                    // 判断广播状态里是否有数据,因为刚启动时,可能是数据流的第一条数据先来
                    threshold = (threshold == null ? 0 : threshold);
                    if (value.getVc() > threshold) {
                        out.collect(value + ",水位超过指定的阈值:" + threshold + "!!!");
                    }

                }

                // 广播后的配置流的处理方法: 只有广播流才能修改 广播状态
                @Override
                public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                    // 4. 通过上下文获取广播状态,往里面写数据
                    BroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);
                    broadcastState.put("threshold", Integer.valueOf(value));

                }
            }
		).print();

        env.execute();
    }
}

到了这里,关于Flink 算子状态的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink State 状态原理解析

    State 用于记录 Flink 应用在运行过程中,算子的中间计算结果或者元数据信息。运行中的 Flink 应用如果需要上次计算结果进行处理的,则需要使用状态存储中间计算结果。如 Join、窗口聚合场景。 Flink 应用运行中会保存状态信息到 State 对象实例中,State 对象实例通过 StateBac

    2024年02月05日
    浏览(40)
  • 【状态管理|概述】Flink的状态管理:为什么需要state、怎么保存state、对于state过大怎么处理

    按照数据的划分和扩张方式,Flink中大致分为2类: Keyed States:记录每个Key对应的状态值 因为一个任务的并行度有多少,就会有多少个子任务,当key的范围大于并行度时,就会出现一个subTask上可能包含多个Key(),但不同Task上不会出现相同的Key(解决了shuffle的问题?)   常

    2024年02月01日
    浏览(55)
  • Flink 算子状态

    算子状态 (Operator State) : 一个算子并行实例上定义的状态,作用范围 : 当前算子任务 算子状态支持三种结构类型 : ListState、UnionListState、BroadcastState ListState : 将状态表示为列表 , 当前并行子任务上的所有状态项的集合 当算子并行度调整时,会把算子的列表状态都收集起来,再

    2024年02月10日
    浏览(40)
  • Flink State backend状态后端

    Flink在v1.12到v1.14的改进当中,其状态后端也发生了变化。老版本的状态后端有三个,分别是MemoryStateBackend、FsStateBackend、RocksDBStateBackend,在flink1.14中,这些状态已经被废弃了,新版本的状态后端是 HashMapStateBackend、EmbeddedRocksDBStateBackend。 有状态流应用中的检查点(checkpoint),

    2024年01月25日
    浏览(41)
  • Flink状态详解:什么是时状态(state)?状态描述(StateDescriptor)及其重要性

    了解Flink中的状态概念及其在流处理中的重要性。探讨状态在有状态计算中的作用,状态描述符(StateDescriptor)的基本概念和用法。理解状态在Flink任务中的维护、恢复和与算子的关联。

    2024年02月08日
    浏览(44)
  • Flink理论—容错之状态后端(State Backends)

    Flink 使用流重放 和 检查点 的组合来实现容错。检查点标记每个输入流中的特定点以及每个运算符的相应状态。通过恢复运算符的状态并从检查点点重放记录,可以从检查点恢复流数据流,同时保持一致性 容错机制不断地绘制分布式流数据流的快照。对于小状态的流式应用程

    2024年02月20日
    浏览(37)
  • flink state原理,TTL,状态后端,数据倾斜一文全

    拿五个字做比喻:“铁锅炖大鹅”,铁锅是状态后端,大鹅是状态,Checkpoint 是炖的动作。 状态 :本质来说就是数据,在 Flink 中,其实就是 Flink 提供给用户的状态编程接口。比如 flink 中的 MapState,ValueState,ListState。 状态后端 :Flink 提供的用于管理状态的组件,状态后端决

    2024年02月22日
    浏览(49)
  • 209.Flink(四):状态,按键分区,算子状态,状态后端。容错机制,检查点,保存点。状态一致性。flink与kafka整合

    算子任务可以分为有状态、无状态两种。 无状态:filter,map这种,每次都是独立事件 有状态:sum这种,每次处理数据需要额外一个状态值来辅助。这个额外的值就叫“状态” (1)托管状态(Managed State)和原始状态(Raw State) 托管状态 就是由Flink统一管理的,状态的存储访问

    2024年02月06日
    浏览(51)
  • 从Flink的Kafka消费者看算子联合列表状态的使用

    算子的联合列表状态是平时使用的比较少的一种状态,本文通过kafka的消费者实现来看一下怎么使用算子列表联合状态 首先我们看一下算子联合列表状态的在进行故障恢复或者从某个保存点进行扩缩容启动应用时状态的恢复情况 算子联合列表状态主要由这两个方法处理: 1初

    2024年02月08日
    浏览(37)
  • 【flink番外篇】10、对有状态或及时 UDF 和自定义算子进行单元测试

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月02日
    浏览(53)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包