1 窗口触发机制
- Trigger
- ProcessingTimeoutTrigger
- EventTimeTrigger
- CountTrigger
- DeltaTrigger
- NeverTrigger in GlobalWindows
- ContinuousEventTimeTrigger
- PurgingTrigger
- ContinuousProcessingTimeTrigger
- ProcessingTimeTrigger
1.1 源码解析
以EventTimeTrigger
源码说明如何触发窗口计算,在EventTimeTrigger
源码中只需要关注onElement
和onEventTime
两个方法即可,源码内容如下:
@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private EventTimeTrigger() {}
// 基于数据驱动的方法
@Override
public TriggerResult onElement(
Object element, long timestamp, TimeWindow window, TriggerContext ctx)
throws Exception {
// 判断当前watermark是否大于等于窗口的最大时间
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
// 如果大于等于窗口的最大时间触发计算
return TriggerResult.FIRE;
} else {
// 小于窗口的最大时间首先注册定时器
ctx.registerEventTimeTimer(window.maxTimestamp());
// 然后等待数据继续输入,不触发计算
return TriggerResult.CONTINUE;
}
}
// 基于事件时间定时器驱动的方法
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
// 根据不断发送来的watermark判断是否触发计算
return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
}
// ...
}
源码中将不需要的关注的代码都已省略
-
onElement
注释中写明这个方法是基于数据进行驱动的,也就是说只有数据到达时才会执行这个方法,每一个窗口都有自己的
startTime
和endTime
,也就是窗口的范围,判断条件中window.maxTimestamp()
获取的就是当前窗口的endTime
,如果当前watermark
超出当前窗口的endTime
就会触发这个窗口计算,TriggerResult.FIRE
表示的就是窗口开始计算,如果当前watermark
小于endTime
就不会触发窗口计算这个窗口会继续等待数据输入,也就是TriggerResult.CONTINUE
方法. -
onEventTime
onElement
是由数据驱动的,但是Flink的实际数据处理过程是存在没有数据发送到当前窗口,但是会有watermark
源源不断的发送到当前窗口的情况,在多并行度的执行条件下就会发生这种情况.在onEventTime
方法中如果上游发送过来的watermark
等于当前窗口的endTime
就会执行TriggerResult.FIRE
否则还是执行TriggerResult.CONTINUE
.
Trigger
的触发机制就是这样,其他的CountTrigger
等大致逻辑基本是一样的,了解清楚源码中这两个方法的作用很容易理解.
1.2 代码实现
-
自定义Trigger
/** * 这里首先需要继承Trigger类,并将<Object, TimeWindow>中的Object修改成自己需要的数据类型,这段代码中需要根据UserEvent2中的数据 * 来控制触发窗口计算的条件,所以将Object修改成UserEvent2 **/ public class CustomTrigger extends Trigger<UserEvent2, TimeWindow> { public CustomTrigger() {} // 通过修改onElement方法中窗口计算的触发逻辑实现自定义方式 @Override public TriggerResult onElement( // 这里也要将原有的Object类型修改成上面的UserEvent2 UserEvent2 element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { // 原有的判断逻辑不动,这个是为了便捷,判断逻辑可以根据实际需求进行修改,或者如同下面中添加一个新的触发逻辑 if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { return TriggerResult.FIRE; // 这里增加一个判断逻辑,当用户行为时间为2700的时候也触发计算 } else if (element.getTime().equals("2700")) { return TriggerResult.FIRE; // 原有的判断逻辑不动 } else { ctx.registerEventTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; } } @Override public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) { return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE; } @Override public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } @Override public void clear(TimeWindow window, TriggerContext ctx) throws Exception { ctx.deleteEventTimeTimer(window.maxTimestamp()); } @Override public boolean canMerge() { return true; } @Override public void onMerge(TimeWindow window, OnMergeContext ctx) { long windowMaxTimestamp = window.maxTimestamp(); if (windowMaxTimestamp > ctx.getCurrentWatermark()) { ctx.registerEventTimeTimer(windowMaxTimestamp); } } // 将toString中俄返回值根据用户的需要进行修改 @Override public String toString() { return "CustomTrigger()"; } // 将返回值更改成创建的自定义Trigger类 public static CustomTrigger create() { return new CustomTrigger(); } }
-
业务代码文章来源:https://www.toymoban.com/news/detail-854157.html
// ... SingleOutputStreamOperator<UserEvent2> windowedStream = keyedStream .window(TumblingEventTimeWindows.of(Time.seconds(10))) // 设置滚动窗口大小为10s .trigger(new CustomTrigger()) // 传入自定义的Trigger类 .allowedLateness(Time.seconds(2)) // 允许迟到数据迟到时间2s,同watermark中的forBoundedOutOfOrderness功能类似 .sideOutputLateData(lateData) // 将迟到数据进行测流输出 .max("time");// 获取用户行为发生事件最大的这条数据 // ...
上面这段业务代码中设置的滚动窗口的大小为
10s
,正常来说只有满足end - start = 10000
的时候才会触发窗口计算,但是在自定义Trigger
中指定了当数据中时间为2700
的时候也触发窗口计算,在时间为2700
的数据没到达时候还会按照原有的逻辑触发窗口计算,但是只要2700
的数据到达,不管时候达到TumblingEventTimeWindows.of(Time.seconds(10))
这个条件,都会触发窗口计算.文章来源地址https://www.toymoban.com/news/detail-854157.html
到了这里,关于Flink之窗口触发机制及自定义Trigger的使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!