Flink之窗口触发机制及自定义Trigger的使用

这篇具有很好参考价值的文章主要介绍了Flink之窗口触发机制及自定义Trigger的使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1 窗口触发机制

窗口计算的触发机制都是由Trigger类决定的,Flink中为各类内置的WindowsAssigner都设计了对应的默认Trigger. 层次结构如下:
  • Trigger
  • ProcessingTimeoutTrigger
  • EventTimeTrigger
  • CountTrigger
  • DeltaTrigger
  • NeverTrigger in GlobalWindows
  • ContinuousEventTimeTrigger
  • PurgingTrigger
  • ContinuousProcessingTimeTrigger
  • ProcessingTimeTrigger
通常情况下是不需要自己重写Trigger的,使用Flink内置的就可以,除非特殊业务特殊需求.
1.1 源码解析

EventTimeTrigger源码说明如何触发窗口计算,在EventTimeTrigger源码中只需要关注onElementonEventTime两个方法即可,源码内容如下:

@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {}

    // 基于数据驱动的方法
    @Override
    public TriggerResult onElement(
            Object element, long timestamp, TimeWindow window, TriggerContext ctx)
            throws Exception {
        // 判断当前watermark是否大于等于窗口的最大时间
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            // 如果大于等于窗口的最大时间触发计算
            return TriggerResult.FIRE;
        } else {
            // 小于窗口的最大时间首先注册定时器
            ctx.registerEventTimeTimer(window.maxTimestamp());
            // 然后等待数据继续输入,不触发计算
            return TriggerResult.CONTINUE;
        }
    }

    // 基于事件时间定时器驱动的方法
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        // 根据不断发送来的watermark判断是否触发计算
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }
    // ...
}

源码中将不需要的关注的代码都已省略

  • onElement

    注释中写明这个方法是基于数据进行驱动的,也就是说只有数据到达时才会执行这个方法,每一个窗口都有自己的startTimeendTime,也就是窗口的范围,判断条件中window.maxTimestamp()获取的就是当前窗口的endTime,如果当前watermark超出当前窗口的endTime就会触发这个窗口计算,TriggerResult.FIRE表示的就是窗口开始计算,如果当前watermark小于endTime就不会触发窗口计算这个窗口会继续等待数据输入,也就是TriggerResult.CONTINUE方法.

  • onEventTime

    onElement是由数据驱动的,但是Flink的实际数据处理过程是存在没有数据发送到当前窗口,但是会有watermark源源不断的发送到当前窗口的情况,在多并行度的执行条件下就会发生这种情况.在onEventTime方法中如果上游发送过来的watermark等于当前窗口的endTime就会执行TriggerResult.FIRE否则还是执行TriggerResult.CONTINUE.

Trigger的触发机制就是这样,其他的CountTrigger等大致逻辑基本是一样的,了解清楚源码中这两个方法的作用很容易理解.

1.2 代码实现

通常Flink内置的Trigger都可以满足数据处理需求,往往在实际开发中可能会存在特殊的业务需求,这时用户可以自定义Trigger,以达到控制窗口触发计算的规则. 可以仿照EventTimeTrigger来构建一个自定义Trigger,只需要将其中的部分代码简单进行修改,并在onElement方法中添加自定的触发逻辑即可.
  • 自定义Trigger

    /**
     * 这里首先需要继承Trigger类,并将<Object, TimeWindow>中的Object修改成自己需要的数据类型,这段代码中需要根据UserEvent2中的数据
     * 来控制触发窗口计算的条件,所以将Object修改成UserEvent2
     **/ 
    public class CustomTrigger extends Trigger<UserEvent2, TimeWindow> {
        public CustomTrigger() {}
    
        // 通过修改onElement方法中窗口计算的触发逻辑实现自定义方式
        @Override
        public TriggerResult onElement(
                // 这里也要将原有的Object类型修改成上面的UserEvent2
                UserEvent2 element, long timestamp, TimeWindow window, TriggerContext ctx)
                throws Exception {
            // 原有的判断逻辑不动,这个是为了便捷,判断逻辑可以根据实际需求进行修改,或者如同下面中添加一个新的触发逻辑
            if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
                return TriggerResult.FIRE;
            // 这里增加一个判断逻辑,当用户行为时间为2700的时候也触发计算
            } else if (element.getTime().equals("2700")) {
                return TriggerResult.FIRE;
            // 原有的判断逻辑不动
            } else {
                ctx.registerEventTimeTimer(window.maxTimestamp());
                return TriggerResult.CONTINUE;
            }
        }
    
        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }
    
        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
                throws Exception {
            return TriggerResult.CONTINUE;
        }
    
        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
            ctx.deleteEventTimeTimer(window.maxTimestamp());
        }
    
        @Override
        public boolean canMerge() {
            return true;
        }
    
        @Override
        public void onMerge(TimeWindow window, OnMergeContext ctx) {
            long windowMaxTimestamp = window.maxTimestamp();
            if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
                ctx.registerEventTimeTimer(windowMaxTimestamp);
            }
        }
    
        // 将toString中俄返回值根据用户的需要进行修改
        @Override
        public String toString() {
            return "CustomTrigger()";
        }
    
        // 将返回值更改成创建的自定义Trigger类
        public static CustomTrigger create() {
            return new CustomTrigger();
        }
    }
    
  • 业务代码

    // ...
    SingleOutputStreamOperator<UserEvent2> windowedStream = keyedStream
            .window(TumblingEventTimeWindows.of(Time.seconds(10))) // 设置滚动窗口大小为10s
            .trigger(new CustomTrigger()) // 传入自定义的Trigger类
            .allowedLateness(Time.seconds(2)) // 允许迟到数据迟到时间2s,同watermark中的forBoundedOutOfOrderness功能类似
            .sideOutputLateData(lateData) // 将迟到数据进行测流输出
            .max("time");// 获取用户行为发生事件最大的这条数据
    // ...
    

    上面这段业务代码中设置的滚动窗口的大小为10s,正常来说只有满足end - start = 10000的时候才会触发窗口计算,但是在自定义Trigger中指定了当数据中时间为2700的时候也触发窗口计算,在时间为2700的数据没到达时候还会按照原有的逻辑触发窗口计算,但是只要2700的数据到达,不管时候达到TumblingEventTimeWindows.of(Time.seconds(10))这个条件,都会触发窗口计算.文章来源地址https://www.toymoban.com/news/detail-854157.html

到了这里,关于Flink之窗口触发机制及自定义Trigger的使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink--7、窗口(窗口的概念、分类、API、分配器、窗口函数)、触发器、移除器

                           星光下的赶路人star的个人主页                        内心的平静始于不再让他人掌控你的感情 在批处理统计中,我们可以等待一批数据都到齐后,统一处理。但是在实时处理统计中,我们是来一

    2024年02月08日
    浏览(47)
  • Flink自定义触发器

    Apache Flink是一个流处理框架,它提供了许多内置的触发器来控制流处理作业的执行。但是,有时候内置的触发器不能满足我们的需求,这时候我们就需要自定义触发器,在编写自定义触发器之前,我们先来了解一下触发器的基本知识: 触发器是什么? ​ Trigger(触发器)决定

    2024年02月11日
    浏览(37)
  • Flink之Window窗口机制

    在大多数场景下,需要统计的数据流都是无界的,因此无法等待整个数据流终止后才进行统计。通常情况下,只需要对某个时间范围或者数量范围内的数据进行统计分析 例如: 因此,在Apache Flink中,窗口是对无界数据流进行有界处理的机制。窗口可以将无限的数据流划分为

    2024年02月06日
    浏览(44)
  • WPF 触发器Trigger

    Trigger:当某些条件满足时会触发一个行为。 一、触发器的类型 数据变化触发型:Trigger / DataTrigger 多条件触发型:MultiTrigger / MultiDataTrigger 事件触发型:EventTrigger 二、Trigger Trigger:Property用来指明关注目标控件的哪个属性,Value则是触发条件。 Setter:一旦触发条件被满足,这

    2024年02月11日
    浏览(38)
  • Flink的窗口机制【博学谷学习记录】

    流式计算,一般有两种场景: 无限制的流式计算,比如:wordcount案例,它没有任何外部的限制条件,这种情况不多。 有限制的流式计算,比如:统计早高峰时间内经过某个道路的车辆数。 对于第二种情况来说,我们需要加上额外的限制条件。最常用的限制条件就是 时间 了

    2024年02月03日
    浏览(35)
  • Unity中触发器(trigger)个人见解

    在 Unity 3D 中,检测碰撞发生的方式有两种,一种是利用碰撞体,另一种则是利用触发器(Trigger)。 触发器(Trigger)是用来触发事件的 例如:在角色扮演游戏里,玩家走到一个地方会发生出现 Boss 的事件,就可以用触发器来实现。或者构建传送门时,需要触发器完成传送。 触

    2024年02月03日
    浏览(49)
  • mysql 、sql server trigger 触发器

    sql server mySQL NEW与OLD详解 MySQL 中定义了 NEW 和 OLD,用来表示触发器的所在表中,触发了触发器的那一行数据,来引用触发器中发生变化的记录内容,具体地: 在INSERT型触发器中,NEW用来表示将要(BEFORE)或已经(AFTER)插入的新数据; 在UPDATE型触发器中,OLD用来表示将要或已

    2024年02月12日
    浏览(45)
  • MySQL触发器Trigger加载以及目前局限

    GreatSQL社区原创内容未经授权不得随意使用,转载请联系小编并注明来源。 GreatSQL是MySQL的国产分支版本,使用上与MySQL一致。 作者: 亮 文章来源:GreatSQL社区原创 首先需要知道MySQL中触发器特点,以及表table相关触发器加载方式 MySQL中单个trigger仅支持单事件触发即单个触发

    2024年02月05日
    浏览(44)
  • 【Unity】Trigger触发器失效没反应的解决办法

    今天遇到了一个问题,创建的角色进入传送门的时候无法传送。 看了所有的代码,确定没有问题之后,把目标瞄准到了Trigger上,在触发中添加了输出之后发现,触发器确实没有触发 众所周知,触发器触发需要两者都有触发器,其中一者勾选Is Trigger,其中一者有刚体就可以了

    2024年02月13日
    浏览(57)
  • 【Docker】网络配置及自定义网络的使用

            Docker的网络配置主要是指Docker容器与外部网络之间的连接设置,包括容器内部的IP地址、端口号等。Docker提供了多种网络模式,包括bridge、host、none等,以满足不同的需求。         默认情况下,Docker使用bridge模式,即创建一个虚拟网桥,将容器连接到该网桥上

    2024年01月20日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包