ProcessWindowFunction 结合自定义触发器的陷阱

这篇具有很好参考价值的文章主要介绍了ProcessWindowFunction 结合自定义触发器的陷阱。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景:

flink中常见的需求如下:统计某个页面一天内的点击率,每10秒输出一次,我们如果采用ProcessWindowFunction 结合自定义触发器如何实现呢?如果这样实现问题是什么呢?

ProcessWindowFunction 结合自定义触发器实现统计点击率

关键代码:
ProcessWindowFunction 结合自定义触发器的陷阱,flink,flink,大数据
ProcessWindowFunction 结合自定义触发器的陷阱,flink,flink,大数据
完整代码参见:

package wikiedits.func;


import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import wikiedits.func.model.KeyCount;



public class ProcessWindowFunctionAndTiggerDemo {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 使用处理时间
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(new FsStateBackend("file:///D:/tmp/flink/checkpoint/windowtrigger"));

        // 并行度为1
        env.setParallelism(1);
        // 设置数据源,一共三个元素
        DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                int xxxNum = 0;
                int yyyNum = 0;
                for (int i = 1; i < Integer.MAX_VALUE; i++) {
                    // 只有XXX和YYY两种name
                    String name = (0 == i % 2) ? "XXX" : "YYY";
                    // 更新aaa和bbb元素的总数
                    if (0 == i % 2) {
                        xxxNum++;
                    } else {
                        yyyNum++;
                    }
                    // 使用当前时间作为时间戳
                    long timeStamp = System.currentTimeMillis();
                    // 将数据和时间戳打印出来,用来验证数据
                    if(xxxNum % 2000==0){
                        System.out.println(String.format("source,%s, %s,    XXX total : %d,    YYY total : %d\n", name,
                                time(timeStamp), xxxNum, yyyNum));
                    }
                    // 发射一个元素,并且戴上了时间戳
                    ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, 1), timeStamp);
                    // 每发射一次就延时1秒
                    Thread.sleep(1);
                }
            }

            @Override
            public void cancel() {}
        });

        // 将数据用5秒的滚动窗口做划分,再用ProcessWindowFunction
        SingleOutputStreamOperator<String> mainDataStream = dataStream
                // 以Tuple2的f0字段作为key,本例中实际上key只有aaa和bbb两种
                .keyBy(value -> value.f0)
                // 5秒一次的滚动窗口
                .timeWindow(Time.minutes(5))
                // 10s触发一次计算,更新统计结果
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
                // 统计每个key当前窗口内的元素数量,然后把key、数量、窗口起止时间整理成字符串发送给下游算子
                .process(new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
                    // 自定义状态
                    private ValueState<KeyCount> state;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // 初始化状态,name是myState
                        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
                    }

                    public void clear(Context context) {
                        ValueState<KeyCount> contextWindowValueState = context.windowState().getState(new ValueStateDescriptor<>("myWindowState", KeyCount.class));
                        contextWindowValueState.clear();
                    }

                    @Override
                    public void process(String s, Context context, Iterable<Tuple2<String, Integer>> iterable,
                            Collector<String> collector) throws Exception {
                        // 从backend取得当前单词的myState状态
                        KeyCount current = state.value();
                        // 如果myState还从未没有赋值过,就在此初始化
                        if (current == null) {
                            current = new KeyCount();
                            current.key = s;
                            current.count = 0;
                        }
                        int count = 0;
                        // iterable可以访问该key当前窗口内的所有数据,
                        // 这里简单处理,只统计了元素数量
                        for (Tuple2<String, Integer> tuple2 : iterable) {
                            count++;
                        }
                        // 更新当前key的元素总数
                        current.count += count;
                        // 更新状态到backend
                        state.update(current);

                        ValueState<KeyCount> contextWindowValueState = context.windowState().getState(new ValueStateDescriptor<>("myWindowState", KeyCount.class));
                        KeyCount windowValue = contextWindowValueState.value();
                        if (windowValue == null) {
                            windowValue = new KeyCount();
                            windowValue.key = s;
                            windowValue.count = 0;
                        }
                        windowValue.count += count;
                        contextWindowValueState.update(windowValue);

                        // 将当前key及其窗口的元素数量,还有窗口的起止时间整理成字符串
                        String value = String.format("window, %s, %s - %s, %d, windowStateCount :%d,   total : %d",
                                // 当前key
                                s,
                                // 当前窗口的起始时间
                                time(context.window().getStart()),
                                // 当前窗口的结束时间
                                time(context.window().getEnd()),
                                // 当前key在当前窗口内元素总数
                                count,
                                // 当前key所在窗口的总数
                                contextWindowValueState.value().count,
                                // 当前key出现的总数
                                current.count);

                        // 发射到下游算子
                        collector.collect(value);
                    }
                });

        // 打印结果,通过分析打印信息,检查ProcessWindowFunction中可以处理所有key的整个窗口的数据
        mainDataStream.print();

        env.execute("processfunction demo : processwindowfunction");

    }



    public static String time(long timeStamp) {
        return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));
    }



}

这里采用ProcessWindowFunction 结合ContinuousProcessingTimeTrigger的方式确实可以实现统计至今为止某个页面点击率的目的,不过这其中需要注意点的点是:
每隔10s触发public void process(String s, Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<String> collector)方法时,iterable对象是包含了一天的窗口内收到的所有消息,也就是当前触发时iterable集合是前10s触发时iterable集合的超集,包含前10s触发时的所有的消息集合。
到这里所引起的问题也自然而然的出来了:对于ProcessWindowFunction 实现而言,flink内部是通过ListState的形式保存窗口内收到的所有消息的,注意这里flink内部会使用ListState保存每一条分配到以天为单位的窗口内的消息,这会导致状态膨胀,想一下,一天内所有的消息都会当成状态保存起来,这对于状态后端的压力是有多大!这些保存在ListState中的消息只有在窗口结束后才会清理:具体参见WindowOperator.clearAllState,那有解决方案吗?使用Agg/Reduce处理函数替ProcessWindowFunction作为处理函数可以实现吗?请看下一篇文章

参考文章:
https://www.cnblogs.com/Springmoon-venn/p/13667023.html文章来源地址https://www.toymoban.com/news/detail-704245.html

到了这里,关于ProcessWindowFunction 结合自定义触发器的陷阱的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 数据库触发器简介——修改数据的触发器、删除数据的触发器

    修改数据的触发器 更新数据 思考下面这个触发器会触发几次?几条数据就触发几次。

    2024年02月15日
    浏览(37)
  • 【MySQL触发器】触发器的使用、创建、修改及删除

    一、什么是触发器 二、创建触发器 ①创建一个insert事件触发器 ②创建一个delete 事件触发器  三、触发器包含多条执行语句 四、查看触发器  ①SHOW TRIGGERS语句查看触发器 ②查看系统表triggers实现查看触发器   五、触发器的删除       当我们对一个表进行数据操作时,需

    2023年04月08日
    浏览(40)
  • Verilog设计实现D触发器与JK触发器

    题目:         用Verilog实现以下电路:                 1. 带复位端的正边沿触发的D触发器;                 2.带复位端的正边沿触发的JK触发器。 包括sys_clk,复位信号sys_rst_n,输入信号key_in以及输出信号led_out; 采用行为级描述: testbench仿真代码编写:

    2024年04月28日
    浏览(68)
  • 脉冲触发的触发器

    唯一的不同在于时钟信号的控制不一样 前面的叫做 主触发器, 后面叫做 从触发器 为什么在一个时钟周期内只可能改变一次?(工作原理)  在时钟信号等于0期间,看看时钟信号的工作 CLK=1期间,主FF工作,从FF不工作,主FF形成一个同步SR触发器的功能 随着S,R变化, 但是接

    2024年02月09日
    浏览(36)
  • 电平触发的触发器

    目录 引言 电路分析 分析输入输出关系 时钟信号 同步SR触发器的工作原理 1.时钟信号等于0期间 2.时钟信号等于1期间 总结  电平触发的D触发器(D锁存器) 普通的SR锁存器没有任何抗干扰能力 我们要加控制信号,来抵抗干扰 比如说我们不把信号直接加在门上,我们可以再加

    2023年04月14日
    浏览(39)
  • Unity碰撞检测/触发器触发问题

    在制作2D平板冒险游戏的攻击模块时,遇到攻击敌人后无法产生触发器事件的问题。 在玩家游戏对象下有一攻击子对象。子对象碰撞器默认处于禁用状态,当按下攻击键时,通过代码: 来对碰撞器进行激活,敌人有刚体,且并非Static状态。两个物体均有碰撞体,但此时并未触

    2024年02月11日
    浏览(39)
  • WPF 多值绑定(MultiBinding)与多属性触发器(MultiTrigger)与多数据触发器(MultiDataTrigger)

    当一个控件的某个属性需要绑定到多个值的时候,需要使用MultiBinding. 例子1 一个文本显示Person的Name和Age 例子2 当1,2,3都被选中时,下面的红色框隐藏,不使用后台代码逻辑。 这个时候,使用MultiBinding+MultiValueConverter可实现需求 首先定义多值转换器: XAML如下: 与Trigger属性对

    2024年02月05日
    浏览(32)
  • D触发器仿真

    本篇文章为基础教学,主要探究在Quartus中设计一个D触发器并进行仿真,同时验证时序波形。 参考链接: 简介: D触发器是一个具有记忆功能的,具有两个稳定状态的信息存储器件,是构成多种时序电路的最基本逻辑单元,也是数字逻辑电路中一种重要的单元电路。 因此,

    2024年02月05日
    浏览(51)
  • MySQL 触发器

    触发器是与表有关的数据库对象,指在insert/update/delete之前或之后,触发并执行触发器中定义SQL语句集合。触发器的这种特性可以协助应用在数据库端确保数据的完整性,日志记录,数据校验等操作。 使用别 名OLD 和 NEW 来引用触发器中发生变化的记录内容,这与其他的数据库

    2024年02月08日
    浏览(30)
  • 存储过程触发器

    存储过程: 存储过程是一组预编译的SQL语句,可以在数据库中存储并重复使用。存储过程可以提高性能、减少网络流量并提高安全性。MSSQL中的存储过程使用T-SQL编写。 触发器: 触发器是一种特殊类型的存储过程,它会在数据库中执行某个操作(如INSERT、UPDATE或DELETE)时自动

    2024年02月06日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包