Flink-水位线的设置以及传递

这篇具有很好参考价值的文章主要介绍了Flink-水位线的设置以及传递。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

6.2 水位线

6.2.1 概述

  1. 分类
  • 有序流

Flink-水位线的设置以及传递

  • 无序流
    Flink-水位线的设置以及传递
    判断的时间延迟
  1. 延迟时间判定

6.2.2 水位线的设置

  1. 分析

Flink-水位线的设置以及传递
DataStream下的assignTimstampsAndWatermarks方法,返回SingleOutputStreamOperator本质还是个算子,传入的参数是WatermarkStrategy的生成策略

Flink-水位线的设置以及传递
但是WatermarkStrategy是一个接口

  • 有序流

Flink-水位线的设置以及传递

因此调用静态方法forMonotonousTimeStamps后new AscendingTimestampsWatermarks返回WatermarkGernerator
Flink-水位线的设置以及传递

AscendingTimestampsWatermarks这个继承自BoundOutOfOrdernessWatermarks

Flink-水位线的设置以及传递
Flink-水位线的设置以及传递
Flink-水位线的设置以及传递

BoundOutOfOrdernessWatermarks这个类有onEvent和onPeriodicEmit这两方法,因为实现了WatermarkGenerator这个接口

Flink-水位线的设置以及传递

然后在调用接口中的默认方法withTimestampAssigner得到返回WatermarkStrategy,参数是new SerializableTimestampAssigner的对象,重写extractTimestamp方法,这个方法作用是怎么样从数据里面提取时间戳

Flink-水位线的设置以及传递

  • 乱序流

Flink-水位线的设置以及传递
因此调用静态方法forBoundedOutOfOrderness(参数为最大乱序程度,也就是延迟时间)后new BoundOutOfOrdernessWatermarks返回 WatermarkGernerator

Flink-水位线的设置以及传递

BoundOutOfOrdernessWatermarks这个类有onEvent和onPeriodicEmit这两方法,因为实现了WatermarkGenerator这个接口(跟上面一样了)

Flink-水位线的设置以及传递

后面也跟有序一样,然后在调用接口中的默认方法withTimestampAssigner得到返回WatermarkStrategy

  • 关系图
    Flink-水位线的设置以及传递
  1. 完整代码
public class WatermarkTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.输入
        SingleOutputStreamOperator<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Bob", "./prod?id=1", 3300L),
                new Event("Alice", "./prod?id=200", 3000L),
                new Event("Bob", "./home", 3500L),
                new Event("Bob", "./prod?id=2", 3800L),
                new Event("Bob", "./prod?id=3", 4200L))
            
//                //有序流的watermark生成
//                //forMonotonousTimestamps前指定泛型
//                .assignTimestampsAndWatermarks(WatermarkStrategy
//                        .<Event>forMonotonousTimestamps()//得到WatermarkGenerator
//                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {//返回WatermarkStrategy
//                            @Override
//                            //参数是当前传过来的数据element,另一个传出的recordTimestamp是时间戳
//                            public long extractTimestamp(Event element, long recordTimestamp) {
//                                return element.timestamp;
//                            }
//                        })
//                )
            .assignTimestampsAndWatermarks(WatermarkStrategy
                    //forMonotonousTimestamps前指定泛型
                    //forMonotonousTimestamps参数是最大乱序时间
                    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))//得到WatermarkGenerator
                    .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                        @Override
                        public long extractTimestamp(Event element, long recordTimestamp) {
                            return element.timestamp;
                        }
                    })
            );
        env.execute();
    }
}

6.2.3 自定义水位线

  1. 分析

Flink-水位线的设置以及传递

或者直接new 一个接口WatermarkStrategy重写createWatermarkGenerator的watermark生成器的方法(生成WatermarkGenerator)以及createTimeStampAssigner提取时间戳分配器的方法(生成TimeStampAssigner)创建watermark

Flink-水位线的设置以及传递

Flink-水位线的设置以及传递

Flink-水位线的设置以及传递

Flink-水位线的设置以及传递

WatermarkGenerator是个接口,有两个方法分别是onEvent方法,主要目标是要发出一个WatermarkOutput,另一个是onperiodicEmit方法,表示周期性的生成,周期性生成时间默认是2秒,env调用getConfig后调用setAutoWatermarkInterval后可以更改周期性生成时间

Flink-水位线的设置以及传递
Flink-水位线的设置以及传递

WatermarkOutput也是一个接口,调用emitWatermark就能发出一个watermark,

Flink-水位线的设置以及传递

Flink-水位线的设置以及传递

除了WatermarkGenerator接口还有TimeStampAssigner也是个接口,里面只有一个方法叫做extractTimestamp,目的是从当前数据提取时间戳,同时也会作为WatermarkGenerator这个接口中onEvent方法中传入的参数eventTimestamp时间戳

  • 关系图
    Flink-水位线的设置以及传递
    这图估计也就我自己能看的懂了。。。
  1. 代码
  • 正常水位线
// 自定义水位线的产生
public class CustomWatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
                .print();
        env.execute();
    }
    //内部静态类
    public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {
        @Override
        //createTimestampAssigner方法生成TimeStampAssigner
        public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return new SerializableTimestampAssigner<Event>() {
                @Override
                //extractTimestamp,目的是从当前数据提取时间戳
                public long extractTimestamp(Event element, long recordTimestamp)
                {
                    return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段
                }
            };
        }
        @Override
        //createWatermarkGenerator生成WatermarkGenerator
        public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new CustomPeriodicGenerator();
        }
    }
    //CustomPeriodicGenerator实现WatermarkGenerator接口,并重写方法
    public static class CustomPeriodicGenerator implements WatermarkGenerator<Event> {
        private Long delayTime = 5000L; // 延迟时间
        private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳
        @Override
        //更新当前时间戳,这边不发送水位线,目的是保存时间戳
        public void onEvent(Event event, long eventTimestamp, WatermarkOutput
                output) {
            // 每来一条数据就调用一次
            maxTs = Math.max(event.timestamp, maxTs); // 更新最大时间戳
        }
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // 发射水位线,默认 200ms 调用一次
            //-1毫秒都是为了贴切窗口闭合的时候左闭右开设计
            output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
        }
    }
}

  • 断点水位线

在onevent根据条件触发,onPeriodicEmit这个方法中就不用做了

    public class CustomPunctuatedGenerator implements WatermarkGenerator<Event> {
        @Override
        public void onEvent(Event r, long eventTimestamp, WatermarkOutput output) {
        // 只有在遇到特定的 itemId 时,才发出水位线
            if (r.user.equals("Mary")) {
                output.emitWatermark(new Watermark(r.timestamp - 1));
            }
        }
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // 不需要做任何事情,因为我们在 onEvent 方法中发射了水位线
        }
    }

  • 在自定义数据源中发送水位线

使用 collectWithTimestamp 方法将数据发送出去,原来直接out.collect()的

Flink-水位线的设置以及传递

参数是当前数据还有当前数据的时间戳,跟水位线生成中extractTimestamp(Event element, long recordTimestamp)这个类似,也是一个数据是什么,一个时间戳是啥

然后发送水位线,用emitWatermark方法生成

public class CustomWatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.addSource(new ClickSourceWithWatermark()).print();
        env.execute();
    }
    // 泛型是数据源中的类型
    public static class ClickSourceWithWatermark implements SourceFunction<Event>
    {
        private boolean running = true;
        @Override
        public void run(SourceFunction.SourceContext<Event> sourceContext) throws Exception {
            Random random = new Random();
            String[] userArr = {"Mary", "Bob", "Alice"};
            String[] urlArr = {"./home", "./cart", "./prod?id=1"};
            while (running) {
                long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒时间戳
                String username = userArr[random.nextInt(userArr.length)];
                String url = urlArr[random.nextInt(urlArr.length)];
                Event event = new Event(username, url, currTs);
                // 使用 collectWithTimestamp 方法将数据发送出去,并指明数据中的时间戳的字段
                sourceContext.collectWithTimestamp(event, event.timestamp);
                // 发送水位线
                sourceContext.emitWatermark(new Watermark(event.timestamp - 1L));
                Thread.sleep(1000L);
            }
        }
        @Override
        public void cancel() {
            running = false;
        }
    }
}

6.2.4 水位线的传递

针对多个分区,上游需要告诉下游水位线情况,采用的是广播的方式给所有下游子任务

但是上游如果也是并行的,向下传输的水位线可能有多个,以上游发过来最小的时钟为准,并且下游会有一个分区专门保存上游发过来的水位线最小的数据

Flink-水位线的设置以及传递

Flink-水位线的设置以及传递文章来源地址https://www.toymoban.com/news/detail-413700.html

到了这里,关于Flink-水位线的设置以及传递的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • flink水位线

    目录 一、什么是水位线 1》有序流中的水位线 2》乱序流中的水位线 3》水位线特性 二、水位线和窗口的工作原理 1》窗口 三、 生成水位线 1》生成水位线的总体原则 2》水位线生成策略 3》 Flink内置水位线 四、自定义水位线生成器 1》周期性水位线生成器(Periodic Generator)

    2024年04月23日
    浏览(20)
  • 【FLink】水位线(Watermark)

    目录 1、关于时间语义 1.1事件时间 1.2处理时间​编辑 2、什么是水位线 2.1 顺序流和乱序流 2.2乱序数据的处理 2.3 水位线的特性 3 、水位线的生成 3.1 生成水位线的总体原则 3.2 水位线生成策略 3.3 Flink内置水位线 3.3.1 有序流中内置水位线设置 3.4.2 断点式水位线生成器(Punc

    2024年02月21日
    浏览(33)
  • Flink-水位线和时间语义

    在实际应用中,事件时间语义会更为常见。一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。 在Flink中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从Fli

    2024年02月04日
    浏览(33)
  • Flink-【时间语义、窗口、水位线】

    🌰:可乐 可乐的生产日期 = 事件时间(可乐产生的时间); 可乐被喝的时间 = 处理时间(可乐被处理【喝掉=处理】的时间)。 机器时间:可能不准确(例如:A可乐厂的时钟比较慢,B可乐厂的时钟比较快,但实际上B产生可乐的时间比A产生可乐的时间慢,却被先处理了)

    2024年02月01日
    浏览(40)
  • 【入门Flink】- 09Flink水位线Watermark

    在 窗口的处理过程 中,基于数据的时间戳,自定义一个 “逻辑时钟” 。这个时钟的时间不会自动流逝;它的时间进展,就是靠着新到数据的时间戳来推动的。 用来衡量 事件时间 进展的标记,就被称作 “水位线”(Watermark) 。 具体实现上,水位线可以看作一条 特殊的数

    2024年01月17日
    浏览(35)
  • Flink之Watermark水印、水位线

    在Apache Flink中,Watermark(水印)是一种用于处理事件时间(eventtime)的时间指示器。它模拟了事件流中事件时间进展的概念。 事件时间是指事件实际发生的时间,在分布式流处理中经常用于处理无序事件流。然而,由于网络延迟、乱序事件的到达以及分布式处理的特点,事件

    2024年02月08日
    浏览(31)
  • flink水位线传播及任务事件时间

    本文来讲解一下flink的水位线传播及对其对任务事件时间的影响 首先flink是通过从源头生成水位线记录的方式来实现水位线传播的,也就是说水位线是嵌入在正常的记录流中的特殊记录,携带者水位线的时间戳,以下我们就通过图片的方式来讲解下水位线是如何传播以及更新

    2024年02月16日
    浏览(37)
  • Flink详解系列之五--水位线(watermark)

    1、概念 在Flink中,水位线是一种衡量Event Time进展的机制,用来处理实时数据中的乱序问题的,通常是水位线和窗口结合使用来实现。 从设备生成实时流事件,到Flink的source,再到多个oparator处理数据,过程中会受到网络延迟、背压等多种因素影响造成数据乱序。在进行窗口处

    2024年02月13日
    浏览(31)
  • 【Flink】Flink 中的时间和窗口之水位线(Watermark)

    这里先介绍一下什么是 时间语义 , 时间语义 在Flink中是一种很重要的概念,下面介绍的 水位线 就是基于 时间语义 来讲的。 在Flink中我们提到的时间语义一般指的是 事件时间 和 处理时间 : 处理时间(Processing Time) ,一般指执行处理操作的系统时间,也就是Flink的窗口算子

    2024年02月07日
    浏览(36)
  • 7.2、如何理解Flink中的水位线(Watermark)

    目录 0、版本说明 1、什么是水位线? 2、水位线使用场景? 3、设计水位线主要为了解决什么问题? 4、怎样在flink中生成水位线? 4.1、自定义标记 Watermark 生成器 4.2、自定义周期性 Watermark 生成器 4.3、内置Watermark生成器 - 有序流水位线生成器 4.4、内置Watermark生成器 - 乱序流

    2024年02月08日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包