7.2、如何理解Flink中的水位线(Watermark)

这篇具有很好参考价值的文章主要介绍了7.2、如何理解Flink中的水位线(Watermark)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

0、版本说明

1、什么是水位线?

2、水位线使用场景?

3、设计水位线主要为了解决什么问题?

4、怎样在flink中生成水位线?

4.1、自定义标记 Watermark 生成器

4.2、自定义周期性 Watermark 生成器

4.3、内置Watermark生成器 - 有序流水位线生成器

4.4、内置Watermark生成器 - 乱序流水位线生成器

4.5、在 读取数据源时 添加水位线

5、水位线和窗口的关系?

6、水位线在各个算子间的传递

6.1、测试用例 - 不设置 withIdleness 超时时间

6.2、测试用例 - 设置 withIdleness 超时时间


0、版本说明

        开发语言:java1.8

        Flink版本:1.17

        官网链接:官网链接

1、什么是水位线?

        Flink中水位线是一条特殊的数据(long timestamp)

        它会以时间戳的形式作为一条标识数据插入到数据流中

7.2、如何理解Flink中的水位线(Watermark),# Flink API 使用技巧,flink,大数据


2、水位线使用场景?

        使用事件时间(EventTime)做流式计算任务时,需要根据事件时间生成水位线(Watermark)

        通过水位线来触发窗口计算,水位线作为衡量事件时间(EventTime)进展的标识


3、设计水位线主要为了解决什么问题?

        设计水位线主要是为了解决实时流中数据乱序和迟到的问题

        思考:什么原因造成了数据流的乱序呢?

                如今数据采集、数据传输大多都在分布式系统中完成

                各个机器节点因为网络和自身性能的原因 导致了数据的乱序和迟到


4、怎样在flink中生成水位线?

        Flink中支持在 数据源和普通DataStream上添加水位线生成策略(WatermarkStrategy)

7.2、如何理解Flink中的水位线(Watermark),# Flink API 使用技巧,flink,大数据

4.1、自定义标记 Watermark 生成器

标记 Watermark 生成器特点:

        每条数据到来后,都会为其生成一条 Watermark

适用场景:

        数据量小且数据有序

代码示例:        

Step1:自定义 标记水位线生成器 实现类

// 自定义 标记水位线生成器 实现类
public class PeriodWatermarkGenerator<T> implements WatermarkGenerator<T> {

    // 每进入一条数据,都会调用一次 onEvent 方法
    @Override
    /*
     * 参数说明:
     *   @event : 进入到该方法的事件数据
     *   @eventTimestamp : 时间戳提取器提取的时间戳
     * */
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        //发射水位线
        output.emitWatermark(new Watermark(eventTimestamp));
    }

    // 不需要实现
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
    }
}

Step2:自定义 标记性水位线生成策略 实现类

// TODO 自定义 标记性水位线生成策略
public class PeriodWatermarkStrategy implements WatermarkStrategy<Tuple2<String, Long>> {
    // TODO 实例化一个 事件时间提取器
    @Override
    public TimestampAssigner<Tuple2<String, Long>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        TimestampAssigner<Tuple2<String, Long>> timestampAssigner = new TimestampAssigner<Tuple2<String, Long>>() {

            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                return element.f1;
            }
        };
        return timestampAssigner;
    }

    // TODO 实例化一个 watermark 生成器
    @Override
    public WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new PeriodWatermarkGenerator<>();
    }
}

Step3:使用 标记性水位线生成策略

// TODO 使用 自定义标记 Watermark 生成器
public class UserPeriodWatermarkStrategy {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        SingleOutputStreamOperator<Tuple2<String, Long>> sourceDataStream = env.socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                );

        // 3.为 DataStream 添加水位线生成策略 (使用 自定义WatermarkStrategy 实现类)
        SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = sourceDataStream.assignTimestampsAndWatermarks(new PeriodWatermarkStrategy());

        // 4.通过 processFunction实例 查看生成的水位线
        SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());
        process.print();

        // 5.触发程序执行
        env.execute();
    }
}

查看运行结果:

7.2、如何理解Flink中的水位线(Watermark),# Flink API 使用技巧,flink,大数据


4.2、自定义周期性 Watermark 生成器

标记 Watermark 生成器特点:

        基于处理时间,周期性生成 Watermark

适用场景:

        数据量大且可能存在一定程度数据延迟(乱序)

代码示例:        

Step1:自定义 周期性水位线生成器 实现类

// 自定义 周期性水位线生成器
public class PunctuatedWatermarkGenerator<T> implements WatermarkGenerator<T> {
    // 设置变量,用来保存 当前最大的事件时间
    private long currentMaxTimestamp;
    // 设置变量,指定最大的乱序时间(等待时间)
    private final long maxOutOfOrderness = 0000; // 3 秒

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        // 只更新当前最大时间戳,不再发生水位线
        if (currentMaxTimestamp < eventTimestamp) currentMaxTimestamp = eventTimestamp;
    }

    // 周期性 生成水位线
    // 每个 setAutoWatermarkInterval 时间,调用一次该方法
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // 发出的 watermark = 当前最大时间戳 - 最大乱序时间
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));
    }
}

Setp2:自定义 周期性水位线生成策略 实现类

// 自定义 周期性水位线生成策略
public class PunctuatedWatermarkStrategy implements WatermarkStrategy<Tuple2<String, Long>> {
    // TODO 实例化一个 事件时间提取器
    @Override
    public TimestampAssigner<Tuple2<String, Long>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        TimestampAssigner<Tuple2<String, Long>> timestampAssigner = new TimestampAssigner<Tuple2<String, Long>>() {

            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                return element.f1;
            }
        };

        return timestampAssigner;
    }

    // TODO 实例化一个 watermark 生成器
    @Override
    public WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new PunctuatedWatermarkGenerator<>();
    }

}

Step3:周期性水位线生成策略

// TODO 使用 自定义周期性 Watermark 生成器
public class UserPunctuatedWatermarkStrategy {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // TODO 设置周期性生成水位线的时间间隔(默认为200毫秒)
        env.getConfig().setAutoWatermarkInterval(3 * 1000L);

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        SingleOutputStreamOperator<Tuple2<String, Long>> ds = env.socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                );

        // TODO 获取 WatermarkStrategy实例 (方式1:通过 WatermarkStrategy实现类获取)
        PunctuatedWatermarkStrategy punctuatedWatermarkStrategy = new PunctuatedWatermarkStrategy();

        // TODO 获取 WatermarkStrategy实例 (方式2:通过 WatermarkStrategy工具类获取) 推荐
        WatermarkStrategy<Tuple2<String, Long>> punctuatedWatermarkStrategyByUtil = WatermarkStrategy.<Tuple2<String, Long>>forGenerator(context -> new PunctuatedWatermarkGenerator<>())
                .withTimestampAssigner((event, timestamp) -> event.f1);

        // 3.使用 自定义水位线策略实例 来提取时间戳&生成水位线
        SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = ds.assignTimestampsAndWatermarks(punctuatedWatermarkStrategy);

        // 4.通过 processFunction实例 查看生成的水位线
        SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());
        process.print();

        // 3.触发程序执行
        env.execute();
    }
}

查看运行结果:

7.2、如何理解Flink中的水位线(Watermark),# Flink API 使用技巧,flink,大数据


4.3、内置Watermark生成器 - 有序流水位线生成器

有序流水位线生成器特点:

        基于处理时间,周期性生成 Watermark,最大乱序时间为0

适用场景:

        大数量有序流

代码示例:

// TODO 内置Watermark生成器 - 有序流水位线生成器
public class UserForMonotonousTimestamps {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // TODO 设置周期性生成水位线的时间间隔(默认为200毫秒)
        env.getConfig().setAutoWatermarkInterval(3 * 1000L);

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        SingleOutputStreamOperator<Tuple2<String, Long>> sourceDataStream = env.socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                );

        // TODO 创建 内置水位线生成策略
        WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                .withTimestampAssigner((element,recordTimestamp) -> element.f1);

        // 3.使用 内置水位线生成策略
        SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = sourceDataStream.assignTimestampsAndWatermarks(watermarkStrategy);

        // 4.通过 processFunction实例 查看生成的水位线
        SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());
        process.print();

        // 3.触发程序执行
        env.execute();
    }
}

查看运行结果:

7.2、如何理解Flink中的水位线(Watermark),# Flink API 使用技巧,flink,大数据


4.4、内置Watermark生成器 - 乱序流水位线生成器

乱序流水位线生成器特点:

        基于处理时间,周期性生成 Watermark,可以这是最大乱序时间

适用场景:

        大数量乱序流

代码示例:

// TODO 内置Watermark生成器 - 乱序流水位线生成器
public class UserForBoundedOutOfOrderness {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // TODO 设置周期性生成水位线的时间间隔(默认为200毫秒)
        env.getConfig().setAutoWatermarkInterval(3 * 1000L);

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        SingleOutputStreamOperator<Tuple2<String, Long>> ds = env.socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                );

        // TODO 获取 WatermarkStrategy实例
        WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy
                .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1)) // 设置最大乱序时间为1s
                .withTimestampAssigner((element,recordTimestamp) -> element.f1);

        // 3.使用 内置水位线生成策略
        SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = ds.assignTimestampsAndWatermarks(watermarkStrategy);

        // 4.通过 processFunction实例 查看生成的水位线
        SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());
        process.print();

        // 3.触发程序执行
        env.execute();
    }
}

查看运行结果:

7.2、如何理解Flink中的水位线(Watermark),# Flink API 使用技巧,flink,大数据


4.5、在 读取数据源时 添加水位线

// 1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 2.创建 Source 对象
Source source = DataGeneratorSource、KafkaSource...

// 3.读取 source时添加水位线
env
        .fromSource(source, WatermarkStrategy实例, "source name")   
        .print()
;

// 4.触发程序执行
env.execute();

5、水位线和窗口的关系?

窗口什么时候创建?

        当窗口内的第一条数据到达时

窗口什么时候触发计算?

        当阈值水位线到达窗口时

7.2、如何理解Flink中的水位线(Watermark),# Flink API 使用技巧,flink,大数据


6、水位线在各个算子间的传递

        下游算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark 的最小值

测试代码:

// TODO 测试水位线的传递
public class TransmitWaterMark {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3); 

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);

        source
                .partitionCustom(
                        new Partitioner<String>() {
                            @Override
                            public int partition(String key, int numPartitions) {
                                if (key.equals("a")) {
                                    return 0;
                                } else if (key.equals("b")) {
                                    return 1;
                                } else {
                                    return 2;
                                }
                            }
                        }, value -> value.split(",")[0]
                )
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2 map(String value) throws Exception {
                        return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                    }
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                //.<Tuple2<String, Long>>forMonotonousTimestamps()
                                .<Tuple2<String, Long>>forGenerator(new PeriodWatermarkStrategy())
                                .withTimestampAssigner((element,recordTimestamp) -> element.f1)
                                .withIdleness(Duration.ofSeconds(5))  //空闲等待5s
                )
                .process(new ShowProcessFunction()).setParallelism(1)
                .print();
        
        env.execute();
    }
}

6.1、测试用例 - 不设置 withIdleness 超时时间

现象:如果上游某一个子任务一直没有数据更新,下游算子的水位线一直不会变化

7.2、如何理解Flink中的水位线(Watermark),# Flink API 使用技巧,flink,大数据

7.2、如何理解Flink中的水位线(Watermark),# Flink API 使用技巧,flink,大数据


6.2、测试用例 - 设置 withIdleness 超时时间

现象:如果上游某一个子任务`在指定时间内`数据更新,下游算子的水位线将不受该子任务最小值的影响

7.2、如何理解Flink中的水位线(Watermark),# Flink API 使用技巧,flink,大数据

7.2、如何理解Flink中的水位线(Watermark),# Flink API 使用技巧,flink,大数据文章来源地址https://www.toymoban.com/news/detail-720219.html

到了这里,关于7.2、如何理解Flink中的水位线(Watermark)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink详解系列之五--水位线(watermark)

    1、概念 在Flink中,水位线是一种衡量Event Time进展的机制,用来处理实时数据中的乱序问题的,通常是水位线和窗口结合使用来实现。 从设备生成实时流事件,到Flink的source,再到多个oparator处理数据,过程中会受到网络延迟、背压等多种因素影响造成数据乱序。在进行窗口处

    2024年02月13日
    浏览(39)
  • [AIGC] 深入理解Flink中的窗口、水位线和定时器

    Apache Flink是一种流处理和批处理的混合引擎,它提供了一套丰富的APIs,以满足不同的数据处理需求。在本文中,我们主要讨论Flink中的三个核心机制:窗口(Windows)、水位线(Watermarks)和定时器(Timers)。 在流处理应用中,一种常见的需求是计算某个时间范围内的数据,这

    2024年03月27日
    浏览(53)
  • ES节点磁盘水位线cluster.routing.allocation.disk.watermark

    为了控制es节点磁盘写入大小,es设置了水位线这一参数,具体有两个: cluster.routing.allocation.disk.watermark.low   (Dynamic) Controls the low watermark for disk usage. It defaults to  85% , meaning that Elasticsearch will not allocate shards to nodes that have more than 85% disk used. It can alternatively be set to a ratio value

    2024年02月09日
    浏览(32)
  • Flink--8、时间语义、水位线(事件和窗口、水位线和窗口的工作原理、生产水位线、水位线的传递、迟到数据的处理)

                           星光下的赶路人star的个人主页                        将自己生命力展开的人,他的存在,对别人就是愈疗 1、从《星球大战》说起 为了更加清晰地说明两种语义的区别,我们来举一个非常经典的例

    2024年02月07日
    浏览(41)
  • flink生成水位线记录方式--周期性水位线生成器

    在flink基于事件的时间处理中,水位线记录的生成是一个很重要的环节,本文就来记录下几种水位线记录的生成方式的其中一种:周期性水位线生成器 1.1 BoundedOutOfOrdernessTimeStampExtractor 他会接收一个表示最大延迟的参数,比如1分钟,意味着如果到达的元素的事件时间和之前到

    2024年02月07日
    浏览(52)
  • flink生成水位线记录方式--基于特殊记录的水位线生成器

    在flink基于事件的时间处理中,水位线记录的生成是一个很重要的环节,本文就来记录下几种水位线记录的生成方式的其中一种:基于特殊记录的水位线生成器 我们发送的事件中,如果带有某条特殊记录的元素代表了某种进度的标识的话,我们可以基于这条特殊的记录生成水

    2024年02月07日
    浏览(49)
  • flink水位线

    目录 一、什么是水位线 1》有序流中的水位线 2》乱序流中的水位线 3》水位线特性 二、水位线和窗口的工作原理 1》窗口 三、 生成水位线 1》生成水位线的总体原则 2》水位线生成策略 3》 Flink内置水位线 四、自定义水位线生成器 1》周期性水位线生成器(Periodic Generator)

    2024年04月23日
    浏览(28)
  • Flink-水位线和时间语义

    在实际应用中,事件时间语义会更为常见。一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。 在Flink中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从Fli

    2024年02月04日
    浏览(45)
  • Flink-【时间语义、窗口、水位线】

    🌰:可乐 可乐的生产日期 = 事件时间(可乐产生的时间); 可乐被喝的时间 = 处理时间(可乐被处理【喝掉=处理】的时间)。 机器时间:可能不准确(例如:A可乐厂的时钟比较慢,B可乐厂的时钟比较快,但实际上B产生可乐的时间比A产生可乐的时间慢,却被先处理了)

    2024年02月01日
    浏览(45)
  • Flink-水位线的设置以及传递

    6.2.1 概述 分类 有序流 无序流 判断的时间延迟 延迟时间判定 6.2.2 水位线的设置 分析 DataStream下的assignTimstampsAndWatermarks方法,返回SingleOutputStreamOperator本质还是个算子,传入的参数是WatermarkStrategy的生成策略 但是WatermarkStrategy是一个接口 有序流 因此调用静态方法forMonotonousT

    2023年04月15日
    浏览(31)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包