MaxOutOfOrderness
source.map(...//省略不必要代码)
// 定义 watermark
.assignTimestampsAndWatermarks(
// 设置 watermark 比 事件时间晚 1s
WatermarkStrategy.<ApacheLogEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1))
// 定义 watermark 生成规则
.withTimestampAssigner(...//省略不必要代码));
定义 watermark 的时候可以设置生成 watermark 的时间比事件时间延迟多久,即 eventTime + maxoutoforderness
为什么说设置了 maxoutoforderness就可以减轻乱序?因为我们统计数据在哪个窗口,是按照 Event time 收入窗口的,而不是按照eventTime + maxoutoforderness或者eventTime - maxoutoforderness收入窗口的,所以如果你设置watermark 比 事件时间晚 2s,比如窗口本来是【0,5),数据A的event_time为4,那么哪怕这条数据慢了2s过来(即到了event_time=6s才到),6s的另一条准时数据B的event_time=6,他会把当前watermark推进到event_time-2s=4s,没有大于end_of_window=5,因此还不会触发窗口计算;所以B这条迟到数据在event_time=6s时才过来,就还能参与到后面的窗口计算;
而如果你设置的 maxoutoforderness=0s,那么这条数据慢了2s过来(即到了event_time=6s才到),5s的另一条准时数据C会把watermark推进到也是event_time-0s=5s>=end_of_window,马上就会触发窗口计算,此时如果没有配置allowlateness,则6s时才来的迟到数据A是无法参与对应的窗口计算的。
Allowedlateness
dataStream.keyBy(...//省略不重要的代码)
.timeWindow(Time.minutes(10), Time.seconds(5))
// 定义窗口关闭的延迟时间
.allowedLateness(Time.minutes(1))
默认情况下, 如果不指定 AllowedLateness, 其值是 0, 即当 Watermark 通过 end-of-window 之后, 再有归属于该window的数据到达时, 这些数据会被删除.
为了避免有些迟到的数据被删除, 因此产生了 AllowedLateness 的概念.,使用allowedLateness延迟销毁窗口,使得Watermark 超过 end-of-window 之后,允许有一段时间(也是以event time来衡量)来等待之前的数据到达,以便再次处理这些数据。
窗口watermark和allowedLateness之后依然迟到的流数据,也是通过.sideOutputLateData(outputTag)和result.getSideOutput(outputTag)的侧输出流方式输出的,拿到这一部分数据后用户可以自己处理,相比于spark的水印和数据延迟机制来说,flink的更加完善和易用
allowedLateness只针对eventTime,因为processingTime不存在延时的情况。。
区别说明
假设我们有一个数据流,其中包含不同时间的事件,我们想要计算每个用户过去一小时内的点击次数。我们使用事件时间来处理这些数据。
MaxOutOfOrderness
假设当前时间是13:00,我们正在处理一个时间范围为12:00至13:00的时间窗口。如果我们设置`maxOutOfOrderness`为5分钟,那么系统会等待直到13:05,以确保所有在12:00至13:00时间范围内实际发生但延迟到达的事件都能被包括在这个窗口的计算中。 例如,我们有一个用户在12:50点击了页面,但是由于某种原因,这个点击事件直到13:03才到达Flink系统。由于我们设置了5分钟的`maxOutOfOrderness`,这个事件仍然会被包括在12:00至13:00时间窗口的计算中。
这里注意:我们一般只会让水位线比事件时间慢,而事件时间是不会超越当前时间的(除非异常数据,所以13:05的水位线一定小于等于13:05),所以配置maxOutOfOrderness才能让水位线上升慢一点来等乱序落后的数据,不至于按正常的事件时间来准时触发窗口计算。
AllowedLateness
继续上面的例子,假设我们设置了`allowedLateness`为2分钟。在13:05之后,系统认为12:00至13:00时间窗口的所有事件都已经到达,并触发计算。假设此时计算结果显示用户A在该窗口内点击了5次。 但是,由于某些原因,用户A在12:58的另一个点击事件直到13:07才到达。由于我们设置了2分钟的`allowedLateness`,系统会重新触发12:00至13:00时间窗口的计算,并将这个迟到的事件包括在内。新的计算结果会显示用户A在该窗口内点击了6次,并且这个新的结果会被发射出去。
总结一下,`maxOutOfOrderness`和`allowedLateness`都是为了处理乱序事件,但是它们在不同的阶段起作用。`maxOutOfOrderness`是在窗口触发之前等待迟到事件的机制,而`allowedLateness`是在窗口触发之后保持窗口开放一段时间以处理迟到事件的机制。通过合理设置这两个参数,我们可以更准确地处理乱序事件。
具体例子
可以看这个例子,讲得很清楚:文章来源:https://www.toymoban.com/news/detail-855009.html
flink-learning/AllowedLateness.md at main · agoclover/flink-learning · GitHub
文章来源地址https://www.toymoban.com/news/detail-855009.html
到了这里,关于flink的MaxOutOfOrderness 和 Allowedlateness 区别的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!