flink时间窗口无新的数据进来最后一个窗口不关闭

这篇具有很好参考价值的文章主要介绍了flink时间窗口无新的数据进来最后一个窗口不关闭。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

测试反馈, 配置的flink任务提交上去后, 输入数据源符合条件,到时间窗口的size。最后一个窗口没有闭窗计算,数据并没及时输出告警

经过调试发现,watermark没有向后继续推进,导致无法闭窗, watermark的时间取的是数据中的业务时间,create_time。

因为没有后续数据进来, 所以watermark一直停在收到的最后一条数据的时间,,

按照官网的watermark的实现:

inputStream.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new EventWaterMark(window))
                .withTimestampAssigner((SerializableTimestampAssigner<JSONObject>) (jsonObject, l) -> jsonObject.getLong(window.getAttrField()))).name("Watermark|"+rule.getName());



public class EventWaterMark implements WatermarkGeneratorSupplier<JSONObject> {
    private static final long serialVersionUID = -2338922000184097299L;
    private final String eventTimeFieldNameJSONObject;
    private long currentMaxTimestamp;
    private String eventTimeFieldName;
    private long maxOutOfOrderness;


    public EventWaterMark(Window window) {
        this.maxOutOfOrderness = window.getMaxOutOfOrderness();
        this.eventTimeFieldNameJSONObject = window.getAttrField();
        this.eventTimeFieldName = window.getAttrField();

    }

    @Override
    public WatermarkGenerator<JSONObject> createWatermarkGenerator(Context context) {
        return new WatermarkGenerator<JSONObject>() {

            @Override
            public void onEvent(JSONObject jsonObject, long l, WatermarkOutput watermarkOutput) {
                currentMaxTimestamp = Math.max(currentMaxTimestamp, jsonObject.getLong(eventTimeFieldName));
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                // 无后续数据,窗口不会关闭计算?
                watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));
# 使用当前时间,无法处理历史的数据
//                watermarkOutput.emitWatermark(new Watermark(System.currentTimeMillis() - maxOutOfOrderness));
            }
        };
    }
}

解决思路: 记录最后一条数据进入的时间, 多少秒后无数据进来,就主动向后推进watermark多少秒,保持watermark向后推进

更新后自定义的watermark实现:

public class EventWaterMark implements WatermarkGeneratorSupplier<JSONObject> {
    private static final long serialVersionUID = -2338922000184097299L;
    private final String eventTimeFieldNameJSONObject;
    private long currentMaxTimestamp;
    private String eventTimeFieldName;
    private long maxOutOfOrderness;

    // 当前数据进入的时间
    private long currentDateTimeMillis;


    public EventWaterMark(Window window) {
        this.maxOutOfOrderness = window.getMaxOutOfOrderness();
        this.eventTimeFieldNameJSONObject = window.getAttrField();
        this.eventTimeFieldName = window.getAttrField();
    }

    @Override
    public WatermarkGenerator<JSONObject> createWatermarkGenerator(Context context) {
        return new WatermarkGenerator<JSONObject>() {

            @Override
            public void onEvent(JSONObject jsonObject, long l, WatermarkOutput watermarkOutput) {
                currentMaxTimestamp = Math.max(currentMaxTimestamp, jsonObject.getLong(eventTimeFieldName));

                // 记录 当前来的数据来到 时间戳
                currentDateTimeMillis = System.currentTimeMillis();

            }

            @Override
            public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                // 无后续数据,最后一个窗口不会关闭计算,
//                watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));
                // 使用System.currentTimeMillis(),如果第一条数据时间很早, 窗口不会进行关闭计算
//                watermarkOutput.emitWatermark(new Watermark(System.currentTimeMillis() - maxOutOfOrderness));

                // 无初始数据进入 watermark不用推进, 多并行度时候,会取最小的watermark,通过withIdleness 将分区标记为空闲,这样watermark就可以往下走了。
                if (currentMaxTimestamp - maxOutOfOrderness <= 0) {
                } else {
                    // 5秒内无数据进入,watermark保持向后推进, 确保无数据进入,窗口能闭窗计算
                    if (System.currentTimeMillis() - currentDateTimeMillis >= 5000) {
                        // 无数据进入,保持watermark递增
                        currentMaxTimestamp = currentMaxTimestamp  + System.currentTimeMillis() - currentDateTimeMillis;
                        watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));

                        // 模拟数据进入 设置数据进入时间
                        currentDateTimeMillis = System.currentTimeMillis();
                    } else {
                        watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));
                    }
                }
            }
        };
    }
}

上面虽然可以解决最后一个窗口闭合问题,但是还有个问题,当执行繁忙的时候,出现反压,导致source消费为0,  此时持续高反压时间大于代码中配置的5秒后, 此时自定义的watermark由于没收到上游source算子的数据,就会自动的向后推进watermark,这样就会导致,窗口会及时闭合计算,当反压结束后, source再来的数据 已经超过 窗口时间和延迟时间,, 直接作为迟到数据不参与窗口计算了。 

此处暂未想到更好的解决方法文章来源地址https://www.toymoban.com/news/detail-549147.html

到了这里,关于flink时间窗口无新的数据进来最后一个窗口不关闭的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink--8、时间语义、水位线(事件和窗口、水位线和窗口的工作原理、生产水位线、水位线的传递、迟到数据的处理)

                           星光下的赶路人star的个人主页                        将自己生命力展开的人,他的存在,对别人就是愈疗 1、从《星球大战》说起 为了更加清晰地说明两种语义的区别,我们来举一个非常经典的例

    2024年02月07日
    浏览(47)
  • Flink中的时间和窗口

    在传统的批处理系统中,我们可以等到一批数据全部都到齐了之后,对其做相关的计算;但是在实时处理系统中,数据是源源不断的,正常情况下,我们就得来一条处理一条。那么,我们应该如何统计某个实时数据源中最近一段时间内的数据呢? 在Flink的观念中,引入了“窗

    2024年02月08日
    浏览(47)
  • 《Flink学习笔记》——第六章 Flink的时间和窗口

    6.1 时间语义 6.1.1 Flink中的时间语义 对于一台机器而言,时间就是系统时间。但是Flink是一个分布式处理系统,多台机器“各自为政”,没有统一的时钟,各自有各自的系统时间。而对于并行的子任务来说,在不同的节点,系统时间就会有所差异。 我们知道一个集群有JobMana

    2024年02月11日
    浏览(40)
  • Flink-【时间语义、窗口、水位线】

    🌰:可乐 可乐的生产日期 = 事件时间(可乐产生的时间); 可乐被喝的时间 = 处理时间(可乐被处理【喝掉=处理】的时间)。 机器时间:可能不准确(例如:A可乐厂的时钟比较慢,B可乐厂的时钟比较快,但实际上B产生可乐的时间比A产生可乐的时间慢,却被先处理了)

    2024年02月01日
    浏览(51)
  • Flink中的时间和窗口操作

    本专栏案例代码和数据集链接: https://download.csdn.net/download/shangjg03/88477960 在大多数场景下,我们需要统计的数据流都是无界的,因此我们无法等待整个数据流终止后才进行统计。通常情况下,我们只需要对某个时间范围或者数量范围内的数据进行统计分析:如每隔五分钟统计

    2024年02月08日
    浏览(55)
  • 【Apache Flink】基于时间和窗口的算子-配置时间特性

    Apache Flink 它提供了多种类型的时间和窗口概念,使得用户能够进行准确的时间计算。在数据处理任务中,时间的概念是非常重要的,对于一些复杂的实时流处理任务,如事件按时间顺序的聚合、分割和窗口计算,时间更是关键所在。而在这类任务中,选择使用何种时间特性是

    2024年02月08日
    浏览(37)
  • 【Flink】Flink 中的时间和窗口之水位线(Watermark)

    这里先介绍一下什么是 时间语义 , 时间语义 在Flink中是一种很重要的概念,下面介绍的 水位线 就是基于 时间语义 来讲的。 在Flink中我们提到的时间语义一般指的是 事件时间 和 处理时间 : 处理时间(Processing Time) ,一般指执行处理操作的系统时间,也就是Flink的窗口算子

    2024年02月07日
    浏览(50)
  • PyTorch-Forecasting一个新的时间序列预测库

    时间序列预测在金融、天气预报、销售预测和需求预测等各个领域发挥着至关重要的作用。PyTorch- forecasting是一个建立在PyTorch之上的开源Python包,专门用于简化和增强时间序列的工作。在本文中我们介绍PyTorch-Forecasting的特性和功能,并进行示例代码演示。 PyTorch-Forecasting的安

    2024年02月06日
    浏览(42)
  • 8 分钟看完这 7000+ 字,Flink 时间窗口和时间语义这对好朋友你一定搞得懂!外送窗口计算和水印一并搞懂!!!

    目录 一、时间语义 时间窗口 1. 前摘: 1.1 Flink的时间和窗口 1.2 什么是时间窗口和时间语义呢? 2. 时间窗口 2.1 举个例子: 2.2 3个实时数据计算场景 3. 时间语义 二、Flink上进行窗口计算: 1. 一个Flink窗口应用的大致骨架结构 2. Flink窗口的骨架结构中有两个必须的两个操作:

    2024年01月23日
    浏览(38)
  • 大数据-玩转数据-Flink窗口函数

    前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素. window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种. ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对

    2024年02月11日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包