【API篇】八、Flink窗口函数

这篇具有很好参考价值的文章主要介绍了【API篇】八、Flink窗口函数。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

//窗口操作
stream.keyBy(<key selector>)
       .window(<window assigner>)
       .aggregate(<window function>)

上一节的窗口分配器,指明了窗口类型,知道了数据属于哪个窗口并收集。而窗口函数,则是定义如何对这些数据做计算操作。

【API篇】八、Flink窗口函数,Flink,1024程序员节,flink,API,窗口

  • 增量聚合来一条数据,计算一条数据,窗口触发的时候输出计算结果
  • 全窗口函数数据来了不计算,存起来,窗口触发的时候,计算并输出计算结果

1、增量聚合之ReduceFunction

public class WindowReduceDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);

        env.socketTextStream("node01", 9527)
           .map(new WaterSensorMapFunction())
           .keyBy(r -> r.getId())
           // 设置滚动事件时间窗口
           .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
           .reduce(new ReduceFunction<WaterSensor>() {

               @Override
               public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                   System.out.println("调用reduce方法,value1=:"+value1 + ",value2=:"+value2);
                   return new WaterSensor(value1.getId(), value2.getTs(), value1.getVc()+value2.getVc());
               }
           })
           .print();

        env.execute();
    }
}

运行,输入数据,查看控制台:

【API篇】八、Flink窗口函数,Flink,1024程序员节,flink,API,窗口

2、增量聚合之AggregateFunction

上面使用ReduceFunction的限制是,输入数据的类型、聚合中间状态的类型、输出结果的类型必须一致,AggregateFunction则没有这个限制。AggregateFunction接口有四个方法:

  • createAccumulator:创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
  • add:将输入的元素添加到累加器中。
  • getResult:从累加器中提取聚合的输出结果。
  • merge:合并两个累加器,并将合并后的状态作为一个累加器返回

AggregateFunction的工作原理是:首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用getResult()方法得到计算结果

public class WindowAggregateDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("node01", 9527)
                .map(new WaterSensorMapFunction());    //自定义的实现类,String转自定义对象WaterSensor


        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());

        // 1. 窗口分配器
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

        SingleOutputStreamOperator<String> aggregate = sensorWS
                .aggregate(
                        new AggregateFunction<WaterSensor, Integer, String>() {
                            @Override
                            public Integer createAccumulator() {
                                System.out.println("创建累加器");
                                return 0;
                            }
							
							//value即输入的数据,accumulator即之前的计算结果
                            @Override
                            public Integer add(WaterSensor value, Integer accumulator) {
                                System.out.println("调用add方法,value="+value);
                                return accumulator + value.getVc();
                            }

                            @Override
                            public String getResult(Integer accumulator) {
                                System.out.println("调用getResult方法");
                                return accumulator.toString();
                            }

                            @Override
                            public Integer merge(Integer a, Integer b) {
                                System.out.println("调用merge方法");
                                return null;
                            }
                        }
                );
        
        aggregate.print();

        env.execute();
    }
}

运行,输入数据,查看控制台:

【API篇】八、Flink窗口函数,Flink,1024程序员节,flink,API,窗口

3、全窗口函数full window functions

全窗口函数,即数据来了不计算,存起来,窗口触发的时候,计算并输出计算结果Flink全窗口函数有两种,第一种为apply方法下的:

stream
    .keyBy(<key selector>)
    .window(<window assigner>)
    .apply(new MyWindowFunction());

传入一个WindowFunction的实现类,该方法已被第二种ProcessWindowFunction全覆盖,因而逐渐弃用。ProcessWindowFunction除了可以拿到窗口中的所有数据之外,还可以获取到一个“上下文对象”(Context),通过这个上下文对象,可以获取窗口对象、窗口处理时间、事件时间水位线

public class WindowProcessDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("node01", 9527)
                .map(new WaterSensorMapFunction());

        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());

        // 1. 窗口分配器
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

        SingleOutputStreamOperator<String> process = sensorWS
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                        	/**
                        	* 全窗口函数计算逻辑,窗口结束时触发才调用一次
                        	* s 分组的key
                        	* context 上下文对象
                        	* elements 窗口内存的所有数据
                        	* out 采集器对象
                        	*/
                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                long count = elements.spliterator().estimateSize();
                                long windowStartTs = context.window().getStart();
                                long windowEndTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                            }
                        }
                );

        process.print();

        env.execute();
    }
}

效果:

【API篇】八、Flink窗口函数,Flink,1024程序员节,flink,API,窗口

【API篇】八、Flink窗口函数,Flink,1024程序员节,flink,API,窗口

4、增量聚合函数搭配全窗口函数

可以看出,增量和全窗口各有好处:

  • 增量聚合下,来一条计算一条,只存储中间计算结果,占用空间少
  • 全窗口函数则是可以通过上下文对象来实现灵活的功能

像同时拥有两者的优点,可以调用aggregate方法的另一个重载方法:

【API篇】八、Flink窗口函数,Flink,1024程序员节,flink,API,窗口

// ReduceFunction与WindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(
        ReduceFunction<T> reduceFunction,WindowFunction<TRKW> function) 

// ReduceFunction与ProcessWindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(
        ReduceFunction<T> reduceFunction,ProcessWindowFunction<TRKW> function)

// AggregateFunction与WindowFunction结合
public <ACCVR> SingleOutputStreamOperator<R> aggregate(
        AggregateFunction<TACCV> aggFunction,WindowFunction<VRKW> windowFunction)

// AggregateFunction与ProcessWindowFunction结合
public <ACCVR> SingleOutputStreamOperator<R> aggregate(
        AggregateFunction<TACCV> aggFunction,
        ProcessWindowFunction<VRKW> windowFunction)

此时:

  • 基于第一个参数,即增量聚合函数,来处理数据,来一条聚合一条
  • 窗口触发后,调用第二个参数的处理逻辑,此时,把增量聚合的结果(只有一条数据)再传递给全窗口函数,也就是说全窗口的Iterable<> elements,长度为1,注意全窗口不再缓存所有数据
  • 经过全窗口,执行处理和包装,再输出
public class WindowAggregateAndProcessDemo {

    public static void main(String[] args) throws Exception {
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);
        
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("node01", 9527)
                .map(new WaterSensorMapFunction());

        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());

        // 1. 窗口分配器
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

		//sensorWS.reduce()   //也可以传两个

        SingleOutputStreamOperator<String> result = sensorWS.aggregate(
                new MyAgg(),
                new MyProcess()
        );

        result.print();

        env.execute();
    }
    
}

public  class MyAgg implements AggregateFunction<WaterSensor, Integer, String>{

        @Override
        public Integer createAccumulator() {
            System.out.println("创建累加器");
            return 0;
        }


        @Override
        public Integer add(WaterSensor value, Integer accumulator) {
            System.out.println("调用add方法,value="+value);
            return accumulator + value.getVc();
        }

        @Override
        public String getResult(Integer accumulator) {
            System.out.println("调用getResult方法");
            return accumulator.toString();
        }

        @Override
        public Integer merge(Integer a, Integer b) {
            System.out.println("调用merge方法");
            return null;
        }
    }

// 全窗口函数的输入类型 = 增量聚合函数的输出类型
public  class MyProcess extends ProcessWindowFunction<String,String,String,TimeWindow>{

    @Override
    public void process(String s, Context context, Iterable<String> elements, Collector<String> out) throws Exception {
        long startTs = context.window().getStart();
        long endTs = context.window().getEnd();
        String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
        String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

        long count = elements.spliterator().estimateSize();

        out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());

    }
}

注意,二者搭配时,根据前面分析,可以知道,必有:增量聚合函数的输出类型 = 全窗口函数的输入类型

5、会话窗口动态获取间隔值

到此,窗口API需要的窗口分配器(见上一篇)和窗口函数都已整理完。上面demo中用的窗口分配器都是滚动窗口,但应该有以下这些:

  • 时间滚动窗口
  • 时间滑动窗口
  • 时间会话窗口
  • 计数滚动窗口
  • 计数滑动窗口

这里再记录下时间会话窗口+动态获取会话间隔:

public class WindowSessionDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("node01", 9527)
                .map(new WaterSensorMapFunction());
                
        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());

        // 1. 窗口分配器
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(ProcessingTimeSessionWindows.withDynamicGap(t -> t.getTs() * 1000L));

        SingleOutputStreamOperator<String> process = sensorWS
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
           
                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                long count = elements.spliterator().estimateSize();
                                long windowStartTs = context.window().getStart();
                                long windowEndTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                            }
                        }
                );

        process.print();

        env.execute();
    }
}

来一条数据,根据这条数据获取一个值做为会话间隔,到达这个间隔前,下条数据到来了,则会话间隔又成了另一个值,动态的。运行:

【API篇】八、Flink窗口函数,Flink,1024程序员节,flink,API,窗口

可以看到,会话间隔动态获取,到达间隔时下条数据还没来,则结束本窗户,窗口口结束时触发才调用一次process,和分析的一致。最后补充一点,展开demo代码里的Lambda表达式,其实是一个抓取会话间隔的方法,定义了会话窗口间隔的获取逻辑。

【API篇】八、Flink窗口函数,Flink,1024程序员节,flink,API,窗口

再贴个计数滑动窗口:

【API篇】八、Flink窗口函数,Flink,1024程序员节,flink,API,窗口

6、触发器和移除器

触发器主要是用来控制窗口什么时候触发计算,即什么时候执行窗口函数

//基于WindowedStream调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)
stream.keyBy(...)
       .window(...)
       .trigger(new MyTrigger())

移除器主要用来定义移除某些数据的逻辑

基于WindowedStream调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor是一个接口,不同的窗口类型都有各自预实现的移除器。

stream.keyBy(...)
       .window(...)
       .evictor(new MyEvictor())

Flink提供的几个窗口,比如滑动、滚动等,都有对触发器和移除器的默认实现,不用自定义。

7、补充

窗口的划分:

  • 窗口开始时间start是窗口长度的整数倍,向下取整

【API篇】八、Flink窗口函数,Flink,1024程序员节,flink,API,窗口
【API篇】八、Flink窗口函数,Flink,1024程序员节,flink,API,窗口

  • 窗口结束时间是start+窗口长度

【API篇】八、Flink窗口函数,Flink,1024程序员节,flink,API,窗口
【API篇】八、Flink窗口函数,Flink,1024程序员节,flink,API,窗口

  • 窗口是左闭右开,因为属于本窗口的最大时间戳为end-1

【API篇】八、Flink窗口函数,Flink,1024程序员节,flink,API,窗口文章来源地址https://www.toymoban.com/news/detail-720037.html

  • 窗口的生命周期,创建是属于本窗口的第一条数据来的时候,现new的,放入一个singleton单例的集合中
  • 窗口的销毁是时间的进展 >= 窗口的最大时间戳(end-1ms) + 允许迟到的时间(默认0)
  • 窗口什么时候触发输出:当时间进展 >= 窗口的最大时间戳(end -1ms)

到了这里,关于【API篇】八、Flink窗口函数的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 1024程序员节特辑:【Spring Boot自动配置原理揭秘】

    主页传送门:📀 传送   Spring Boot 是一个用于创建独立的、生产级别的 Spring 应用程序的框架。它极大地简化了 Spring 应用程序的开发过程,其中一个关键的功能就是自动配置(Auto-Configuration)。   自动配置可以根据项目需求自动配置各种服务和组件,它可以帮助开发者

    2024年02月08日
    浏览(66)
  • 解决github ping不通的问题(1024程序员节快乐!

    1024程序员节快乐!( 随便粘贴一个文档,参加活动 域名解析(域名-IP):https://www.ipaddress.com/ Ubuntu平台 github经常ping不通或者访问缓慢,方法是更改hosts文件 在hosts里添加github的ip 140.82.114.4 www.github.com 199.232.5.194 github.global.ssl.fastly.net 54.231.114.219 github-cloud.s3.amazonaws.com 可以访

    2024年01月18日
    浏览(77)
  • 好用且免费的CodeWhisperer,给1024程序员节送礼来了

          国庆期间没有胆量去人从众的景点,关在家里刷手机时意外在亚马逊的User Group公众号上发现了CodeWhisperer这么个好东西(bu yao qian),以后撸代码也可以提高生产力(fang yang mo yu)了,这还不赶紧上手试一下。看官方介绍说它支持流行的IDE开发工具,包括VS Code、Intelli

    2024年02月08日
    浏览(47)
  • 1024程序员节带你玩转图片Exif信息获取之JavaScript

    目录 一、前言 二、背景 三、Exif.js          1、Exif.js 简介 2、Exif.js 引入 四、多场景展示数据获取 1、原始图片直接获取  2、base64 编码文件加载  3、文件上传的方式加载  五、总结        1024是2的十次方,二进制计数的基本计量单位之一。1G=1024M,而1G与1级谐音,也有一

    2024年02月20日
    浏览(56)
  • 1024程序员节特辑 | Spring Boot实战 之 MongoDB分片或复制集操作

    Spring实战系列文章: Spring实战 | Spring AOP核心秘笈之葵花宝典 Spring实战 | Spring IOC不能说的秘密? 国庆中秋特辑系列文章: 国庆中秋特辑(八)Spring Boot项目如何使用JPA 国庆中秋特辑(七)Java软件工程师常见20道编程面试题 国庆中秋特辑(六)大学生常见30道宝藏编程面试题

    2024年02月08日
    浏览(76)
  • 1024程序员节特辑 | ELK+ 用户画像构建个性化推荐引擎,智能实现“千人千面”

    专栏集锦,大佬们可以收藏以备不时之需 Spring Cloud实战专栏:https://blog.csdn.net/superdangbo/category_9270827.html Python 实战专栏:https://blog.csdn.net/superdangbo/category_9271194.html Logback 详解专栏:https://blog.csdn.net/superdangbo/category_9271502.html tensorflow专栏:https://blog.csdn.net/superdangbo/category_869

    2024年02月07日
    浏览(80)
  • 1024程序员狂欢节 | IT前沿技术、人工智能、数据挖掘、网络空间安全技术

    一年一度的1024程序员狂欢节又到啦!成为更卓越的自己,坚持阅读和学习,别给自己留遗憾,行动起来吧! 那么,都有哪些好书值得入手呢?小编为大家整理了前沿技术、人工智能、集成电路科学与芯片技术、新一代信息与通信技术、网络空间安全技术,四大热点领域近期

    2024年02月06日
    浏览(64)
  • 1024程序员节特辑 | 解密Spring Cloud Hystrix熔断提高系统的可用性和容错能力

    专栏集锦,大佬们可以收藏以备不时之需 Spring Cloud实战专栏:https://blog.csdn.net/superdangbo/category_9270827.html Python 实战专栏:https://blog.csdn.net/superdangbo/category_9271194.html Logback 详解专栏:https://blog.csdn.net/superdangbo/category_9271502.html tensorflow专栏:https://blog.csdn.net/superdangbo/category_869

    2024年02月08日
    浏览(50)
  • 1024程序员节?我们整点AI绘图玩玩吧,一文教你配置stable-diffusion

    需提前准备:一台高性能的电脑(尤其是显存)、python、Git、梯子。 其实Github上有很多关于Stable diffusion的库,综合对比之后,我选取的是比较全面的AUTOMATIC1111这个,源码链接:Stable-diffusion(Github) 找到安装那块的教程,此教程以windows为例。 ps:如果你电脑上已经有了pyt

    2024年01月16日
    浏览(71)
  • PHP框架开发实践 | 1024 程序员节:通过index.php找到对应的controller是如何实现的

    🏆作者简介,黑夜开发者,CSDN领军人物,全栈领域优质创作者✌,CSDN博客专家,阿里云社区专家博主,2023年6月CSDN上海赛道top4。 🏆数年电商行业从业经验,历任核心研发工程师,项目技术负责人。 🏆本文已收录于PHP专栏:PHP进阶实战教程。 🎉欢迎 👍点赞✍评论⭐收藏

    2024年02月08日
    浏览(67)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包