flink的ProcessWindowFunction函数的三种状态

这篇具有很好参考价值的文章主要介绍了flink的ProcessWindowFunction函数的三种状态。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景

在处理窗口函数时,ProcessWindowFunction处理函数可以定义三个状态: 富函数getRuntimeContext.getState,
每个key+每个窗口的状态context.windowState(),每个key的状态context.globalState,那么这几个状态之间有什么关系呢?

ProcessWindowFunction处理函数三种状态之间的关系:

1.getRuntimeContext.getState这个定义的状态是每个key维度的,也就是可以跨时间窗口并维持状态的
2.context.windowState()这个定义的状态是和每个key以及窗口相关的,也就是虽然key相同,但是时间窗口不同,他们的值也不一样.
3.context.globalState这个定义的状态是和每个key相关的,也就是和getRuntimeContext.getState的定义一样,可以跨窗口维护状态
验证代码如下所示:

package wikiedits.func;


import org.apache.flink.api.common.state.ValueState;

import org.apache.flink.api.common.state.ValueStateDescriptor;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.configuration.Configuration;

import org.apache.flink.streaming.api.TimeCharacteristic;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;

import org.apache.flink.streaming.api.windowing.time.Time;

import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import org.apache.flink.util.Collector;
import wikiedits.func.model.KeyCount;

import java.text.SimpleDateFormat;

import java.util.Date;



public class ProcessWindowFunctionDemo {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 使用处理时间
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        // 并行度为1
        env.setParallelism(1);
        // 设置数据源,一共三个元素
        DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                int xxxNum = 0;
                int yyyNum = 0;
                for (int i = 1; i < Integer.MAX_VALUE; i++) {
                    // 只有XXX和YYY两种name
                    String name = (0 == i % 2) ? "XXX" : "YYY";
                    //更新aaa和bbb元素的总数
                    if (0 == i % 2) {
                        xxxNum++;
                    } else {
                        yyyNum++;
                    }
                    // 使用当前时间作为时间戳
                    long timeStamp = System.currentTimeMillis();
                    // 将数据和时间戳打印出来,用来验证数据
                    System.out.println(String.format("source,%s, %s,    XXX total : %d,    YYY total : %d\n",
                            name,
                            time(timeStamp),
                            xxxNum,
                            yyyNum));
                    // 发射一个元素,并且戴上了时间戳
                    ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, 1), timeStamp);
                    // 每发射一次就延时1秒
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
            }
        });

        // 将数据用5秒的滚动窗口做划分,再用ProcessWindowFunction
        SingleOutputStreamOperator<String> mainDataStream = dataStream
                // 以Tuple2的f0字段作为key,本例中实际上key只有aaa和bbb两种
                .keyBy(value -> value.f0)
                // 5秒一次的滚动窗口
                .timeWindow(Time.seconds(5))
                // 统计每个key当前窗口内的元素数量,然后把key、数量、窗口起止时间整理成字符串发送给下游算子
                .process(new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
                    // 自定义状态
                    private ValueState<KeyCount> state;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // 初始化状态,name是myState
                        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
                    }

                    public void clear(Context context){
                        ValueState<KeyCount> contextWindowValueState = context.windowState().getState(new ValueStateDescriptor<>("myWindowState", KeyCount.class));
                        contextWindowValueState.clear();
                    }

                    @Override
                    public void process(String s, Context context, Iterable<Tuple2<String, Integer>> iterable,
                            Collector<String> collector) throws Exception {
                        // 从backend取得当前单词的myState状态
                        KeyCount current = state.value();
                        // 如果myState还从未没有赋值过,就在此初始化
                        if (current == null) {
                            current = new KeyCount();
                            current.key = s;
                            current.count = 0;
                        }
                        int count = 0;
                        // iterable可以访问该key当前窗口内的所有数据,
                        // 这里简单处理,只统计了元素数量
                        for (Tuple2<String, Integer> tuple2 : iterable) {
                            count++;
                        }
                        // 更新当前key的元素总数
                        current.count += count;
                        // 更新状态到backend
                        state.update(current);
                        System.out.println("getRuntimeContext() == context :" + (getRuntimeContext() == context));
                        ValueState<KeyCount> contextWindowValueState = context.windowState().getState(new ValueStateDescriptor<>("myWindowState", KeyCount.class));
                        ValueState<KeyCount> contextGlobalValueState = context.globalState().getState(new ValueStateDescriptor<>("myGlobalState", KeyCount.class));
                        KeyCount windowValue = contextWindowValueState.value();
                        if (windowValue == null) {
                            windowValue = new KeyCount();
                            windowValue.key = s;
                            windowValue.count = 0;
                        }
                        windowValue.count += count;
                        contextWindowValueState.update(windowValue);

                        KeyCount globalValue = contextGlobalValueState.value();
                        if (globalValue == null) {
                            globalValue = new KeyCount();
                            globalValue.key = s;
                            globalValue.count = 0;
                        }
                        globalValue.count += count;
                        contextGlobalValueState.update(globalValue);

                        ValueState<KeyCount> contextWindowSameNameState =
                                context.windowState().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
                        ValueState<KeyCount> contextGlobalSameNameState =
                                context.globalState().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
                        System.out.println("contextWindowSameNameState == contextGlobalSameNameState :" + (
                                contextWindowSameNameState == contextGlobalSameNameState));
                        System.out.println(
                                "state == contextGlobalSameNameState :" + (state == contextGlobalSameNameState));

                        // 将当前key及其窗口的元素数量,还有窗口的起止时间整理成字符串
                        String value = String.format("window, %s, %s - %s, %d,    total : %d, windowStateCount :%s, globalStateCount :%s\n",
                                // 当前key
                                s,
                                // 当前窗口的起始时间
                                time(context.window().getStart()),
                                // 当前窗口的结束时间
                                time(context.window().getEnd()),
                                // 当前key在当前窗口内元素总数
                                count,
                                // 当前key出现的总数
                                current.count,
                                contextWindowValueState.value(),
                                contextGlobalValueState.value());

                        // 发射到下游算子
                        collector.collect(value);
                    }
                });

        // 打印结果,通过分析打印信息,检查ProcessWindowFunction中可以处理所有key的整个窗口的数据
        mainDataStream.print();

        env.execute("processfunction demo : processwindowfunction");

    }



    public static String time(long timeStamp) {
        return new SimpleDateFormat("hh:mm:ss").format(new Date(timeStamp));
    }




}

输出结果:

window, XXX, 08:34:45 - 08:34:50, 3,    total : 22, windowStateCount :KeyCount{key='XXX', count=3}, globalStateCount :KeyCount{key='XXX', count=22}
window, YYY, 08:34:45 - 08:34:50, 2,    total : 22, windowStateCount :KeyCount{key='YYY', count=2}, globalStateCount :KeyCount{key='YYY', count=22}

从结果可以验证以上的结论,此外需要特别注意的一点是context.windowState()的状态需要在clear方法中清理掉,因为一旦时间窗口结束,就再也没有机会清理了
从这个例子中还发现一个比较有趣的现象:

ValueState<KeyCount> state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
ValueState<KeyCount> contextWindowSameNameState =
        context.windowState().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
ValueState<KeyCount> contextGlobalSameNameState =
        context.globalState().getState(new ValueStateDescriptor<>("myState", KeyCount.class));

在open中通过getRuntimeContext().getState定义的状态竟然可以通过 context.windowState()/ context.globalState()访问到,并且他们指向的都是同一个变量,可以参见代码的输出:

System.out.println("contextWindowSameNameState == contextGlobalSameNameState :" + (
        contextWindowSameNameState == contextGlobalSameNameState));
System.out.println(
        "state == contextGlobalSameNameState :" + (state == contextGlobalSameNameState));

结果如下:

contextWindowSameNameState == contextGlobalSameNameState :true
state == contextGlobalSameNameState :true

参考文献:
https://cloud.tencent.com/developer/article/1815079文章来源地址https://www.toymoban.com/news/detail-641239.html

到了这里,关于flink的ProcessWindowFunction函数的三种状态的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Verilog写状态机的三种描述方式之三段式

    状态机的设计思路: 一是从状态机变量入手,分析各个状态的输入、状态转移和输出; 二是先确定电路的输出关系,再回溯规划每个状态的条件、输入等; 状态机的三要素是状态、输入和输出 , 根据状态机状态是否和输入条件相关,可以分为Moore型状态机(与输入无关)和

    2024年02月14日
    浏览(33)
  • C语言中函数宏的三种封装方式详解

      目录 ​编辑 1. 函数宏介绍 3. do{...}while(0) 方式 4. ({}) 方式 5. 总结 函数宏,即包含多条语句的宏定义,其通常为某一被频繁调用的功能的语句封装,且不想通过函数方式封装来降低额外的弹栈压栈开销。 函数宏本质上为宏,可以直接进行定义,例如: 但上述的宏具有一

    2024年02月02日
    浏览(39)
  • C++基础与深度解析01——函数基本组成+函数传参的三种方法

    请安装Visual Studio 并学习基本的新建项目、新建CPP文件以及运行代码。 函数功能:打印“Hello World” 1.函数构成 输入参数列表+函数名+返回类型+函数主体 如下图所示,但是main函数稍有特殊,其为cpp 现举一个简单函数的例子,z= 2x-3y+3,其中xy均为浮点型小数,取函数名为CalBi

    2024年02月16日
    浏览(34)
  • 计算字符串长度的三种方法(库函数 指针 )【详解】

    求字符串长度简单来说就是计算一个字符串(字符数组)中元素的个数即从数组头部计数,直到遇到字符串’\\0’结束符为止, 计数结果不包括’\\0’. C语言中的库函数strlen,它包含于string.h中,因此我们需要在使用前添加头文件 ,具体用法如下: strlen从数组头部计数,直到遇到字

    2024年02月06日
    浏览(58)
  • C/C++中关于交换(Swap)函数的三种方法

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 目录 文章目录 前言 一、交换函数的说明 二、三种情况的说明 1.值传递 1.1值传递的运行结果  2.传址调用 运行结果: 3.引用作为函数参数 运行结果:  对于引用变量的说明 总结 在学习编程中,交换函

    2024年02月11日
    浏览(25)
  • 一文读懂WPT系统中耦合的三种状态——过耦合、临界耦合、欠耦合时频率分裂对传输效率与功率的影响

    前言:本文章属于菜鸡 学 习文 章,不代表一定权威性,如有错误,请各位大佬评论区指正! 主要对改论文进行学习以及加入自己的一些想法,还希望读者发现错误即使指出。 目录 一、对于无线电能传输功率的分析 二、对于无线传能拓扑效率以及功率极值的计算 ①频率分

    2024年02月02日
    浏览(27)
  • stm32F103C8T6的三种延时函数

    非精准延时的方式就是使用空循环,循环内容为空。 优点是无需配置定时器,直接就能拿来使用。 缺点也很明显,就是无法实现精准延时,只能估摸着个大概,并且会造成CPU空转,不如使用硬件的方式。 以下是以TIM3为例: 初始化步骤与GPIO引脚使能一样,都是先定义一个初

    2024年02月10日
    浏览(40)
  • 【Apache Flink】实现有状态函数

    Flink为键值分区状态(Keyed State)提供了几种不同的原语(数据类型)。这是因为不同的算法和操作可能需要管理不同类型的状态。其中一些原语包括: ValueState : 这种状态类型用于存储单个的,可能更新的值。常见的用途包括存储计数器或聚合。 ListState : 这种状态用于存储一

    2024年02月08日
    浏览(27)
  • OpenCV函数应用:基于二值图像的三种孔洞填充方法记录(附python,C++代码)

    函数系列: OpenCV函数简记_第一章数字图像的基本概念(邻域,连通,色彩空间) OpenCV函数简记_第二章数字图像的基本操作(图像读写,图像像素获取,图像ROI获取,图像混合,图形绘制) OpenCV函数简记_第三章数字图像的滤波处理(方框,均值,高斯,中值和双边滤波) OpenC

    2024年02月05日
    浏览(41)
  • 数据传输的三种方式

    在通信和计算机网络中,从通信资源的分配角度来看,“交换”就是按照某种方式动态地分配传输线路的资源。常用的数据传输方式有电路交换、报文交换、分组交换。 特点: 通信双方独占通信链路 优点: 数据传输时延小, 适用于实时通信 ;数据按序发送,不存在失序问

    2024年02月08日
    浏览(25)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包