Flink窗口分类简介及示例代码

这篇具有很好参考价值的文章主要介绍了Flink窗口分类简介及示例代码。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

水善利万物而不争,处众人之所恶,故几于道💦

1. 流式计算

  Flink作为一个流式处理引擎,被设计用来处理无限数据集,理论上来说,无限数据集是一种不断产生,源源不断的数据集,说白了就是你不知道这个数据流它啥时候结束,这就是无限数据集。

  流式计算的思想是每来一个数据我就直接处理,而不用等,因此他非常适合在实时性要求比较高的场景下使用。

2. 窗口

  在流处理的场景下,如果我们想要统计过去某个时间段或过去多少条数据的指标时,就需要用到窗口,在Flink中,窗口(window)可以将流划分为有限块进行处理,Flink将这些有限的块抽象为“存储桶(bucket)”,我们可以在这些所谓的桶上做计算,也就实现了无限数据的有限计算。

  窗口(Window)是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中,再对每个“桶”加以处理。

  窗口的声明周期是:一个窗口在第一个属于它的元素到达时就会被创建,然后在时间(event 或 processing time) 超过窗口的“结束时间戳 + 用户定义的 allowed lateness (可容忍的迟到时间)”时 被完全删除。Flink 仅保证删除基于时间的窗口,其他类型的窗口不做保证, 比如全局窗口(Global Windows)。 例如,对于一个基于 event time 且范围互不重合(滚动)的窗口策略, 如果窗口设置的时长为五分钟、可容忍的迟到时间(allowed lateness)为 1 分钟, 那么第一个元素落入 12:00 至 12:05 这个区间时,Flink 就会为这个区间创建一个新的窗口。 当 watermark 越过 12:06 时,这个窗口将被摧毁。关于窗口的详细介绍查看->官方对于窗口的介绍

3. 窗口的分类

◆ 基于时间的窗口(时间驱动)
1) 滚动窗口(Tumbling Windows)

window(TumblingProcessingTimeWindows.of(Time.seconds(10))),参数是时间滚动窗口大小。10秒滚动一个窗口

  滚动窗口将元素分发到指定大小的窗口。滚动窗口的大小是固定的,且个窗口之间没有空隙,不会重叠。比如说,如果你指定了滚动窗口的大小为5分钟,那么每5分钟就会有一个窗口被计算,且一个新的窗口被创建。如下图所示:
Flink窗口分类简介及示例代码,Flink,flink,大数据,窗口,流式计算,滚动窗口,滑动窗口,会话窗口
示例代码:

public class Flink01_Window_Time {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .socketTextStream("hadoop101",9999)
                .map(line->{
                    String[] datas = line.split(",");
                    return new WaterSensor(
                            datas[0],
                            Long.valueOf(datas[1]),
                            Integer.valueOf(datas[2])
                    );
                })
                .keyBy(WaterSensor::getId)
                // 定义一个长度为10s的滚动窗口 每隔10s滚动一次
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .process(
                        // 泛型的含义是:输入元素的类型,输出元素的类型,key的类型,窗口类型
                        new ProcessWindowFunction<WaterSensor, Object, String, TimeWindow>() {
                            // 在窗口关闭的时候触发一次
                            @Override
                            public void process(String s, // key ,keyBy之后的key
                                                Context context,  //上下文对象:里面封装了一些信息,比如窗口开始时间,结束时间,定时服务器...
                                                Iterable<WaterSensor> elements, // 存储了这个窗口内所有的元素
                                                Collector<Object> out) throws Exception {
                                // 把Iterable中所有的元素取出并存入到 list 集合中
                                List<WaterSensor> list = AnqclnUtil.toList(elements);
                                // 获取窗口的相关信息,窗口开始时间和结束时间
                                String startTime = AnqclnUtil.toDateTime(context.window().getStart());
                                String endTime = AnqclnUtil.toDateTime(context.window().getEnd());

                                out.collect("窗口:"+startTime+" "+endTime+" ,key:"+s + " ,list:"+list);
                            }
                        }
                )
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

自定义的工具类:

public class AnqclnUtil {
    // 要先声明泛型
    public static <T>List<T> toList(Iterable<T> elements) {
        List<T> list = new ArrayList<>();

        for (T t : elements) {
            list.add(t);
        }
        return list;
    }
	// 将long类型的时间转换为时间字符串
    public static String toDateTime(long ts) {
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(ts);
    }
}

运行结果:

Flink窗口分类简介及示例代码,Flink,flink,大数据,窗口,流式计算,滚动窗口,滑动窗口,会话窗口

2) 滑动窗口(Sliding Windows)

window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5))),参数是时间滑动窗口大小和滑动距离。5秒滑动一个窗口,每个窗口最多放10个元素

  与滚动窗口类似,滑动窗口的 assigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离(window slide)参数来控制生成新窗口的频率。 因此,如果 slide 小于窗口大小,滑动窗口可以允许窗口重叠。这种情况下,一个元素可能会被分发到多个窗口。

  比如说,你设置了大小为 10 分钟,滑动距离 5 分钟的窗口,你会在每 5 分钟得到一个新的窗口, 里面包含之前 10 分钟到达的数据(如下图所示)。

Flink窗口分类简介及示例代码,Flink,flink,大数据,窗口,流式计算,滚动窗口,滑动窗口,会话窗口

示例代码:

public class Flink01_Window_Time {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .socketTextStream("hadoop101",9999)
                .map(line->{
                    String[] datas = line.split(",");
                    return new WaterSensor(
                            datas[0],
                            Long.valueOf(datas[1]),
                            Integer.valueOf(datas[2])
                    );
                })
                .keyBy(WaterSensor::getId)
                // 定义一个长度为10s的滚动窗口 每隔10s滚动一次
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                // 定义一个滑动窗口:窗口长度是10s,滑动间隔5s   这种情况一个元素可能出现在多个窗口,因为有滑动
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
                .process(
                        // 泛型的含义是:输入元素的类型,输出元素的类型,key的类型,窗口类型
                        new ProcessWindowFunction<WaterSensor, Object, String, TimeWindow>() {
                            // 在窗口关闭的时候触发一次
                            @Override
                            public void process(String s, // key ,keyBy之后的key
                                                Context context,  //上下文对象:里面封装了一些信息,比如窗口开始时间,结束时间,定时服务器...
                                                Iterable<WaterSensor> elements, // 存储了这个窗口内所有的元素
                                                Collector<Object> out) throws Exception {
                                // 把Iterable中所有的元素取出并存入到 list 集合中
                                List<WaterSensor> list = AnqclnUtil.toList(elements);
                                // 获取窗口的相关信息,窗口开始时间和结束时间
                                String startTime = AnqclnUtil.toDateTime(context.window().getStart());
                                String endTime = AnqclnUtil.toDateTime(context.window().getEnd());

                                out.collect("窗口:"+startTime+" "+endTime+" ,key:"+s + " ,list:"+list);
                            }
                        }
                )
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:
Flink窗口分类简介及示例代码,Flink,flink,大数据,窗口,流式计算,滚动窗口,滑动窗口,会话窗口

3) 会话窗口(Session Windows)

window(ProcessingTimeSessionWindows.withGap(Time.seconds(4))),参数是会话间隔,也就是多久没有活跃就关闭当前会话。4秒不活跃就关闭窗口。

  会话窗口的 assigner 会把数据按活跃的会话分组。 与滚动窗口和滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭,即在一段不活跃的间隔之后。 会话窗口的 assigner 可以设置固定的会话间隔(session gap)或 用 session gap extractor 函数来动态地定义多长时间算作不活跃。 当超出了不活跃的时间段,当前的会话就会关闭,并且将接下来的数据分发到新的会话窗口。

Flink窗口分类简介及示例代码,Flink,flink,大数据,窗口,流式计算,滚动窗口,滑动窗口,会话窗口

示例代码:

public class Flink01_Window_Time {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .socketTextStream("hadoop101",9999)
                .map(line->{
                    String[] datas = line.split(",");
                    return new WaterSensor(
                            datas[0],
                            Long.valueOf(datas[1]),
                            Integer.valueOf(datas[2])
                    );
                })
                .keyBy(WaterSensor::getId)
                // 定义一个长度为10s的滚动窗口 每隔10s滚动一次
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                // 定义一个滑动窗口:窗口长度是10s,滑动间隔5s   这种情况一个元素可能出现在多个窗口,因为有滑动
//                .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
                // 定义一个session窗口,时间间隔为4s  对于session窗口来说,不同的key出发时间不同,每个key都维护自己的session
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(4)))
                .process(
                        // 泛型的含义是:输入元素的类型,输出元素的类型,key的类型,窗口类型
                        new ProcessWindowFunction<WaterSensor, Object, String, TimeWindow>() {
                            // 在窗口关闭的时候触发一次
                            @Override
                            public void process(String s, // key ,keyBy之后的key
                                                Context context,  //上下文对象:里面封装了一些信息,比如窗口开始时间,结束时间,定时服务器...
                                                Iterable<WaterSensor> elements, // 存储了这个窗口内所有的元素
                                                Collector<Object> out) throws Exception {
                                // 把Iterable中所有的元素取出并存入到 list 集合中
                                List<WaterSensor> list = AnqclnUtil.toList(elements);
                                // 获取窗口的相关信息,窗口开始时间和结束时间
                                String startTime = AnqclnUtil.toDateTime(context.window().getStart());
                                String endTime = AnqclnUtil.toDateTime(context.window().getEnd());

                                out.collect("窗口:"+startTime+" "+endTime+" ,key:"+s + " ,list:"+list);
                            }
                        }
                )
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:
Flink窗口分类简介及示例代码,Flink,flink,大数据,窗口,流式计算,滚动窗口,滑动窗口,会话窗口

◆ 基于元素个数的(数据驱动)
1) 滚动窗口(Tumbling Windows)

countWindow(3),参数是个数滚动窗口大小。3个元素滚动一个窗口

  每来多少个元素就滚动一次

示例代码:

public class Flink02_Window_Count {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .socketTextStream("hadoop101",9999)
                .map(line -> {
                    String[] data = line.split(",");

                    return new WaterSensor(
                            data[0],
                            Long.valueOf(data[1]),
                            Integer.valueOf(data[2])
                    );
                })
                .keyBy(WaterSensor::getId)
                // 定义长度为3的基于个数的滚动窗口 因为是keyBy之后的,所以key一样的不到三条不会触发
                .countWindow(3)
                .process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {
                    @Override
                    public void process(String s,
                                        Context context,
                                        Iterable<WaterSensor> elements,
                                        Collector<String> out) throws Exception {
                        List<WaterSensor> list = AnqclnUtil.toList(elements);

                        out.collect(" key: "+s+" "+list);
                    }
                })
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:
Flink窗口分类简介及示例代码,Flink,flink,大数据,窗口,流式计算,滚动窗口,滑动窗口,会话窗口

2) 滑动窗口(Sliding Windows)

countWindow(3,2),参数是个数滑动窗口大小和滑动步长。每两个元素产生一个新的窗口,每个窗口最多放3个元素。

  就比滚动的多了个参数,滑动步长。步长是生成新窗口的条件,而窗口大小是指这个窗口最多能放多少个元素

示例代码:

public class Flink02_Window_Count {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .socketTextStream("hadoop101",9999)
                .map(line -> {
                    String[] data = line.split(",");

                    return new WaterSensor(
                            data[0],
                            Long.valueOf(data[1]),
                            Integer.valueOf(data[2])
                    );
                })
                .keyBy(WaterSensor::getId)
                // 定义长度为3的基于个数的滚动窗口 因为是keyBy之后的,所以key一样的不到三条不会触发
//                .countWindow(3)
                // 定义一个长度为3(窗口内元素的最大个数)  每来两个2个元素滑动一次,
                .countWindow(3,2)
                .process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {
                    @Override
                    public void process(String s,
                                        Context context,
                                        Iterable<WaterSensor> elements,
                                        Collector<String> out) throws Exception {
                        List<WaterSensor> list = AnqclnUtil.toList(elements);

                        out.collect(" key: "+s+" "+list);
                    }
                })
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:
Flink窗口分类简介及示例代码,Flink,flink,大数据,窗口,流式计算,滚动窗口,滑动窗口,会话窗口文章来源地址https://www.toymoban.com/news/detail-645398.html

到了这里,关于Flink窗口分类简介及示例代码的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink + MySQL 流式计算数据分析

    作者:禅与计算机程序设计艺术 大数据时代,海量的数据源源不断涌入到互联网、移动应用、企业数据库等各个领域,同时这些数据也逐渐成为各种业务场景中的主要输入数据。如何在短时间内对海量数据进行处理、分析并得出有价值的信息,已经成为当今社会越来越关注的

    2024年02月06日
    浏览(51)
  • 【Flink】【ClickHouse】写入流式数据到ClickHouse

    Flink 安装的教程就不在这里赘叙了,可以看一下以前的文章,这篇文章主要是把流式数据写入的OLAP(ClickHouse)中作查询分析 Flink 1.13.2, ClickHouse 22.1.3.7 这里直接使用docker安装,没有安装的同学可以使用homebreak来安装,执行下面的命令即可( 已经安装了docker的可以忽略 ) 四指

    2024年02月03日
    浏览(43)
  • Flink的流式数据处理与时间序列分析

    Apache Flink 是一个流处理框架,用于实时数据处理和分析。它支持大规模数据流处理,具有高吞吐量和低延迟。Flink 可以处理各种数据源和数据接收器,如 Kafka、HDFS、TCP 流等。 时间序列分析是一种用于分析时间序列数据的方法,用于发现数据中的趋势、季节性和随机性。时间

    2024年02月21日
    浏览(50)
  • 【Apache-Flink零基础入门】「入门到精通系列」手把手+零基础带你玩转大数据流式处理引擎Flink(基础概念解析+有状态的流式处理)

    Apache Flink 是业界公认的最佳流计算引擎之一,它不仅仅局限于流处理,而是一套兼具流、批、机器学习等多种计算功能的大数据引擎。Flink 的用户只需根据业务逻辑开发一套代码,就能够处理全量数据、增量数据和实时数据,无需针对不同的数据类型开发不同的方案。这使得

    2024年02月03日
    浏览(88)
  • 示例代码:使用golang进行flink开发

    以下是一个使用 Golang 进行 Flink 开发的简单示例代码: 以上示例代码使用 Flink 的 REST API 连接到 Flink 作业集群,并定义了一个输入数据流和一个输出数据流。然后,使用 Map 操作对输入数据进行处理,并将处理后的数据写入输出数据流。最后,执行作业并等待作业结束。 请注

    2024年02月12日
    浏览(44)
  • 示例代码:使用python进行flink开发

    以下是一个使用 Python 进行 Flink 开发的简单示例代码: 以上示例代码使用 PyFlink 库连接到 Flink 作业集群,并定义了一个输入流和一个输出流。然后,使用 UDF (User Defined Function)对输入数据进行处理,并将处理后的数据写入输出流。最后,执行作业并等待作业结束。 请注意

    2024年02月13日
    浏览(51)
  • 【天衍系列 05】Flink集成KafkaSink组件:实现流式数据的可靠传输 & 高效协同

    Flink版本: 本文主要是基于Flink1.14.4 版本 导言: Apache Flink 作为流式处理领域的先锋,为实时数据处理提供了强大而灵活的解决方案。其中,KafkaSink 是 Flink 生态系统中的关键组件之一,扮演着将 Flink 处理的数据可靠地发送到 Kafka 主题的角色。本文将深入探讨 KafkaSink 的工作

    2024年02月20日
    浏览(62)
  • XL-LightHouse 与 Flink 和 ClickHouse 流式大数据统计系统

    一个Flink任务只能并行处理一个或少数几个数据流,而XL-LightHouse一个任务可以并行处理数万个、几十万个数据流; 一个Flink任务只能实现一个或少数几个数据指标,而XL-LightHouse单个任务就能支撑大批量、数以万计的数据指标。 1、XL-LightHouse :  1、再也不需要用 Flink、Spark、

    2024年02月09日
    浏览(38)
  • Flink流数据窗口与时间

    随着大数据时代的到来,流处理技术变得越来越重要。流处理系统可以实时地处理大量数据,为实时应用提供有价值的信息。Apache Flink是一个流处理框架,它可以处理大规模的流数据,并提供丰富的功能,如窗口操作、时间操作等。在本文中,我们将深入探讨Flink流数据窗口

    2024年02月20日
    浏览(93)
  • 大数据-玩转数据-Flink窗口函数

    前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素. window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种. ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对

    2024年02月11日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包