Flink 窗口

这篇具有很好参考价值的文章主要介绍了Flink 窗口。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

介绍:流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而 window 是一种切割无限数据为有限块进行处理的手段,其分为两种类型:1、时间窗口,2:计数窗口

一、时间窗口

时间窗口根据窗口实现原理的不同分成三类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)

1.1、滚动窗口(Tumbling Windows)

介绍:将数据依据固定的窗口长度(时间)对数据进行切片
特点:时间对齐,窗口长度固定,没有重叠

package com.xx.window;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * @author aqi
 * @since 2023/8/30 15:46
 */
@Slf4j
public class WindowReduceDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 接收socket数据(windows使用:nc -lp 7777,linux使用:nc -lk 7777进行数据推送)
        SingleOutputStreamOperator<Demo> sensorDS = env
                .socketTextStream("127.0.0.1", 7777)
                .map(new DemoMapFunction());

        // 滚动窗口(固定窗口长度为:10秒,每隔10s统计一次)
        WindowedStream<Demo, String, TimeWindow> sensorWS = sensorDS
                .keyBy(Demo::getId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

        // 聚合(也可以使用别的算子进行聚合)
        SingleOutputStreamOperator<Demo> reduce = sensorWS.reduce(
                (value1, value2) -> new Demo(value1.getId(), value1.getValue() + value2.getValue())
        );
        // 打印计算结果
        reduce.print();
        // 触发计算
        env.execute();
    }
}

@Data
@AllArgsConstructor
@NoArgsConstructor
class Demo {

    private String id;

    private Long value;
}

class DemoMapFunction implements MapFunction<String, Demo> {

    @Override
    public Demo map(String value) {
        String[] datas = value.split(",");
        return new Demo(datas[0], Long.valueOf(datas[1]));
    }
}


1.2、滑动窗口(Sliding Windows)

介绍:滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成
特点:时间对齐,窗口长度固定,有重叠

package com.xx.window;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * @author aqi
 * @since 2023/8/30 15:46
 */
@Slf4j
public class WindowReduceDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 接收socket数据(windows使用:nc -lp 7777,linux使用:nc -lk 7777进行数据推送)
        SingleOutputStreamOperator<Demo> sensorDS = env
                .socketTextStream("127.0.0.1", 7777)
                .map(new DemoMapFunction());

        // 滚动窗口(固定窗口长度为:10秒,每隔10s统计一次)
//        WindowedStream<Demo, String, TimeWindow> sensorWS = sensorDS
//                .keyBy(Demo::getId)
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

        // 滑动窗口(每5秒钟统计一次,过去的10秒钟内的数据)
        WindowedStream<Demo, String, TimeWindow> sensorWS = sensorDS
                .keyBy(Demo::getId)
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2)));

        // 聚合(也可以使用别的算子进行聚合)
        SingleOutputStreamOperator<Demo> reduce = sensorWS.reduce(
                (value1, value2) -> new Demo(value1.getId(), value1.getValue() + value2.getValue())
        );
        // 打印计算结果
        reduce.print();
        // 触发计算
        env.execute();
    }
}

@Data
@AllArgsConstructor
@NoArgsConstructor
class Demo {

    private String id;

    private Long value;
}

class DemoMapFunction implements MapFunction<String, Demo> {

    @Override
    public Demo map(String value) {
        String[] datas = value.split(",");
        return new Demo(datas[0], Long.valueOf(datas[1]));
    }
}


1.3、会话窗口(Session Windows)

介绍:由一系列事件组合一个指定时间长度的 timeout 间隙组成,也就是一段时间没有接收到新数据就会生成新的窗口
特点:时间无对齐

package com.xx.window;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * @author aqi
 * @since 2023/8/30 15:46
 */
@Slf4j
public class WindowReduceDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 接收socket数据(windows使用:nc -lp 7777,linux使用:nc -lk 7777进行数据推送)
        SingleOutputStreamOperator<Demo> sensorDS = env
                .socketTextStream("127.0.0.1", 7777)
                .map(new DemoMapFunction());

        // 滚动窗口(固定窗口长度为:10秒,每隔10s统计一次)
//        WindowedStream<Demo, String, TimeWindow> sensorWS = sensorDS
//                .keyBy(Demo::getId)
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));


        // 滑动窗口(每5秒钟统计一次,过去的10秒钟内的数据)
//        WindowedStream<Demo, String, TimeWindow> sensorWS = sensorDS
//                .keyBy(Demo::getId)
//                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2)));


        // 会话窗口(超时间隔5s)
        WindowedStream<Demo, String, TimeWindow> sensorWS = sensorDS
                .keyBy(Demo::getId)
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));


        // 聚合(也可以使用别的算子进行聚合)
        SingleOutputStreamOperator<Demo> reduce = sensorWS.reduce(
                (value1, value2) -> new Demo(value1.getId(), value1.getValue() + value2.getValue())
        );
        // 打印计算结果
        reduce.print();
        // 触发计算
        env.execute();
    }
}

@Data
@AllArgsConstructor
@NoArgsConstructor
class Demo {

    private String id;

    private Long value;
}

class DemoMapFunction implements MapFunction<String, Demo> {

    @Override
    public Demo map(String value) {
        String[] datas = value.split(",");
        return new Demo(datas[0], Long.valueOf(datas[1]));
    }
}


1.4、总结

滚动窗口:TumblingProcessingTimeWindows.of(Time.seconds(10))
滑动窗口:SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2))
会话窗口:ProcessingTimeSessionWindows.withGap(Time.seconds(5))

二、计数窗口

和时间窗口类似,同样也分为三种,使用方法也基本相同

1.1、滚动窗口(Tumbling Windows)

窗口长度=5个元素

sensorKs.countWindow(5);

1.2、滑动窗口(Sliding Windows)

窗口长度=5个元素,滑动步长=2个元素

sensorKs.countWindow(5, 2);

1.3、会话窗口(Session Windows)

三、窗口触发方式

3.1、增量聚合

来一条数据,计算一条数据,窗口触发的时候输出计算结果
函数:reduce、aggregate等,除了process都是增量函数

3.2、全窗口函数

数据来了不计算,存储起来,窗口触发的时候,计算并输出结果,并且可以获取到窗口信息、上下文信息等,灵活性非常的强
函数:process文章来源地址https://www.toymoban.com/news/detail-701716.html

package com.xx.window;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * @author aqi
 * @since 2023/8/30 15:46
 */
@Slf4j
public class WindowReduceDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 接收socket数据(windows使用:nc -lp 7777,linux使用:nc -lk 7777进行数据推送)
        SingleOutputStreamOperator<Demo> sensorDS = env
                .socketTextStream("127.0.0.1", 7777)
                .map(new DemoMapFunction());

        // 滚动窗口(固定窗口长度为:10秒,每隔10s统计一次)
        WindowedStream<Demo, String, TimeWindow> sensorWS = sensorDS
                .keyBy(Demo::getId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

        SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<Demo, String, String, TimeWindow>() {

            /**
             * 全窗口函数的计算逻辑,窗口触发时才会调用一次,统一计算窗口的所有数据
             * @param s 分组的key
             * @param context 上下文
             * @param elements 存的数据
             * @param out 采集器
             */
            @Override
            public void process(String s, ProcessWindowFunction<Demo, String, String, TimeWindow>.Context context, Iterable<Demo> elements, Collector<String> out) {
                long start = context.window().getStart();
                long end = context.window().getEnd();

                String startWindow = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss");
                String endWindow = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss");

                long count = elements.spliterator().estimateSize();

                out.collect("key=" + s + "的窗口[" + startWindow + "," + endWindow + "]包含:" + count + "条数据===>" + elements);
            }
        });

        // 打印计算结果
        process.print();
        // 触发计算
        env.execute();
    }
}

@Data
@AllArgsConstructor
@NoArgsConstructor
class Demo {

    private String id;

    private Long value;
}

class DemoMapFunction implements MapFunction<String, Demo> {

    @Override
    public Demo map(String value) {
        String[] datas = value.split(",");
        return new Demo(datas[0], Long.valueOf(datas[1]));
    }
}


3.3、增量函数和全窗口函数组合使用

package com.xx.window;

import com.xx.entity.WaterSensor;
import com.xx.functions.WaterSensorMapFunction;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * @author aqi
 * @since 2023/8/30 15:46
 */
public class WindowAggregateAndProcessDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("127.0.0.1", 7777)
                .map(new WaterSensorMapFunction());


        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);

        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

        SingleOutputStreamOperator<String> result = sensorWS.aggregate(
                // 第一个参数:输入数据的类型,第二个参数:累加器的类型,存储的中间计算结果的类型,第三个参数:输出的类型
                new AggregateFunction<WaterSensor, Integer, String>() {
                    @Override
                    public Integer createAccumulator() {
                        System.out.println("初始化累加器");
                        return null;
                    }

                    @Override
                    public Integer add(WaterSensor value, Integer accumulator) {
                        if (accumulator == null) {
                            accumulator = 0;
                        }
                        Integer add = value.getVc() + accumulator;
                        System.out.println("调用add方法,累加结果:" + add);
                        return add;
                    }

                    @Override
                    public String getResult(Integer accumulator) {
                        System.out.println("获取最终结果");
                        return accumulator.toString();
                    }

                    @Override
                    public Integer merge(Integer a, Integer b) {
                        System.out.println("调用merge方法");
                        return null;
                    }
                }, new ProcessWindowFunction<String, String, String, TimeWindow>() {
                    @Override
                    public void process(String s, ProcessWindowFunction<String, String, String, TimeWindow>.Context context, Iterable<String> elements, Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();

                        String startWindow = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss");
                        String endWindow = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss");

                        long count = elements.spliterator().estimateSize();

                        out.collect("key=" + s + "的窗口[" + startWindow + "," + endWindow + "]包含:" + count + "条数据===>" + elements);
                    }
                });

        result.print();
        env.execute();
    }
}

到了这里,关于Flink 窗口的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据Flink(七十四):SQL的滑动窗口(HOP)

    文章目录 SQL的滑动窗口(HOP) 滑动窗口定义 :滑动窗口也是将元素指定给固定长度的窗口。与滚动窗口功能一样,也有窗口大小的概念。不一样的地方在于,滑动窗口有另一个参数控制窗口计算的频率(滑动窗口滑动的步长)。因此,如果滑动的步长小于窗口大小,则滑动

    2024年02月09日
    浏览(39)
  • 大数据Flink(七十三):SQL的滚动窗口(TUMBLE)

    文章目录 SQL的滚动窗口(TUMBLE) 滚动窗口定义 :滚动窗口将每个元素指定给指定窗口大小的窗口。滚动窗口具有固定大小,且不重叠。例如,指定一个大小为 5 分钟的滚动窗口。在这种情况下,Flink 将每隔 5 分钟开启一个新的窗口,其中每一条数都会划分到唯一一个 5 分钟

    2024年02月09日
    浏览(35)
  • 大数据Flink(七十七):SQL窗口的Over Windows

    文章目录 SQL窗口的Over Windows 一、​​​​​​​时间区间聚合

    2024年02月09日
    浏览(40)
  • 关于flink滚动窗口下数据乱序+倾斜,allowedLateness的一个坑

    目录 前言         滚动窗口(Tumbling Windows)         allowedLateness 场景描述 数据倾斜问题解决 输出结果偏差问题         思考 输出结果偏差解决 扩展         滚动窗口的 assigner 分发元素到指定大小的窗口。滚动窗口的大小是固定的,且各自范围之间不重叠。

    2024年02月21日
    浏览(42)
  • 【Flink】Flink窗口触发器

           数据进入到窗口的时候,窗口是否触发后续的计算由窗口触发器决定,每种类型的窗口都有对应的窗口触发机制。WindowAssigner 默认的 Trigger通常可解决大多数的情况。我们通常使用方式如下,调用trigger()方法把我们想执行触发器传递进去:  SingleOutputStreamOperatorProduct

    2024年02月12日
    浏览(37)
  • Flink 学习六 Flink 窗口计算API

    窗口 window 是处理无限流的核心就是把无界的数据流,按照一定的规则划分成一段一段的有界的数据流(桶),然后再这个有界的数据流里面去做计算; 2.1 滚动窗口 相邻窗口之间是没有数据重合 window 大小可以是时间,可以是数据长度 按照数据流是否可以是 keyed , 在分类,nonkey windo

    2024年02月09日
    浏览(43)
  • Flink--8、时间语义、水位线(事件和窗口、水位线和窗口的工作原理、生产水位线、水位线的传递、迟到数据的处理)

                           星光下的赶路人star的个人主页                        将自己生命力展开的人,他的存在,对别人就是愈疗 1、从《星球大战》说起 为了更加清晰地说明两种语义的区别,我们来举一个非常经典的例

    2024年02月07日
    浏览(47)
  • 《Flink学习笔记》——第六章 Flink的时间和窗口

    6.1 时间语义 6.1.1 Flink中的时间语义 对于一台机器而言,时间就是系统时间。但是Flink是一个分布式处理系统,多台机器“各自为政”,没有统一的时钟,各自有各自的系统时间。而对于并行的子任务来说,在不同的节点,系统时间就会有所差异。 我们知道一个集群有JobMana

    2024年02月11日
    浏览(40)
  • Flink|《Flink 官方文档 - DataStream API - 算子 - 窗口》学习笔记

    学习文档:《Flink 官方文档 - DataStream API - 算子 - 窗口》 学习笔记如下: 窗口(Window):窗口是处理无界流的关键所在。窗口可以将数据流装入大小有限的 “桶” 中,再对每个 “桶” 加以处理。 Keyed Windows 在 Keyed Windows 上使用窗口时,要调用 keyBy(...) 而后再调用 window(..

    2024年01月18日
    浏览(45)
  • 【Flink】Flink 中的时间和窗口之水位线(Watermark)

    这里先介绍一下什么是 时间语义 , 时间语义 在Flink中是一种很重要的概念,下面介绍的 水位线 就是基于 时间语义 来讲的。 在Flink中我们提到的时间语义一般指的是 事件时间 和 处理时间 : 处理时间(Processing Time) ,一般指执行处理操作的系统时间,也就是Flink的窗口算子

    2024年02月07日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包