学习文档:《Flink 官方文档 - DataStream API - 算子 - 窗口》
学习笔记如下:
窗口(Window):窗口是处理无界流的关键所在。窗口可以将数据流装入大小有限的 “桶” 中,再对每个 “桶” 加以处理。
Keyed Windows
在 Keyed Windows 上使用窗口时,要调用 keyBy(...)
而后再调用 window(...)
:
stream
.keyBy(...) // 仅 keyed 窗口需要
.window(...) // 必填项:"assigner"
[.trigger(...)] // 可选项:"trigger" (省略则使用默认 trigger)
[.evictor(...)] // 可选项:"evictor" (省略则不使用 evictor)
[.allowedLateness(...)] // 可选项:"lateness" (省略则为 0)
[.sideOutputLateData(...)] // 可选项:"output tag" (省略则不对迟到数据使用 side output)
.reduce/aggregate/apply() // 必填项:"function"
[.getSideOutput(...)] // 可选项:"output tag"
在 non-keyed windows 上使用窗口时,直接调用 windowAll(...)
。
stream
.windowAll(...) // 必填项:"assigner"
[.trigger(...)] // 可选项:"trigger" (else default trigger)
[.evictor(...)] // 可选项:"evictor" (else no evictor)
[.allowedLateness(...)] // 可选项:"lateness" (else zero)
[.sideOutputLateData(...)] // 可选项:"output tag" (else no side output for late data)
.reduce/aggregate/apply() // 必填项:"function"
[.getSideOutput(...)] // 可选项:"output tag"
窗口的生命周期
窗口会在第一个属于它的元素到达时被创建,然后在时间超过窗口的 “结束时间戳 + 允许的延迟时间” 时被完全删除。Flink 仅保证删除基于时间的窗口。
例如,对于一个基于 event time 且范围互不重合(滚动)的窗口策略, 如果窗口设置的时长为五分钟、可容忍的迟到时间(allowed lateness)为 1 分钟, 那么当第一个元素落入 12:00
至 12:05
这个区间时,Flink 就会为这个区间创建一个新的窗口。 当 watermark 越过 12:06
时,这个窗口将被摧毁。
- window trigger:定义何时窗口中的数据可以被 function 计算;在窗口被创建后、删除前的这段时间内定义如何清理窗口中的数据。
- window function:定义如何计算窗口中的内容。
- window evictor:在 trigger 触发之后,在窗口函数的前后删除数据。
Keyed 和 Non-Keyed Windows
首先必须要在定义窗口前确定数据流是 keyed 还是 non-keyed。
使用 keyed stream 允许你的窗口计算由多个 task 并行,因为每个逻辑上的 keyed stream 都可以被单独处理,属于同一个 key 的元素会被发送到同一个 task。
对于 non-keyed stream,原始的 stream 不会被分割为多个逻辑上的 stream,所以所有的窗口计算会被同一个 task 完成,也就是并行度为 1。
Window Assigners
指定了你的 stream 是否为 keyed 之后,下一步就是定义 window assigner。
Window assigner 定义了 stream 中的元素如何被分发到各个窗口。 可以在 window(...)
或 windowAll(...)
中指定一个 WindowAssigner
。
基于时间的窗口用 start timestamp(包含)和 end timestamp(不包含)描述窗口的大小。 在代码中,Flink 处理基于时间的窗口使用的是 TimeWindow
, 它有查询开始和结束 timestamp 以及返回窗口所能储存的最大 timestamp 的方法 maxTimestamp()
。
滚动窗口(Tumbling Windows)
滚动窗口的 assigner 分发元素到指定大小的窗口。滚动窗口的大小是固定的,且各自范围之间不重叠。
例如:指定滚动窗口的大小为 5 分钟,那么每 5 分钟就会有一个窗口被计算,且一个新的窗口被创建(如下图所示)。
示例:使用滚动窗口的样例
// 滚动 event-time 窗口 input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>);
// 滚动 processing-time 窗口 input .keyBy(<key selector>) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>);
// 长度为一天的滚动 event-time 窗口, 偏移量为 -8 小时。 input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) .<windowed transformation>(<window function>);
滚动窗口有两个参数:
- 第一个是窗口长度(即两个窗口之间的时间间隔),其中的时间间隔可以用
Time.milliseconds(x)
、Time.seconds(x)
、Time.minutes(x)
等来指定。 - 第二个是窗口的偏移量(offset)。在不设置偏移量时,长度为一小时的滚动窗口会与 linux 的 epoch 对齐。 你会得到如
1:00:00.000 - 1:59:59.999
、2:00:00.000 - 2:59:59.999
等。而如果设置了 15 分钟的偏移量,则会得到1:15:00.000 - 2:14:59.999
、2:15:00.000 - 3:14:59.999
等。
滑动窗口(Sliding Windows)
滑动窗口的 assigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。 但是,滑动窗口还可以指定滑动距离(window slide)参数来控制生成新窗口的频率。 如果滑动距离(slide)小于窗口大小,则滑动窗口可以允许窗口重叠。这种情况下,一个元素可能会被分发到多个窗口。
例如:滑动窗口指定大小为 10 分钟、滑动距离 5 分钟的窗口,则每 5 分钟得到一个新的窗口, 里面包含之前 10 分钟到达的数据(如下图所示)。
示例:使用滑动窗口
// 滑动 event-time 窗口 input .keyBy(<key selector>) .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) .<windowed transformation>(<window function>);
// 滑动 processing-time 窗口 input .keyBy(<key selector>) .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) .<windowed transformation>(<window function>);
// 滑动 processing-time 窗口,偏移量为 -8 小时 input .keyBy(<key selector>) .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))) .<windowed transformation>(<window function>);
滑动窗口有 3 个参数:
- 第 1 个是窗口长度,指定时间间隔的方法与滚动窗口一致
- 第 2 个是滑动距离
- 第 3 个是偏移量(offset),设置方法与滚动窗口一致
会话窗口(Session Windows)
会话窗口的 assigner 会把数据按活跃的会话分组。 与滚动窗口和滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据(即在一段不活跃的间隔)之后会关闭。
会话窗口的 assigner 可以设置固定的会话间隔(session gap)或用 session gap extractor 函数来动态地定义多长时间算作不活跃。 当超出了不活跃的时间段,当前的会话就会关闭,并且将接下来的数据分发到新的会话窗口。
示例:使用会话窗口
// 设置了固定间隔的 event-time 会话窗口 input .keyBy(<key selector>) .window(EventTimeSessionWindows.withGap(Time.minutes(10))) .<windowed transformation>(<window function>);
// 设置了动态间隔的 event-time 会话窗口 input .keyBy(<key selector>) .window(EventTimeSessionWindows.withDynamicGap((element) -> { // 决定并返回会话间隔 })) .<windowed transformation>(<window function>);
// 设置了固定间隔的 processing-time session 窗口 input .keyBy(<key selector>) .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) .<windowed transformation>(<window function>);
// 设置了动态间隔的 processing-time 会话窗口 input .keyBy(<key selector>) .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> { // 决定并返回会话间隔 })) .<windowed transformation>(<window function>);
会话窗口有 2 种参数:
- 固定间隔:指定时间间隔的方法与滚动窗口一致
- 动态间隔:通过实现
SessionWindowTimeGapExtractor
接口来指定。
会话窗口并没有固定的开始或结束时间,在 Flink 内部,会话窗口的算子会为每一条数据创建一个窗口,然后将距离不超过预设间隔的窗口合并。
全局窗口(Global Windows)
全局窗口的 assigner 将拥有相同 key 的所有数据分发到一个全局窗口。 这样的窗口模式仅在指定了自定义的 trigger 时有效,否则计算不会发生,因为全局窗口没有天然的终点去触发其中积累的数据。
示例:使用全局窗口
DataStream<T> input = ...; input .keyBy(<key selector>) .window(GlobalWindows.create()) .<windowed transformation>(<window function>);
窗口函数(Window Functions)
定义了 window assigner 之后,我们需要指定当窗口触发之后,我们如何计算每个窗口中的数据, 这就是 window function 的职责了。关于窗口如何触发,详见 triggers。
窗口函数有三种:ReduceFunction
、AggregateFunction
或 ProcessWindowFunction
。 前两者执行起来更高效(详见 State Size)因为 Flink 可以在每条数据到达窗口后 进行增量聚合(incrementally aggregate)。 而 ProcessWindowFunction
会得到能够遍历当前窗口内所有数据的 Iterable
,以及关于这个窗口的 meta-information。
使用 ProcessWindowFunction
的窗口转换操作没有其他两种函数高效,因为 Flink 在窗口触发前必须缓存里面的所有数据。 ProcessWindowFunction
可以与 ReduceFunction
或 AggregateFunction
合并来提高效率。 这样做既可以增量聚合窗口内的数据,又可以从 ProcessWindowFunction
接收窗口的 metadata。 我们接下来看看每种函数的例子。
ReduceFunction
ReduceFunction
指定两条输入数据如何合并起来产生一条输出数据,输入和输出数据的类型必须相同。Flink 使用 ReduceFunction
对窗口中的数据进行增量聚合。
示例:使用
ReduceFunction
对元组的第二个属性求和DataStream<Tuple2<String, Long>> input = ...; input .keyBy(<key selector>) .window(<window assigner>) .reduce(new ReduceFunction<Tuple2<String, Long>>() { public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) { return new Tuple2<>(v1.f0, v1.f1 + v2.f1); } });
AggregateFunction
AggregateFunction
接收三个类型:输入数据的类型(IN
)、累加器的类型(ACC
)和输出数据的类型(OUT
),其中输入数据的类型是输入流的元素类型。
AggregateFunction
接口有如下方法:
-
createAccumulator()
:创建初始累加器 -
add
:将一条元素累加进累加器 -
merge
:合并两个累加器 -
getResult
:从累加器中提取输出(OUT
类型)
可以看到,ReduceFunction
是 AggregateFunction
的特殊情况。与 ReduceFunction
相同,Flink 会在输入数据到达窗口时直接进行增量聚合。
示例:使用
AggregateFunction
计算窗口中所有元素的元组的第二个属性的平均值/** * The accumulator is used to keep a running sum and a count. The {@code getResult} method * computes the average. */ private static class AverageAggregate implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> { @Override public Tuple2<Long, Long> createAccumulator() { return new Tuple2<>(0L, 0L); } @Override public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) { return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L); } @Override public Double getResult(Tuple2<Long, Long> accumulator) { return ((double) accumulator.f0) / accumulator.f1; } @Override public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) { return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1); } } DataStream<Tuple2<String, Long>> input = ...; input .keyBy(<key selector>) .window(<window assigner>) .aggregate(new AverageAggregate());
ProcessWindowFunction
ProcessWindowFunction
可以获取包含窗口内所有元素的 Iterable,以及用来获取时间和状态信息的 Context 对象,比其他窗口函数更加灵活。但是,ProcessWindowFunction
的灵活性是以性能和资源消耗为代价的,因为窗口中的数据无法被增量聚合,所以需要在窗口触发前缓存所有数据。
ProcessWindowFunction
抽象类的源码如下:
// flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param context The context in which the window is being evaluated.
* @param elements The elements in the window being evaluated.
* @param out A collector for emitting elements.
*
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
public abstract void process(
KEY key,
Context context,
Iterable<IN> elements,
Collector<OUT> out) throws Exception;
/**
* Deletes any state in the {@code Context} when the Window expires (the watermark passes its
* {@code maxTimestamp} + {@code allowedLateness}).
*
* @param context The context to which the window is being evaluated
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
public void clear(Context context) throws Exception {}
/**
* The context holding window metadata.
*/
public abstract class Context implements java.io.Serializable {
/**
* Returns the window that is being evaluated.
*/
public abstract W window();
/** Returns the current processing time. */
public abstract long currentProcessingTime();
/** Returns the current event-time watermark. */
public abstract long currentWatermark();
/**
* State accessor for per-key and per-window state.
*
* <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
* by implementing {@link ProcessWindowFunction#clear(Context)}.
*/
public abstract KeyedStateStore windowState();
/**
* State accessor for per-key global state.
*/
public abstract KeyedStateStore globalState();
}
}
key
参数由 keyBy()
中指定的 KeySelector
选出。
示例:使用
ProcessWindowFunction
对窗口中的元素技术,并且将窗口本身的信息一同输出DataStream<Tuple2<String, Long>> input = ...; input .keyBy(t -> t.f0) .window(TumblingEventTimeWindows.of(Time.minutes(5))) .process(new MyProcessWindowFunction()); /* ... */ public class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> { @Override public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) { long count = 0; for (Tuple2<String, Long> in: input) { count++; } out.collect("Window: " + context.window() + "count: " + count); } }
增量聚合的 ProcessWindowFunction
通过将 ProcessWindowFunction
与 ReduceFunction
或 AggregateFunction
搭配使用,可以将数据在到达窗口的时候进行增量聚合,当窗口关闭时,ProcessWindowFunction
将会得到聚合的结果。这样就可实现增量聚合窗口,同时从 ProcessWindowFunction
中获得窗口的结果数据。
示例:使用 ReduceFunction 增量聚合。将
ReduceFunction
与ProcessWindowFunction
组合,返回窗口中的最小元素和窗口的开始时间。DataStream<SensorReading> input = ...; input .keyBy(<key selector>) .window(<window assigner>) .reduce(new MyReduceFunction(), new MyProcessWindowFunction()); // Function definitions private static class MyReduceFunction implements ReduceFunction<SensorReading> { public SensorReading reduce(SensorReading r1, SensorReading r2) { return r1.value() > r2.value() ? r2 : r1; } } private static class MyProcessWindowFunction extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> { public void process(String key, Context context, Iterable<SensorReading> minReadings, Collector<Tuple2<Long, SensorReading>> out) { SensorReading min = minReadings.iterator().next(); out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min)); } }
示例:使用 AggregateFunction 增量聚合。将
AggregateFunction
与ProcessWindowFunction
组合,计算平均值并与窗口对应的 key 一同输出。DataStream<Tuple2<String, Long>> input = ...; input .keyBy(<key selector>) .window(<window assigner>) .aggregate(new AverageAggregate(), new MyProcessWindowFunction()); // Function definitions /** * The accumulator is used to keep a running sum and a count. The {@code getResult} method * computes the average. */ private static class AverageAggregate implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> { @Override public Tuple2<Long, Long> createAccumulator() { return new Tuple2<>(0L, 0L); } @Override public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) { return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L); } @Override public Double getResult(Tuple2<Long, Long> accumulator) { return ((double) accumulator.f0) / accumulator.f1; } @Override public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) { return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1); } } private static class MyProcessWindowFunction extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> { public void process(String key, Context context, Iterable<Double> averages, Collector<Tuple2<String, Double>> out) { Double average = averages.iterator().next(); out.collect(new Tuple2<>(key, average)); } }
在 ProcessWindowFunction 中使用 per-window state
除了访问 keyed state,ProcessWindowFunction
还可以使用作用域仅为 “当前正在处理的窗口” 的 keyed state。不同的窗口、不同的 key 都会有自己不同的 per-window state。
在 process()
接收到的 Context
对象中,有两个方法允许我们访问以下两种 state:
-
globalState()
,访问全局的 keyed state -
windowState()
,访问作用域仅限于当前窗口的 keyed state
这尤其适用于一个 window 被触发多次的情况(例如出现延迟数据再次触发窗口计算,或自定义了提前触发窗口的 trigger)。
当使用窗口状态时,需要注意在删除窗口时清除这些状态,具体地,它们应该定义在 clear()
方法中。
WindowFunction(已过时)
在某些可以使用 ProcessWindowFunction
的地方,你也可以使用 WindowFunction
。 它是旧版的 ProcessWindowFunction
,只能提供更少的环境信息且缺少一些高级的功能,比如 per-window state。 这个接口会在未来被弃用。
WindowFunction
的源码如下:
// flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param window The window that is being evaluated.
* @param input The elements in the window being evaluated.
* @param out A collector for emitting elements.
*
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
示例:使用
WindowFunction
DataStream<Tuple2<String, Long>> input = ...; input .keyBy(<key selector>) .window(<window assigner>) .apply(new MyWindowFunction());
Triggers
Trigger
决定了一个窗口何时可以被 window function 处理。 每个 WindowAssigner
都有一个默认的 Trigger
。 如果默认 trigger 无法满足你的需要,则可以在 trigger(...)
调用中指定自定义的 trigger。
Trigger 接口提供了五个方法来响应不同的事件:
-
onElement()
:在每个元素被加入窗口时调用 -
onEventTime()
:在注册的 event-time timer 触发时调用 -
onProcessingTime()
:在注册的 processing-time timer 触发时调用 -
onMerge()
:与有状态的 trigger 相关。该方法会在两个窗口合并时,将窗口对应 trigger 的状态进行合并(例如使用会话窗口时)。 -
clear()
:在对应窗口被移除时调用。
前三个方法通过返回 TriggerResult
来决定 trigger 如何应对到达窗口的事件。应对方案有以下几种:
-
CONTINUE
:什么也不做 -
FIRE
:触发计算 -
PURGE
:清空窗口内的元素 -
FIRE_AND_PURGE
:触发计算,计算结束后清空窗口内的元素
触发(Fire)与清除(Purge)
当 trigger 认定一个窗口可以被计算时,它就会触发,也就是返回 FIRE
或 FIRE_AND_PURGE
。 这是让窗口算子发送当前窗口计算结果的信号。如果一个窗口指定了 ProcessWindowFunction
,所有的元素都会传给 ProcessWindowFunction
。 如果是 ReduceFunction
或 AggregateFunction
,则直接发送聚合的结果。
当 trigger 触发时,它可以返回 FIRE
或 FIRE_AND_PURGE
。 FIRE
会保留被触发的窗口中的内容,而 FIRE_AND_PURGE
会删除这些内容。 Flink 内置的 trigger 默认使用 FIRE
,不会清除窗口的状态。
WindowAssigner 默认的 Triggers
WindowAssigner
默认的 Trigger
足以应付诸多情况。 例如,所有的 event-time window assigner 都默认使用 EventTimeTrigger
。 这个 trigger 会在 watermark 越过窗口结束时间后直接触发。
GlobalWindow
的默认 trigger 是永远不会触发的 NeverTrigger
。因此,使用 GlobalWindow
时,必须自己定义一个 trigger。
内置 Triggers 和自定义 Triggers
Flink 包含一些内置 trigger:
-
EventTimeTrigger
:根据 watermark 测量的 event time 触发 -
ProcessingTimeTrigger
:根据 processing time 触发 -
CountTrigger
:在窗口中的元素超过预设的限制时触发 -
PurgingTrigger
:接收另一个 trigger 并将它转换成一个会清理数据的 trigger
如果你需要实现自定义的 trigger,可以考虑继承抽象类 Trigger。
Trigger 抽象类的源码位置:flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
Evictors
Flink 的窗口模型允许在 WindowAssigner
和 Trigger
之外指定可选的 Evictor
。 如本文开篇的代码中所示,通过 evictor(...)
方法传入 Evictor
。 Evictor 可以在 trigger 触发后、调用窗口函数之前或之后从窗口中删除元素。 Evictor
接口提供了两个方法实现此功能:
/**
* Optionally evicts elements. Called before windowing function.
*
* @param elements The elements currently in the pane.
* @param size The current number of elements in the pane.
* @param window The {@link Window}
* @param evictorContext The context for the Evictor
*/
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
/**
* Optionally evicts elements. Called after windowing function.
*
* @param elements The elements currently in the pane.
* @param size The current number of elements in the pane.
* @param window The {@link Window}
* @param evictorContext The context for the Evictor
*/
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
evictBefore()
包含在调用窗口函数前的逻辑,而 evictAfter()
包含在窗口函数调用之后的逻辑。 在调用窗口函数之前被移除的元素不会被窗口函数计算。
Flink 有三个内置的 evictor:
-
CountEvictor
:仅记录用户指定数量的元素,一旦窗口中的元素超过这个数量,多余的元素会从窗口缓存的开头移除 -
DeltaEvictor
:接收DeltaFunction
和threshold
参数,计算最后一个元素与窗口缓存中所有元素的差值, 并移除差值大于或等于threshold
的元素。 -
TimeEvictor
:接收interval
参数,以毫秒表示。 它会找到窗口中元素的最大 timestampmax_ts
并移除比max_ts - interval
小的所有元素。
Flink 对窗口中元素的顺序不做任何保证。也就是说,即使 evictor 从窗口缓存的开头移除一个元素,这个元素也不一定是最先或者最后到达窗口的。
Allowed Lateness
在默认情况下,watermark 一旦越过窗口结束的 timestamp,迟到的数据就会被直接丢弃。但是 Flink 允许指定窗口算子最大的 allowed lateness。 Allowed lateness 定义了一个元素可以在迟到多长时间的情况下不被丢弃,这个参数默认是 0。在 watermark 超过窗口末端、到达窗口末端加上 allowed lateness 之前的这段时间内到达的元素,依旧会被加入窗口。一个迟到但没有被丢弃的元素是否会再次触发窗口,取决于窗口的 trigger,比如 EventTimeTrigger
。
为了实现这个功能,Flink 会将窗口状态保存到 allowed lateness 超时才会将窗口及其状态删除。
示例:指定 allowed lateness
DataStream<T> input = ...; input .keyBy(<key selector>) .window(<window assigner>) .allowedLateness(<time>) .<windowed transformation>(<window function>);
当使用 GlobalWindows
时,没有数据会被视作迟到,因为全局窗口的结束 timestamp 是 Long.MAX_VALUE
。
从旁路输出(side output)获取迟到数据
可以通过 Flink 的旁路输出功能,获得迟到数据的数据流。
示例:获取延迟数据并添加到
lateStream
数据流final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){}; DataStream<T> input = ...; SingleOutputStreamOperator<T> result = input .keyBy(<key selector>) .window(<window assigner>) .allowedLateness(<time>) .sideOutputLateData(lateOutputTag) .<windowed transformation>(<window function>); DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
迟到数据的一些考虑
当指定了大于 0 的 allowed lateness 时,窗口本身以及其中的内容仍会在 watermark 越过窗口末端后保留。这时,如果一个迟到但未被丢弃的数据到达,它可能会再次触发这个窗口。这种触发被称作 late firing
,与表示第一次触发窗口的 main firing
相区别。
如果是使用会话窗口的情况,late firing 可能会进一步合并已有的窗口,因为他们可能会连接现有的、未被合并的窗口。
需要注意的是,late firing 发出的元素应该被视作对之前计算结果的更新,即你的数据流中会包含一个相同计算任务的 多个结果。你的应用需要考虑到这些重复的结果,或去除重复的部分。
Working with window results
窗口操作的结果会变回 DataStream
,并且窗口操作的信息不会保存在输出的元素中。所以如果想要保留窗口的 meta-information,则需要在 ProcessWindowFunction
里手动将他们放入输出的元素中。输出元素中保留的唯一相关的信息是元素的 timestamp。 它被设置为窗口能允许的最大 timestamp,也就是 end timestamp - 1。也就是说,在窗口操作之后,元素总是会携带一个 event-time 或 processing-time timestamp。 对 Processing-time 窗口来说,这并不意味着什么。 而对于 event-time 窗口来说,“输出携带 timestamp” 以及 “watermark 与窗口的相互作用” 这两者使建立窗口大小相同的连续窗口操作(consecutive windowed operations) 变为可能。我们先看看 watermark 与窗口的相互作用,然后再来讨论它。
watermarks 与窗口的交互
当 watermark 到达窗口算子时,它触发了两件事:
- 这个 watermark 触发了所有最大 timestamp(即 end-timestamp - 1)小于它的窗口
- 这个 watermark 被原封不动地转发给下游的任务。
通俗来讲,watermark 将当前算子中那些 “一旦这个 watermark 被下游任务接收就肯定会就超时” 的窗口全部冲走。
连续窗口操作
通过窗口结果元素的 timestamp 以及窗口与 watermark 的交互,使串联多个窗口操作成为可能。只要指定相同的窗口时长和偏移量,就可以实现有两个连续的窗口,它们既能使用不同的 key,又能让上游操作中某个窗口的数据出现在下游操作的相同窗口。
示例:连续窗口操作。其中第一个时间窗口中在 [0, 5) 中的结果会出现在下一个窗口的 [0, 5) 窗口中。文章来源:https://www.toymoban.com/news/detail-802199.html
DataStream<Integer> input = ...; DataStream<Integer> resultsPerKey = input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .reduce(new Summer()); DataStream<Integer> globalResults = resultsPerKey .windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) .process(new TopKWindowFunction());
关于状态大小的考量
窗口可以被定义在很长的时间段上(比如几天、几周或几个月)并且积累下很大的状态。当你估算窗口计算的储存需求时,可以使用几条规则:文章来源地址https://www.toymoban.com/news/detail-802199.html
- Flink 会为一个元素在它所属的每一个窗口中都创建一个副本。 因此,一个元素在滚动窗口的设置中只会存在一个副本(一个元素仅属于一个窗口,除非它迟到了),而在在滑动窗口中可能会被多次拷贝。
-
ReduceFunction
和AggregateFunction
可以极大地减少储存需求,因为他们会就地聚合到达的元素,且每个窗口仅储存一个值;而使用ProcessWindowFunction
需要累积窗口中所有的元素。 - 使用
Evictor
可以避免预聚合,因为窗口中的所有数据必须先经过 evictor 才能进行计算。
到了这里,关于Flink|《Flink 官方文档 - DataStream API - 算子 - 窗口》学习笔记的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!