大数据-玩转数据-Flink时间滚动动窗口

这篇具有很好参考价值的文章主要介绍了大数据-玩转数据-Flink时间滚动动窗口。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、Flink 窗口 理解

在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。

流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而Window窗口是一种切割无限数据为有限块进行处理的手段。

在Flink中, 窗口(window)是处理无界流的核心. 窗口把流切割成有限大小的多个"存储桶"(bucket), 我们在这些桶上进行计算.

时间窗口
时间窗口包含一个开始时间戳(包括)和结束时间戳(不包括), 这两个时间戳一起限制了窗口的尺寸。在代码中, Flink使用TimeWindow这个类来表示基于时间的窗口. 这个类提供了key查询开始时间戳和结束时间戳的方法, 还提供了针对给定的窗口获取它允许的最大时间戳的方法(maxTimestamp()),时间窗口又分3种:滚动窗口、滑动窗口、会话窗口。

二、数据准备

准备一个WaterSensor类方便演示

package com.lyh.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {
    private String id;
    private Long ts;
    private Integer vc;
}

三、时间滚动窗口

滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙.比如,如果指定一个长度为5分钟的滚动窗口, 当前窗口开始计算, 每5分钟启动一个新的窗口。滚动窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口。

滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙.比如,如果指定一个长度为5分钟的滚动窗口, 当前窗口开始计算, 每5分钟启动一个新的窗口.
滚动窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口
1.时间间隔可以通过: Time.milliseconds(x), Time.seconds(x), Time.minutes(x),等等来指定,2.我们传递给window函数的对象叫窗口分配器.

时间滚动窗口代码

package com.lyh.flink07;

import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class Window_s {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.socketTextStream("hadoop100",9999)
                .map(line -> {
                    String[] data = line.split(",");
                    return new WaterSensor(
                            data[0],
                            Long.valueOf(data[1]),
                            Integer.valueOf(data[2])
                    );
                })
                .keyBy(WaterSensor::getId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<WaterSensor, String,String, TimeWindow>() {
                    @Override
                    public void process(String key,
                                        Context ctx,
                                        Iterable<WaterSensor> elements,
                                        Collector<String> out) throws Exception {
                    List<WaterSensor> list  = toList(elements);
                        long starttime = ctx.window().getStart();
                        long endtime = ctx.window().getEnd();

                        out.collect("窗口:" + starttime + "  " + endtime + "  " + "key:" + key + "  " + "list:" + list);

                    }
                }).print();
        env.execute();
    }

    private static <T>List<T> toList(Iterable<T> it) {
        List<T>  list = new ArrayList<>();
        for (T t : it) {
            list.add(t);
            
        }
        return list;
    }
}

运行结果
在hadoop100 服务器
输入nc -lk 999 启动socket
大数据-玩转数据-Flink时间滚动动窗口,大数据-玩转数据-FLINK,大数据,flink

消费结果:
大数据-玩转数据-Flink时间滚动动窗口,大数据-玩转数据-FLINK,大数据,flink

四、时间滑动窗口

与滚动窗口一样, 滑动窗口也是有固定的长度. 另外一个参数我们叫滑动步长, 用来控制滑动窗口启动的频率.
所以, 如果滑动步长小于窗口长度, 滑动窗口会重叠. 这种情况下, 一个元素可能会被分配到多个窗口中
例如, 滑动窗口长度10分钟, 滑动步长5分钟, 则, 每5分钟会得到一个包含最近10分钟的数据。

时间滑动窗口代码

package com.lyh.flink07;

import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class Window_s {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.socketTextStream("hadoop100",9999)
                .map(line -> {
                    String[] data = line.split(",");
                    return new WaterSensor(
                            data[0],
                            Long.valueOf(data[1]),
                            Integer.valueOf(data[2])
                    );
                })
                .keyBy(WaterSensor::getId)
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2)))
                .process(new ProcessWindowFunction<WaterSensor, String,String, TimeWindow>() {
                    @Override
                    public void process(String key,
                                        Context ctx,
                                        Iterable<WaterSensor> elements,
                                        Collector<String> out) throws Exception {
                    List<WaterSensor> list  = toList(elements);
                        long starttime = ctx.window().getStart();
                        long endtime = ctx.window().getEnd();

                        out.collect("窗口:" + starttime + "  " + endtime + "  " + "key:" + key + "  " + "list:" + list);

                    }
                }).print();
        env.execute();
    }

    private static <T>List<T> toList(Iterable<T> it) {
        List<T>  list = new ArrayList<>();
        for (T t : it) {
            list.add(t);
            
        }
        return list;
    }
}

执行结果
在hadoop100 服务器
输入nc -lk 999 启动socket
大数据-玩转数据-Flink时间滚动动窗口,大数据-玩转数据-FLINK,大数据,flink
消费结果
大数据-玩转数据-Flink时间滚动动窗口,大数据-玩转数据-FLINK,大数据,flink

五、时间会话窗口

会话窗口分配器会根据活动的元素进行分组. 会话窗口不会有重叠, 与滚动窗口和滑动窗口相比, 会话窗口也没有固定的开启和关闭时间.
如果会话窗口有一段时间没有收到数据, 会话窗口会自动关闭, 这段没有收到数据的时间就是会话窗口的gap(间隔)
我们可以配置静态的gap, 也可以通过一个gap extractor 函数来定义gap的长度. 当时间超过了这个gap, 当前的会话窗口就会关闭, 后序的元素会被分配到一个新的会话窗口。

时间会话窗口代码

package com.lyh.flink07;

import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class Window_s {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.socketTextStream("hadoop100",9999)
                .map(line -> {
                    String[] data = line.split(",");
                    return new WaterSensor(
                            data[0],
                            Long.valueOf(data[1]),
                            Integer.valueOf(data[2])
                    );
                })
                .keyBy(WaterSensor::getId)
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
//                .window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2)))
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(3)))
                .process(new ProcessWindowFunction<WaterSensor, String,String, TimeWindow>() {
                    @Override
                    public void process(String key,
                                        Context ctx,
                                        Iterable<WaterSensor> elements,
                                        Collector<String> out) throws Exception {
                    List<WaterSensor> list  = toList(elements);
                        long starttime = ctx.window().getStart();
                        long endtime = ctx.window().getEnd();

                        out.collect("窗口:" + starttime + "  " + endtime + "  " + "key:" + key + "  " + "list:" + list);

                    }
                }).print();
        env.execute();
    }

    private static <T>List<T> toList(Iterable<T> it) {
        List<T>  list = new ArrayList<>();
        for (T t : it) {
            list.add(t);
            
        }
        return list;
    }
}

运行结果

在hadoop100 服务器
输入nc -lk 999 启动socket
大数据-玩转数据-Flink时间滚动动窗口,大数据-玩转数据-FLINK,大数据,flink
消费结果
大数据-玩转数据-Flink时间滚动动窗口,大数据-玩转数据-FLINK,大数据,flink
因为会话窗口没有固定的开启和关闭时间, 所以会话窗口的创建和关闭与滚动,滑动窗口不同. 在Flink内部, 每到达一个新的元素都会创建一个新的会话窗口, 如果这些窗口彼此相距比较定义的gap小, 则会对他们进行合并. 为了能够合并, 会话窗口算子需要合并触发器和合并窗口函数: ReduceFunction, AggregateFunction, or ProcessWindowFunction 。

六、基于元素个数的滚动窗口

默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。
实例代码
.countWindow(3)
说明:哪个窗口先达到3个元素, 哪个窗口就关闭. 不影响其他的窗口.

基于元素个数的滚动窗口代码

package com.lyh.flink07;

import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class Window_s_n {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.socketTextStream("hadoop100",9999)
                .map(line -> {
                    String[] data = line.split(",");
                    return new WaterSensor(
                            data[0],
                            Long.valueOf(data[1]),
                            Integer.valueOf(data[2])
                    );
                })
                .keyBy(WaterSensor::getId)
                .countWindow(2)
                .process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {
                    @Override
                    public void process(String key,
                                        Context ctx,
                                        Iterable<WaterSensor> elements,
                                        Collector<String> out) throws Exception {
                        List<WaterSensor> list  = toList(elements);
                        out.collect("窗口:" + "key:" + key + "  " + "list:" + list);
                    }
                }).print();

        env.execute();
    }

    private static <T>List<T> toList(Iterable<T> it) {
        List<T>  list = new ArrayList<>();
        for (T t : it) {
            list.add(t);

        }
        return list;
    }
}

运行结果
大数据-玩转数据-Flink时间滚动动窗口,大数据-玩转数据-FLINK,大数据,flink
大数据-玩转数据-Flink时间滚动动窗口,大数据-玩转数据-FLINK,大数据,flink

七、基于元素个数的滑动窗口

滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。下面代码中的sliding_size设置为了2,也就是说,每收到两个相同key的数据就计算一次,每一次计算的window范围最多是3个元素。
实例代码
.countWindow(3, 2)

package com.lyh.flink07;

import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class Window_s_n {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.socketTextStream("hadoop100",9999)
                .map(line -> {
                    String[] data = line.split(",");
                    return new WaterSensor(
                            data[0],
                            Long.valueOf(data[1]),
                            Integer.valueOf(data[2])
                    );
                })
                .keyBy(WaterSensor::getId)
//                .countWindow(2)
                .countWindow(3,2)
                .process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {
                    @Override
                    public void process(String key,
                                        Context ctx,
                                        Iterable<WaterSensor> elements,
                                        Collector<String> out) throws Exception {
                        List<WaterSensor> list  = toList(elements);
                        out.collect("窗口:" + "key:" + key + "  " + "list:" + list);
                    }
                }).print();

        env.execute();
    }

    private static <T>List<T> toList(Iterable<T> it) {
        List<T>  list = new ArrayList<>();
        for (T t : it) {
            list.add(t);

        }
        return list;
    }
}

运行结果
大数据-玩转数据-Flink时间滚动动窗口,大数据-玩转数据-FLINK,大数据,flink
大数据-玩转数据-Flink时间滚动动窗口,大数据-玩转数据-FLINK,大数据,flink文章来源地址https://www.toymoban.com/news/detail-669647.html

到了这里,关于大数据-玩转数据-Flink时间滚动动窗口的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 关于flink滚动窗口下数据乱序+倾斜,allowedLateness的一个坑

    目录 前言         滚动窗口(Tumbling Windows)         allowedLateness 场景描述 数据倾斜问题解决 输出结果偏差问题         思考 输出结果偏差解决 扩展         滚动窗口的 assigner 分发元素到指定大小的窗口。滚动窗口的大小是固定的,且各自范围之间不重叠。

    2024年02月21日
    浏览(42)
  • flink时间窗口无新的数据进来最后一个窗口不关闭

    测试反馈, 配置的flink任务提交上去后, 输入数据源符合条件,到时间窗口的size。最后一个窗口没有闭窗计算,数据并没及时输出告警 经过调试发现,watermark没有向后继续推进,导致无法闭窗, watermark的时间取的是数据中的业务时间,create_time。 因为没有后续数据进来,

    2024年02月13日
    浏览(43)
  • 《Flink学习笔记》——第六章 Flink的时间和窗口

    6.1 时间语义 6.1.1 Flink中的时间语义 对于一台机器而言,时间就是系统时间。但是Flink是一个分布式处理系统,多台机器“各自为政”,没有统一的时钟,各自有各自的系统时间。而对于并行的子任务来说,在不同的节点,系统时间就会有所差异。 我们知道一个集群有JobMana

    2024年02月11日
    浏览(41)
  • 【Apache-Flink零基础入门】「入门到精通系列」手把手+零基础带你玩转大数据流式处理引擎Flink(特点和优势分析+事件与时间维度分析)

    本文介绍了Apache Flink的定义、架构、基本原理,并辨析了大数据流计算相关的基本概念。同时回顾了大数据处理方式的历史演进以及有状态的流式数据处理的原理。最后,分析了Apache Flink作为业界公认为最好的流计算引擎之一所具备的天然优势,旨在帮助读者更好地理解大数

    2024年02月03日
    浏览(60)
  • 【Flink】Flink 中的时间和窗口之水位线(Watermark)

    这里先介绍一下什么是 时间语义 , 时间语义 在Flink中是一种很重要的概念,下面介绍的 水位线 就是基于 时间语义 来讲的。 在Flink中我们提到的时间语义一般指的是 事件时间 和 处理时间 : 处理时间(Processing Time) ,一般指执行处理操作的系统时间,也就是Flink的窗口算子

    2024年02月07日
    浏览(52)
  • Flink系列Table API和SQL之:滚动窗口、滑动窗口、累计窗口、分组聚合

    有了时间属性,接下来就可以定义窗口进行计算了。窗口可以将无界流切割成大小有限的桶(bucket)来做计算,通过截取有限数据集来处理无限的流数据。在DataStream API中提供了对不同类型的窗口进行定义和处理的接口,而在Table API和SQL中,类似的功能也都可以实现。 在Flink 1

    2023年04月27日
    浏览(57)
  • Flink中的时间和窗口

    在传统的批处理系统中,我们可以等到一批数据全部都到齐了之后,对其做相关的计算;但是在实时处理系统中,数据是源源不断的,正常情况下,我们就得来一条处理一条。那么,我们应该如何统计某个实时数据源中最近一段时间内的数据呢? 在Flink的观念中,引入了“窗

    2024年02月08日
    浏览(50)
  • Flink中的时间和窗口操作

    本专栏案例代码和数据集链接: https://download.csdn.net/download/shangjg03/88477960 在大多数场景下,我们需要统计的数据流都是无界的,因此我们无法等待整个数据流终止后才进行统计。通常情况下,我们只需要对某个时间范围或者数量范围内的数据进行统计分析:如每隔五分钟统计

    2024年02月08日
    浏览(57)
  • Flink-【时间语义、窗口、水位线】

    🌰:可乐 可乐的生产日期 = 事件时间(可乐产生的时间); 可乐被喝的时间 = 处理时间(可乐被处理【喝掉=处理】的时间)。 机器时间:可能不准确(例如:A可乐厂的时钟比较慢,B可乐厂的时钟比较快,但实际上B产生可乐的时间比A产生可乐的时间慢,却被先处理了)

    2024年02月01日
    浏览(52)
  • 【Apache Flink】基于时间和窗口的算子-配置时间特性

    Apache Flink 它提供了多种类型的时间和窗口概念,使得用户能够进行准确的时间计算。在数据处理任务中,时间的概念是非常重要的,对于一些复杂的实时流处理任务,如事件按时间顺序的聚合、分割和窗口计算,时间更是关键所在。而在这类任务中,选择使用何种时间特性是

    2024年02月08日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包