Flink自定义触发器

这篇具有很好参考价值的文章主要介绍了Flink自定义触发器。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink自定义触发器

Apache Flink是一个流处理框架,它提供了许多内置的触发器来控制流处理作业的执行。但是,有时候内置的触发器不能满足我们的需求,这时候我们就需要自定义触发器,在编写自定义触发器之前,我们先来了解一下触发器的基本知识:

一、触发器概述

  • 触发器是什么?

    ​ Trigger(触发器)决定了什么时候窗口准备就绪了,一旦窗口准备就绪就可以使用WindowFunction(窗口计算操作)进行计算。每一个 WindowAssigner(窗口分配器) 都会有一个默认的Trigger。如果默认的Trigger不满足用户的需求,用户可以自定义Trigger。

    触发器接口有5个方法,允许触发器对不同的事件做出反应:

    1. onElement():每当向窗口添加一个元素时,都会调用该方法。
    2. onEventTime():当注册的事件时间定时器触发时,该方法被调用。
    3. onProcessingTime():当注册的处理时间计时器触发时,该方法被调用。
    4. onMerge():该方法适用于有状态触发器,并在它们对应的窗口合并时合并两个触发器的状态。
    5. clear():执行窗口的清除操作。

    需要注意的是:

    1. 前3个方法决定如何响应它们的调用事件,返回一个 TriggerResult:

      CONTINUE:什么也不做

      FIRE:触发计算

      PURGE:清除窗口中的元素

      FIRE_AND_PURGE:触发计算并清空窗口中的元素

    2. 以上这些方法都可以用来为之后的操作注册处理时间定时器或事件时间定时器

  • 内置触发器

    1. EventTimeTrigger:基于事件时间和watermark机制来对窗口进行触发计算
    2. ProcessingTimeTrigger: 基于处理时间触发
    3. CountTrigger:窗口元素数超过预先给定的限制值的话会触发计算
    4. PurgingTrigger:作为其它trigger的参数,将其转化为一个purging触发器

二、需求

​ 实际工作中,可能会遇到想控制Flink数据流速度的情况,比如每5秒最多输出3条数据,这时候如果使用默认的TimeWindow或者CountWindow都不好达到要求,这时候就可以进行自定义窗口的触发器Trigger,修改触发窗口执行计算的条件。

三、自定义触发器

为了实现以上需求,我编写了如下代码:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.util.HashMap;
import java.util.Map;

@Slf4j
public class CountAndSizeTrigger<T> extends Trigger<T, TimeWindow> {

    private static final String DATA_COUNT_STATE_NAME = "dataCountState";

    private static final String DATA_SIZE_STATE_NAME = "dataSizeState";

    // 窗口最大数据条数
    private int maxCount;

    // 窗口最大数据字节数
    private int maxSize;

    // 时间语义:event time、process time
    private TimeCharacteristic timeType;

    // 用于储存窗口当前数据条数的状态对象
    private ReducingStateDescriptor<Long> countStateDescriptor = new ReducingStateDescriptor(DATA_COUNT_STATE_NAME, new ReduceFunction<Long>() {
        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }, LongSerializer.INSTANCE);

    //用于储存窗口当前数据字节数的状态对象
    private ReducingStateDescriptor<Integer> sizeStateDescriptor = new ReducingStateDescriptor(DATA_SIZE_STATE_NAME, new ReduceFunction<Long>() {
        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }, IntSerializer.INSTANCE);

    private CountAndSizeTrigger(int maxCount, int maxSize , TimeCharacteristic timeType) {
        this.maxCount = maxCount;
        this.maxSize = maxSize;
        this.timeType = timeType;
    }

    /**
     * 触发计算,计算结束后清空窗口内的元素
     * @param window 窗口
     * @param ctx 上下文
     */
    private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception {
        clear(window, ctx);
        return TriggerResult.FIRE_AND_PURGE;
    }

    /**
     * 进入窗口的每个元素都会调用该方法
     * @param element 元素
     * @param timestamp 时间戳
     * @param window 窗口
     * @param ctx 上下文
     */
    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
        ReducingState<Integer> sizeState = ctx.getPartitionedState(sizeStateDescriptor);
        Map<String, JSONArray> map = stringToMap(element.toString());
        if (map != null) {
            for (Map.Entry<String, JSONArray> entry : map.entrySet()) {
                JSONArray value = entry.getValue();
                countState.add(Long.valueOf(value.size()));
            }
        } else {
            countState.add(0L);
        }
        int length = String.valueOf(element).getBytes("utf-8").length;
        sizeState.add(length);
        // 注册定时器
        ctx.registerProcessingTimeTimer(window.maxTimestamp());

        if (countState.get() >= maxCount) {
            log.info("fire count {} ",countState.get());
            return fireAndPurge(window, ctx);
        }if (sizeState.get() >= maxSize){
            log.info("fire size {} ",sizeState.get());
            return fireAndPurge(window, ctx);
        }else{
            return TriggerResult.CONTINUE;
        }
    }

    // 数据处理,可根据需要修改
    private Map<String, JSONArray> stringToMap(String str) {
        if (StringUtils.isBlank(str)) {
            return null;
        }
        String string = str.substring(1, str.length() - 1).replaceAll(" ", "");
        Map<String, JSONArray> map = new HashMap<>();
        String[] split = string.split("=");
        if (split.length < 2) {
            return null;
        } else {
            String key = split[0];
            String value = string.substring(string.indexOf("=") + 1);
            map.put(key, JSON.parseArray(value));
        }
        return map;
    }

    /**
     * 处理时间窗口触发的时候会被调用
     * @param time 时间
     * @param window 窗口
     * @param ctx 上下文
     */
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        if (timeType != TimeCharacteristic.ProcessingTime) {
            return TriggerResult.CONTINUE;
        }
        log.info("fire time {} ",time);
        return fireAndPurge(window, ctx);
    }

    /**
     * 事件时间窗口触发的时候被调用
     * @param time 时间
     * @param window 窗口
     * @param ctx 上下文
     */
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        if (timeType != TimeCharacteristic.EventTime) {
            return TriggerResult.CONTINUE;
        }
        if (time >= window.getEnd()) {
            return TriggerResult.CONTINUE;
        } else {
            log.info("fire with event tiem: " + time);
            return fireAndPurge(window, ctx);
        }
    }

    /**
     * 执行窗口的清除操作
     * @param window 窗口
     * @param ctx 上下文
     */
    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countStateDescriptor).clear();
        ctx.getPartitionedState(sizeStateDescriptor).clear();
    }

    /**
     * 初始化触发器,默认使用processTime
     * @param maxCount 最大数据条数
     * @param maxSize 最大数据字节数
     * @param timeType 事件类型
     */
    public static CountAndSizeTrigger creat(int maxCount, int maxSize) {
        return new CountAndSizeTrigger(maxCount,maxSize,TimeCharacteristic.ProcessingTime);
    }

    /**
     * 初始化触发器
     * @param maxCount 最大数据条数
     * @param maxSize 最大数据字节数
     * @param timeType 事件类型
     */
    public static CountAndSizeTrigger creat(int maxCount, int maxSize, TimeCharacteristic timeType) {
        return new CountAndSizeTrigger(maxCount,maxSize,timeType);
    }
}

四、使用示例

stream
        .timeWindowAll(Time.seconds(10))
        .trigger(new CountAndSizeTrigger(1000, 1024))
        .process(new DemoWindowProcessFunction())
        .addSink(new DemoSinkFunction())
        .name("demo");

以上代码通过调用CountAndSizeTrigger,传入最大数据条数和最大数据字节数,来对数据流进行流速控制。文章来源地址https://www.toymoban.com/news/detail-673539.html

到了这里,关于Flink自定义触发器的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink之窗口触发机制及自定义Trigger的使用

    1 窗口触发机制 窗口计算的触发机制都是由Trigger类决定的,Flink中为各类内置的WindowsAssigner都设计了对应的默认Trigger. 层次结构如下: Trigger ProcessingTimeoutTrigger EventTimeTrigger CountTrigger DeltaTrigger NeverTrigger in GlobalWindows ContinuousEventTimeTrigger PurgingTrigger ContinuousProcessingTimeTrigger Proces

    2024年04月17日
    浏览(28)
  • 存储过程、触发器、游标、视图、自定义函数 字段类型、字段可空、统计字段、逻辑删除 权限系统数据库&无限极类别设计

    在数据库设计中,存储过程、触发器、游标、视图、自定义函数、字段类型、字段可空、统计字段、逻辑删除以及权限系统和无限级类别设计都是重要的概念。下面我将逐一解释这些概念,并提供相关的设计建议。 存储过程 (Stored Procedure) 定义 :存储过程是一组为了完成特定

    2024年03月09日
    浏览(121)
  • 脉冲触发的触发器(主从触发器)

    脉冲触发的动作特点: (1)触发器的翻转分两步动作。 第一步:当CLK以高电平为有效信号时,在CLK= 1期间主触发器接收输入端(S、R或J、K)的信号,被置成相应的状态,而从触发器不动。 第二步: CLK下降沿到来时从触发器按照主触发器的状态翻转,所以Q、Q’端状态的改变发生在

    2024年02月04日
    浏览(50)
  • 【FGPA】Verilog:JK 触发器 | D 触发器 | T 触发器 | D 触发器的实现

    0x00 JK 触发器 JK 触发器是 RS 触发器和 T 触发器的组合,有两个输入端 J 和 K,如果两个输入端都等于 1,则将当前值反转。 行为表

    2024年02月05日
    浏览(46)
  • 数据库触发器简介——修改数据的触发器、删除数据的触发器

    修改数据的触发器 更新数据 思考下面这个触发器会触发几次?几条数据就触发几次。

    2024年02月15日
    浏览(48)
  • 【MySQL触发器】触发器的使用、创建、修改及删除

    一、什么是触发器 二、创建触发器 ①创建一个insert事件触发器 ②创建一个delete 事件触发器  三、触发器包含多条执行语句 四、查看触发器  ①SHOW TRIGGERS语句查看触发器 ②查看系统表triggers实现查看触发器   五、触发器的删除       当我们对一个表进行数据操作时,需

    2023年04月08日
    浏览(55)
  • Verilog设计实现D触发器与JK触发器

    题目:         用Verilog实现以下电路:                 1. 带复位端的正边沿触发的D触发器;                 2.带复位端的正边沿触发的JK触发器。 包括sys_clk,复位信号sys_rst_n,输入信号key_in以及输出信号led_out; 采用行为级描述: testbench仿真代码编写:

    2024年04月28日
    浏览(77)
  • 脉冲触发的触发器

    唯一的不同在于时钟信号的控制不一样 前面的叫做 主触发器, 后面叫做 从触发器 为什么在一个时钟周期内只可能改变一次?(工作原理)  在时钟信号等于0期间,看看时钟信号的工作 CLK=1期间,主FF工作,从FF不工作,主FF形成一个同步SR触发器的功能 随着S,R变化, 但是接

    2024年02月09日
    浏览(44)
  • 电平触发的触发器

    目录 引言 电路分析 分析输入输出关系 时钟信号 同步SR触发器的工作原理 1.时钟信号等于0期间 2.时钟信号等于1期间 总结  电平触发的D触发器(D锁存器) 普通的SR锁存器没有任何抗干扰能力 我们要加控制信号,来抵抗干扰 比如说我们不把信号直接加在门上,我们可以再加

    2023年04月14日
    浏览(47)
  • Unity碰撞检测/触发器触发问题

    在制作2D平板冒险游戏的攻击模块时,遇到攻击敌人后无法产生触发器事件的问题。 在玩家游戏对象下有一攻击子对象。子对象碰撞器默认处于禁用状态,当按下攻击键时,通过代码: 来对碰撞器进行激活,敌人有刚体,且并非Static状态。两个物体均有碰撞体,但此时并未触

    2024年02月11日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包