【API篇】十一、Flink水位线传递与迟到数据处理

这篇具有很好参考价值的文章主要介绍了【API篇】十一、Flink水位线传递与迟到数据处理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、水位线传递

上游task处理完水位线,时钟改变后,要把数据和当前水位线继续往下游算子的task发送。当一个任务接收到多个上游并行任务传递来的水位线时,以最小的那个作为当前任务的事件时钟。如图:上游算子并行度为4,:

- 第一波的2.4.3.6传递到下游task,取2
- 其中一个上游task的数据4到了,传递到下游,4.4.3.6,此时,水位线被更新为最小的3
- 其中一个上游task的7到了,下游task为4.7.3.6,最小仍为3,不更新
- 上游task的6到下游,下游为4.7.6.6,最小为4,水位线再更新

【API篇】十一、Flink水位线传递与迟到数据处理,Flink,flink,大数据

总结:

  • 接收到上游多个,取最小
  • 往下游多个发送,广播

使用上篇的乱序流来查看水位线的传递,这次把并行度不能再是1,设置为2

public class WatermarkOutOfOrdernessDemo {
    public static void main(String[] args) throws Exception {
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(2);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("node01", 9527)
                .map(new WaterSensorMapFunction());

        // TODO 1.定义Watermark策略
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                // 1.1 指定watermark生成:乱序的,等待3s
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                // 1.2 指定 时间戳分配器,从数据中提取
                .withTimestampAssigner(
                        (element, recordTimestamp) -> {
                            // 返回的时间戳,要 毫秒
                            System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);
                            return element.getTs() * 1000L;
                        });

        // TODO 2. 指定 watermark策略
        SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);

        sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
                // TODO 3.使用 事件时间语义 的窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {

                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = elements.spliterator().estimateSize();

                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                            }
                        }
                )
                .print();

        env.execute();
    }
}


执行:

【API篇】十一、Flink水位线传递与迟到数据处理,Flink,flink,大数据
画个示意图:

【API篇】十一、Flink水位线传递与迟到数据处理,Flink,flink,大数据

2、水位线设置空闲等待

结合上图,上面是并行度为2,数据进来了会轮询到两个上游task,如果此时一个上游task一直没有数据进来,而当前Task是以最小的那个作为当前任务的事件时钟,就会导致下游接收的Task时钟一直为起始值而无法推进,进而导致窗口无法触发。

public class WatermarkIdlenessDemo {

    public static void main(String[] args) throws Exception {
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(2);

        //  MyPartitioner是自定义分区器:数据%分区数,只输入奇数,都只会去往map的一个子任务,余数总为1,0.1两个map的task总去1
        SingleOutputStreamOperator<Integer> socketDS = env
                .socketTextStream("hadoop102", 7777)
                .partitionCustom(new MyPartitioner(), r -> r)
                .map(r -> Integer.parseInt(r))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Integer>forMonotonousTimestamps()
                                .withTimestampAssigner((r, ts) -> r * 1000L)
                                
                );


        // 分成两组: 奇数一组,偶数一组 , 开10s的事件时间滚动窗口
        socketDS
                .keyBy(r -> r % 2)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer integer, Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {
                        long startTs = context.window().getStart();
                        long endTs = context.window().getEnd();
                        String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                        String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                        long count = elements.spliterator().estimateSize();

                        out.collect("key=" + integer + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());

                    }
                })
                .print();


        env.execute();
    }
}

运行:

【API篇】十一、Flink水位线传递与迟到数据处理,Flink,flink,大数据

分析:以上demo中,为了实现数据总流向一个子task,用了自定义分区器:

【API篇】十一、Flink水位线传递与迟到数据处理,Flink,flink,大数据

.partitionCustom(new MyPartitioner(), r -> r)

以输出数据为key,key除以并行度2区域为分区逻辑,如果我一直输入奇数,分区值就一直为1,就可以实现数据只流向其中一个子task。流向下游算子时,一个task始终没数据,导致取小的时候一直取到了没数据的原始time,时钟无法更新,窗口无法触发。此时就需要设置最大空闲时间,太久没数据来时,就不让它参与比较。

.withIdleness(Duration.ofSeconds(5))  //空闲等待5s

【API篇】十一、Flink水位线传递与迟到数据处理,Flink,flink,大数据

此时,输入到9时,已到5s时间,不再比较另一个没数据的task,11一进来,立马触发窗口

【API篇】十一、Flink水位线传递与迟到数据处理,Flink,flink,大数据

3、迟到数据处理:窗口允许迟到

前面为了解决乱序流,提出了延迟的概念:

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3));

以上,即窗口延迟触发3秒,即让水位线的推进值 = 当前值 - 3,以便争取为乱序数据更多的时间进入窗口。但当延迟完成,窗口触发计算和关闭后,再来的属于已关闭窗口的数据就不会被统计在内了,这些数据也成为迟到数据。(本来8.30上课,老师等等家远的学生,说8.40开始讲课,结果你却9.00才到,那就门口站着取,别听了,类比数据不会再被对应窗口统计)

Flink窗口允许迟到数据,即触发窗口后,会先计算当前结果,但不关闭窗口(触发计算和关窗是两个动作)。 以后每来一条迟到数据,就触发一次这条数据所在窗口的增量计算。直到水位线被推进到了窗口结束时间 + 推迟时间。

注意区分延迟和推迟,延迟是老师等你到8.40上课(触发计算时间延长了),推迟则是,8.40课开始上了(触发计算了),但教室门不关,你在开始上课后(开始上课类比触发计算)10分钟的铃声没响之前(类比推迟时间为10分钟),能到的话,你依旧可以进教室听课。如果过了推迟时间,你仍没有到,那就窗口关闭,教室关门,你去网吧游荡吧。总结就是:

  • 延迟时间,操作的是触发计算的时间,用来处理乱序问题
  • 推迟时间,操作的是触发关窗的时间,用来处理迟到数据
.window(TumblingEventTimeWindows.of(Time.seconds(10)))  //窗口10s
.allowedLateness(Time.seconds(3))  //触发关窗延迟3秒

还是乱序流的例子,多一个allowedLateness

public class WatermarkOutOfOrdernessDemo {
    public static void main(String[] args) throws Exception {
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("node01", 9527)
                .map(new WaterSensorMapFunction());

        // TODO 1.定义Watermark策略
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                // 1.1 指定watermark生成:乱序的,等待3s
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                // 1.2 指定 时间戳分配器,从数据中提取
                .withTimestampAssigner(
                        (element, recordTimestamp) -> {
                            // 返回的时间戳,要 毫秒
                            return element.getTs() * 1000L;
                        });

        // TODO 2. 指定 watermark策略
        SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);

        sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
                // TODO 3.使用 事件时间语义 的窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .allowedLateness(Time.seconds(3))
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {

                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = elements.spliterator().estimateSize();

                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                            }
                        }
                )
                .print();

        env.execute();
    }
}


此时窗口为10s,延迟3s触发计算,窗口结束时间 + 推迟时间才触发关闭,即水位线到达10+3=13s时,才触发关窗。在水位线未被推到13前,对于迟到的数据,会再次触发计算,且是来一条,触发一次计算。关窗后,再来迟到数据就在不管了,不会触发计算。

【API篇】十一、Flink水位线传递与迟到数据处理,Flink,flink,大数据

这也和前面整理的窗口生命周期对上了:计算和关窗实际是两个动作,窗口销毁的时机(关窗)是在时间进展 >= 窗口最大时间戳(end-1ms) + 允许迟到时间(默认0)

4、迟到数据处理:侧流输出

在上面的延迟关窗与允许迟到的基础上,肯定还是不能囊括所有数据,因为乱序程度理论上可以无限大,如上的例子,对于等了10分钟才开课,且到了关教室门的时间还没到的学生,让去网吧游荡也不合理(类比流中直接丢弃这个数据),可以考虑把严重迟到的学生领到保安室,对应到Flink,那就是把乱序极大的数据使用侧流输出。关键代码:

OutputTag<WaterSensor> lateTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class));  //侧流Tag对象
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
.sideOutputLateData(lateTag)  //迟到数据侧流输出

//主流
process.print();
// 从主流获取侧输出流,打印
process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");

完整demo:

public class WatermarkLateDemo {
    public static void main(String[] args) throws Exception {
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("node01", 9527)
                .map(new WaterSensorMapFunction());

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000L);

        SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);


        OutputTag<WaterSensor> lateTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class));

        SingleOutputStreamOperator<String> process = sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .allowedLateness(Time.seconds(2)) // 推迟2s关窗
                .sideOutputLateData(lateTag) // 关窗后的迟到数据,放入侧输出流
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {

                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = elements.spliterator().estimateSize();

                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                            }
                        }
                );


        process.print();
        // 从主流获取侧输出流,打印
        process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");

        env.execute();
    }
}

执行:

【API篇】十一、Flink水位线传递与迟到数据处理,Flink,flink,大数据

5、问

如果watermark设置延时等待3s,窗口允许迟到2s,为什么不直接延时等待5s?文章来源地址https://www.toymoban.com/news/detail-720120.html

答:
  • 首先延时时间不能设置太大,因为这会导致计算延迟太大,失去结果的实时性
  • 其次,窗口允许迟到是对迟到数据的补偿处理,尽量让结果准确,修正结果的
  • 因此,一般延时时间不设置一个较大的值,常为秒级,而允许迟到时间则可以用来处理大部分迟到数据,极端迟到的数据,可使用侧流输出,获取后再做对应的处理

到了这里,关于【API篇】十一、Flink水位线传递与迟到数据处理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • flink水位线

    目录 一、什么是水位线 1》有序流中的水位线 2》乱序流中的水位线 3》水位线特性 二、水位线和窗口的工作原理 1》窗口 三、 生成水位线 1》生成水位线的总体原则 2》水位线生成策略 3》 Flink内置水位线 四、自定义水位线生成器 1》周期性水位线生成器(Periodic Generator)

    2024年04月23日
    浏览(33)
  • 【FLink】水位线(Watermark)

    目录 1、关于时间语义 1.1事件时间 1.2处理时间​编辑 2、什么是水位线 2.1 顺序流和乱序流 2.2乱序数据的处理 2.3 水位线的特性 3 、水位线的生成 3.1 生成水位线的总体原则 3.2 水位线生成策略 3.3 Flink内置水位线 3.3.1 有序流中内置水位线设置 3.4.2 断点式水位线生成器(Punc

    2024年02月21日
    浏览(45)
  • 【大数据】流处理基础概念(二):时间语义(处理时间、事件时间、水位线)

    流处理基础概念(一):Dataflow 编程基础、并行流处理 流处理基础概念(二):时间语义(处理时间、事件时间、水位线) 流处理基础概念(三):状态和一致性模型(任务故障、结果保障) 😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点

    2024年02月19日
    浏览(45)
  • Flink-【时间语义、窗口、水位线】

    🌰:可乐 可乐的生产日期 = 事件时间(可乐产生的时间); 可乐被喝的时间 = 处理时间(可乐被处理【喝掉=处理】的时间)。 机器时间:可能不准确(例如:A可乐厂的时钟比较慢,B可乐厂的时钟比较快,但实际上B产生可乐的时间比A产生可乐的时间慢,却被先处理了)

    2024年02月01日
    浏览(52)
  • Flink-水位线和时间语义

    在实际应用中,事件时间语义会更为常见。一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。 在Flink中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从Fli

    2024年02月04日
    浏览(50)
  • 【入门Flink】- 09Flink水位线Watermark

    在 窗口的处理过程 中,基于数据的时间戳,自定义一个 “逻辑时钟” 。这个时钟的时间不会自动流逝;它的时间进展,就是靠着新到数据的时间戳来推动的。 用来衡量 事件时间 进展的标记,就被称作 “水位线”(Watermark) 。 具体实现上,水位线可以看作一条 特殊的数

    2024年01月17日
    浏览(48)
  • Flink之Watermark水印、水位线

    在Apache Flink中,Watermark(水印)是一种用于处理事件时间(eventtime)的时间指示器。它模拟了事件流中事件时间进展的概念。 事件时间是指事件实际发生的时间,在分布式流处理中经常用于处理无序事件流。然而,由于网络延迟、乱序事件的到达以及分布式处理的特点,事件

    2024年02月08日
    浏览(46)
  • flink水位线传播及任务事件时间

    本文来讲解一下flink的水位线传播及对其对任务事件时间的影响 首先flink是通过从源头生成水位线记录的方式来实现水位线传播的,也就是说水位线是嵌入在正常的记录流中的特殊记录,携带者水位线的时间戳,以下我们就通过图片的方式来讲解下水位线是如何传播以及更新

    2024年02月16日
    浏览(55)
  • Flink详解系列之五--水位线(watermark)

    1、概念 在Flink中,水位线是一种衡量Event Time进展的机制,用来处理实时数据中的乱序问题的,通常是水位线和窗口结合使用来实现。 从设备生成实时流事件,到Flink的source,再到多个oparator处理数据,过程中会受到网络延迟、背压等多种因素影响造成数据乱序。在进行窗口处

    2024年02月13日
    浏览(47)
  • 【Flink】Flink 中的时间和窗口之水位线(Watermark)

    这里先介绍一下什么是 时间语义 , 时间语义 在Flink中是一种很重要的概念,下面介绍的 水位线 就是基于 时间语义 来讲的。 在Flink中我们提到的时间语义一般指的是 事件时间 和 处理时间 : 处理时间(Processing Time) ,一般指执行处理操作的系统时间,也就是Flink的窗口算子

    2024年02月07日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包