flink 最后一个窗口一直没有新数据,窗口不关闭问题

这篇具有很好参考价值的文章主要介绍了flink 最后一个窗口一直没有新数据,窗口不关闭问题。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

自定义实现 WatermarkStrategy接口

窗口类型:滚动窗口
代码:

    public static class WatermarkDemoFunction implements WatermarkStrategy<JSONObject>{

        private Tuple2<Long,Boolean> state = Tuple2.of(0L,true);

        @Override
        public WatermarkGenerator<JSONObject> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<JSONObject>() {
                private long maxWatermark;

                @Override
                public void onEvent(JSONObject waterSensor, long l, WatermarkOutput watermarkOutput) {
                    maxWatermark = Math.max(maxWatermark,waterSensor.getLong("ts"));
                    state.f0 = System.currentTimeMillis();
                    System.out.println("maxWatermark is " + maxWatermark);
                    state.f1 = false;
                }
                @Override
                public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                    //乱序时间
                    long outOfTime = 3000L;
                    if (maxWatermark - outOfTime <=0){
                    } else {
                        // 10s内没有数据则关闭当前窗口
                        System.out.println("System.currentTimeMillis() - state.f0:" + (System.currentTimeMillis() - state.f0));
                        System.out.println("state.f1:" + state.f1);
                        if (System.currentTimeMillis() - state.f0 >= 9000L && !state.f1){
                            watermarkOutput.emitWatermark(new Watermark(maxWatermark  + 6000L));
                            state.f1 = true;
                            System.out.println("触发窗口,maxWatermark  + 6000L:" + (maxWatermark  + 6000L));
                        } else {
                            System.out.println("正常发送水印");
                            watermarkOutput.emitWatermark(new Watermark(maxWatermark - outOfTime));
                        }
                    }
                }
            };
        }
    }

代码部分逻辑说明
flink 最后一个窗口一直没有新数据,窗口不关闭问题,flink,大数据若设置了自动生成watermark 参数,根据打印日志,设置对应的时间(多久没新数据写入,触发窗口计算)
env.getConfig().setAutoWatermarkInterval(5000);

使用自定义的watermark:
flink 最后一个窗口一直没有新数据,窗口不关闭问题,flink,大数据
watermark 周期生成()的疑问:
1、默认200ms,会连续生成4次后,不会继续生成了
2、设置了周期生成间隔,env.getConfig().setAutoWatermarkInterval(1000L); 只会周期生成一次

参考:https://blog.csdn.net/lr131425/article/details/127422833文章来源地址https://www.toymoban.com/news/detail-801089.html

到了这里,关于flink 最后一个窗口一直没有新数据,窗口不关闭问题的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • WPF 已知问题 RadioButton 指定 GroupName 后关闭窗口可能导致无法选中

    本文记录一个 WPF 已知问题,当 WPF 的 RadioButton 指定 GroupName 且将 IsChecked 状态绑定到 ViewModel 上,将包含以上控件的代码的窗口显示两个,接着关闭其中一个。此时可以看到依然开着的窗口的 RadioButton 控件无法正确在用户界面上点击选中 此问题已经报告给 WPF 官方,请看 ht

    2024年02月08日
    浏览(40)
  • 模拟器单窗口ip有问题?试试关闭IPV6来解决

    目前应该不止雷电9有这个问题了,最早是看到无忧群里在说有这个问题,后面发现很多其他的ip软件也有同样的问题,很多人都遇到,所以做个图文教程在这里,没出问题的也可以设置一下,目前ipv6也还没普及,可以不用。 win10和win11都一样的方法,不同日期的系统,设置可

    2024年01月23日
    浏览(55)
  • 增加并行度后,发现Flink窗口不会计算的问题。

    窗口没有关闭计算的问题,一直困扰了很久,经过多次验证,确定了问题的根源。 Flink使用了window,同时使用了watermark ,并且还设置了较高的并行度。生产是设置了300的并行度,并且接入了 几十个topic ,这个地方划重点,后面会提到。结果就是,窗口没有关闭进行计算。于

    2024年02月06日
    浏览(45)
  • 解决python+selenium自动化,打开谷哥浏览器窗口么会自动关闭问题

    # 导包 from selenium import webdriver from selenium.webdriver.common.by import By # 实例化浏览器,且浏览器对象的初始化放在定义的方法函数外,以全局变量的形式使用 wd = webdriver.Chrome() # 已配置环境变量 def get(): #

    2024年02月14日
    浏览(56)
  • 大数据-玩转数据-Flink窗口函数

    前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素. window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种. ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对

    2024年02月11日
    浏览(43)
  • Flink流数据窗口与时间

    随着大数据时代的到来,流处理技术变得越来越重要。流处理系统可以实时地处理大量数据,为实时应用提供有价值的信息。Apache Flink是一个流处理框架,它可以处理大规模的流数据,并提供丰富的功能,如窗口操作、时间操作等。在本文中,我们将深入探讨Flink流数据窗口

    2024年02月20日
    浏览(94)
  • flink正常消费kafka数据,flink没有做checkpoint,kafka位点没有提交

    1、背景 flink消费kafka数据,多并发,实现双流join 2、现象 (1)flink任务消费kafka数据,其中数据正常消费,kafka显示消息堆积,位点没有提交,并且flink任务没有做checkpoint (2)其中一个流的subtask显示finished (3)无背压 3、问题原因 (1)其中一个topic分区为1 (2)配置的并行

    2024年02月13日
    浏览(44)
  • 微信蓝牙小程序,连接蓝牙模块收发数据。遇到的问题:我连接成功之后发现服务值下面两个特征值一个只能读一个只能写,然后点击读的那个一直收到同一个字,点击发送无论发什么内容只能收到同一个中文字,求指导

    大家好,我是小程序初学者,目前尝试做一个蓝牙小程序连接蓝牙模块进行收发数据,遇到了问题求大佬指点 这是代码运行结果:点击能读的特征值就会接收到耀这个字,点击输入框无论发送什么都会接收到肀这个字 遇到的问题:我连接成功之后发现服务值下面两个特征值

    2024年01月16日
    浏览(63)
  • 关于position:fixed定位的位置不对的问题(即没有按照浏览器的窗口进行定位)

    问题: 今天在开发过程中发现元素使用 position: fixed 时位置有问题,位置跟我写的位置对不上,后面在 MDN 上面找到了答案,下面是关于 position: fixed 的描述: fixed: 元素会被移出正常文档流,并不为元素预留空间,而是通过指定元素相对于屏幕视口(viewport)的位置来指定元

    2024年02月15日
    浏览(44)
  • 大数据-玩转数据-Flink时间滚动动窗口

    在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集

    2024年02月11日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包