Flink 源码学习|使用 Watermark 策略(WatermarkStrategy)【v2 修订版】

这篇具有很好参考价值的文章主要介绍了Flink 源码学习|使用 Watermark 策略(WatermarkStrategy)【v2 修订版】。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

使用事件时间时,需要通过 Flink API 的 WatermarkStrategy 接口配置 watermark 的生成策略。

我们将逐段来看这个 API 的各个部分。

继承关系

Flink 使用 WatermarkStrategy<T> 接口来构建 Watermark 策略,其中泛型 T 为输入数据流类型。

WatermarkStrategy 接口继承了 TimestampAssignerSupplierWatermarkGeneratorSupplier,即相当于包含了 TimestampAssignerWatermarkGenerator,具体地:

  • TimestampAssigner 用于从消息记录中提取事件时间戳
  • WatermarkGenerator 用于根据事件时间戳生成 watermark

源码flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L19【Github】

package org.apache.flink.api.common.eventtime;

import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;

import java.io.Serializable;
import java.time.Duration;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

@Public
public interface WatermarkStrategy<T>
     extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {

TimestampAssignerSupplierWatermarkGeneratorSupplier 均继承了 Serializable,所以WatermarkStrategy 也继承了 Serializable,是可序列化的。这是因为在分布式计算过程中,WatermarkStrategy 可能会在不同节点之间传输。

接口方法

需要实现或重写的方法

WatermarkStrategy 接口中,有如下 3 个方法是需要被实现的,这 3 个方法分别对应需要确定的 3 个问题:

  • createWatermarkGenerator():返回实现了 WatermarkGenerator 接口的对象,该对象用于根据输入流记录的事件时间生成 watermark
  • createTimestampAssigner():返回实现了 TimestampAssigner 接口的对象,该对象用于从输入流中获取每条记录的事件时间
  • getAlignmentParameters():返回 WatermarkAlignmentParams 类的实例,该实例用于控制是否需要对齐不同输入流的 watermark

源码|Github|org.apache.flink.api.common.eventtime.WatermarkStrategy(部分)

// ------------------------------------------------------------------------
//  Methods that implementors need to implement.
// ------------------------------------------------------------------------

/** Instantiates a WatermarkGenerator that generates watermarks according to this strategy. */
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);

/**
 * Instantiates a {@link TimestampAssigner} for assigning timestamps according to this strategy.
 */
@Override
default TimestampAssigner<T> createTimestampAssigner(
        TimestampAssignerSupplier.Context context) {
    // By default, this is {@link RecordTimestampAssigner},
    // for cases where records come out of a source with valid timestamps, for example from
    // Kafka.
    return new RecordTimestampAssigner<>();
}

/**
 * Provides configuration for watermark alignment of a maximum watermark of multiple
 * sources/tasks/partitions in the same watermark group. The group may contain completely
 * independent sources (e.g. File and Kafka).
 *
 * <p>Once configured Flink will "pause" consuming from a source/task/partition that is ahead of
 * the emitted watermark in the group by more than the maxAllowedWatermarkDrift.
 */
@PublicEvolving
default WatermarkAlignmentParams getAlignmentParameters() {
    return WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED;
}

其中,只有 createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) 方法是非默认的,这样就可以通过 Java 的 lambda 表达式语法来实现 WatermarkStrategy 接口。

指定 WatermarkGenerator 的方法

WatermarkStrategy 接口提供了如下 4 个直接指定 WatermarkGenerator 以实现 WatermarkStrategy 的默认方法。因为 WatermarkStrategy 接口只有 createWatermarkGenerator 这 1 个方法没有默认实现,所以在实现上均使用了 Java 的 lambda 表达式。

  • forMonotonousTimestamps:使用 AscendingTimestampsWatermarks 生成 watermark,适用于事件时间单调递增的场景
  • forBoundedOutOfOrderness:使用 BoundedOutOfOrdernessWatermarks 生成 watermark,适用于事件时间虽然不单调递增,但延迟时间有限的场景
  • forGenerator:使用参数提供的 WatermarkGeneratorSupplier 生成的 watermark 生成器
  • noWatermarks:不使用 watermark

源码|Github|org/apache/flink/api/common/eventtime/WatermarkStrategy.java:197

static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
 return (ctx) -> new AscendingTimestampsWatermarks<>();
}

static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
 return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
}

static <T> WatermarkStrategy<T> forGenerator(WatermarkGeneratorSupplier<T> generatorSupplier) {
 return generatorSupplier::createWatermarkGenerator;
}

static <T> WatermarkStrategy<T> noWatermarks() {
 return (ctx) -> new NoWatermarksGenerator<>();
}
指定 TimestampAssigner 的方法

WatermarkStrategy 接口中,提供了如下 3 个指定 TimestampAssigner 的默认方法。这些方法会创建一个新的 WatermarkStrategy 类,并将调用该方法的基础 WatermarkStrategy 类封装起来;在调用新类的 createWatermarkGenerator 方法时,会使用新类中重写的方法;在调用新类的其他方法时,会返回被封装的基础类的方法。

  • withTimestampAssigner(TimestampAssignerSupplier<T> timestampAssigner):使用参数指定的 TimestampAssigner 的提供者类
  • withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner):使用参数指定的可序列化的 TimestampAssigner

源码|Github|org/apache/flink/api/common/eventtime/WatermarkStrategy(部分)

default WatermarkStrategy<T> withTimestampAssigner(
     TimestampAssignerSupplier<T> timestampAssigner) {
 checkNotNull(timestampAssigner, "timestampAssigner");
 return new WatermarkStrategyWithTimestampAssigner<>(this, timestampAssigner);
}

default WatermarkStrategy<T> withTimestampAssigner(
     SerializableTimestampAssigner<T> timestampAssigner) {
 checkNotNull(timestampAssigner, "timestampAssigner");
 return new WatermarkStrategyWithTimestampAssigner<>(
         this, TimestampAssignerSupplier.of(timestampAssigner));
}
制定处理空闲数据源的方法

WatermarkStrategy 接口中,提供了 withIdleness(Duration idleTimeout) 方法用于处理存在空闲数据源的情况,其参数为标记空闲数据源的时间阈值,即当某个数据源超过 idleTimeout 没有生产消息时则标记为空闲数据源。

源码|Github|org.apache.flink.api.common.eventtime.WatermarkStrategy#withIdleness

/**
 * Creates a new enriched {@link WatermarkStrategy} that also does idleness detection in the
 * created {@link WatermarkGenerator}.
 *
 * <p>Add an idle timeout to the watermark strategy. If no records flow in a partition of a
 * stream for that amount of time, then that partition is considered "idle" and will not hold
 * back the progress of watermarks in downstream operators.
 *
 * <p>Idleness can be important if some partitions have little data and might not have events
 * during some periods. Without idleness, these streams can stall the overall event time
 * progress of the application.
 */
default WatermarkStrategy<T> withIdleness(Duration idleTimeout) {
    checkNotNull(idleTimeout, "idleTimeout");
    checkArgument(
            !(idleTimeout.isZero() || idleTimeout.isNegative()),
            "idleTimeout must be greater than zero");
    return new WatermarkStrategyWithIdleness<>(this, idleTimeout);
}
指定 AlignmentParameters 的方法

WatermarkStrategy 接口中,还提供了如下 2 个方法,用于指定 watermark 的对齐方法。在实现上,这 2 个方法也与指定 TimestampAssigner 的方法类似,在新类中重写 getAlignmentParameters() 方法。

  • withWatermarkAlignment(String watermarkGroup, Duration maxAllowedWatermarkDrift):指定 watermark 对齐策略;其中参数 watermarkGroup 为 watermark 的组名,maxAllowedWatermarkDrift 为在暂停消费前允许超出 watermark 时间
  • withWatermarkAlignment(String watermarkGroup, Duration maxAllowedWatermarkDrift, Duration updateInterval):指定 watermark 对齐策略;其中新增的参数 updateInterval 为 task 上报 watermark 以及调度器制定对齐 watermark 的时间间隔

源码|Github|org/apache/flink/api/common/eventtime/WatermarkStrategy(部分)

@PublicEvolving
default WatermarkStrategy<T> withWatermarkAlignment(
        String watermarkGroup, Duration maxAllowedWatermarkDrift) {
    return withWatermarkAlignment(
            watermarkGroup,
            maxAllowedWatermarkDrift,
            WatermarksWithWatermarkAlignment.DEFAULT_UPDATE_INTERVAL);
}

@PublicEvolving
default WatermarkStrategy<T> withWatermarkAlignment(
        String watermarkGroup, Duration maxAllowedWatermarkDrift, Duration updateInterval) {
    return new WatermarksWithWatermarkAlignment<T>(
            this, watermarkGroup, maxAllowedWatermarkDrift, updateInterval);
}

整体说明

通过上述设计,使 WatermarkStrategy 接口允许使用实现 WatermarkGenerator 方法的 Java Lambda 表达式实现;对于已实现的 WatermarkStrategy 匿名类,允许通过调用指定 TimestampAssignerAlignmentParameters 的方法,来覆盖掉这两个方法的默认实现。文章来源地址https://www.toymoban.com/news/detail-846325.html

到了这里,关于Flink 源码学习|使用 Watermark 策略(WatermarkStrategy)【v2 修订版】的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink之Watermark

    流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的,虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、分布式等原因,导致乱序的产生,所谓乱序,就是指Flink接收到的事件的先后顺序不是严格按照事件

    2024年02月10日
    浏览(34)
  • 【入门Flink】- 09Flink水位线Watermark

    在 窗口的处理过程 中,基于数据的时间戳,自定义一个 “逻辑时钟” 。这个时钟的时间不会自动流逝;它的时间进展,就是靠着新到数据的时间戳来推动的。 用来衡量 事件时间 进展的标记,就被称作 “水位线”(Watermark) 。 具体实现上,水位线可以看作一条 特殊的数

    2024年01月17日
    浏览(48)
  • Flink Watermark和时间语义

    时间语义: EventTime :事件创建时间; Ingestion Time :数据进入 Flink 的时间; Processing Time :执行操作算子的本地系统时间,与机器无关。不同的时间语义有不同的应用场合,我们往往更关系事件时间 Event Time 。数据生成的时候就会自动注入时间戳, Event Time 可以从日志数据的

    2024年02月03日
    浏览(48)
  • 【FLink】水位线(Watermark)

    目录 1、关于时间语义 1.1事件时间 1.2处理时间​编辑 2、什么是水位线 2.1 顺序流和乱序流 2.2乱序数据的处理 2.3 水位线的特性 3 、水位线的生成 3.1 生成水位线的总体原则 3.2 水位线生成策略 3.3 Flink内置水位线 3.3.1 有序流中内置水位线设置 3.4.2 断点式水位线生成器(Punc

    2024年02月21日
    浏览(45)
  • Flink之Watermark水印、水位线

    在Apache Flink中,Watermark(水印)是一种用于处理事件时间(eventtime)的时间指示器。它模拟了事件流中事件时间进展的概念。 事件时间是指事件实际发生的时间,在分布式流处理中经常用于处理无序事件流。然而,由于网络延迟、乱序事件的到达以及分布式处理的特点,事件

    2024年02月08日
    浏览(46)
  • 1分钟理解Flink中Watermark机制

    本文隶属于专栏《董工的1000个大数据技术体系》摘要,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 目录 前言 一、watermark是什么? 二、乱序数据处理

    2024年02月15日
    浏览(41)
  • 深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析

    深入理解 Flink 系列文章已完结,总共八篇文章,直达链接: 深入理解 Flink (一)Flink 架构设计原理 深入理解 Flink (二)Flink StateBackend 和 Checkpoint 容错深入分析 深入理解 Flink (三)Flink 内核基础设施源码级原理详解 深入理解 Flink (四)Flink Time+WaterMark+Window 深入分析 深入

    2024年01月24日
    浏览(47)
  • 【Flink】Flink 中的时间和窗口之水位线(Watermark)

    这里先介绍一下什么是 时间语义 , 时间语义 在Flink中是一种很重要的概念,下面介绍的 水位线 就是基于 时间语义 来讲的。 在Flink中我们提到的时间语义一般指的是 事件时间 和 处理时间 : 处理时间(Processing Time) ,一般指执行处理操作的系统时间,也就是Flink的窗口算子

    2024年02月07日
    浏览(52)
  • Flink详解系列之五--水位线(watermark)

    1、概念 在Flink中,水位线是一种衡量Event Time进展的机制,用来处理实时数据中的乱序问题的,通常是水位线和窗口结合使用来实现。 从设备生成实时流事件,到Flink的source,再到多个oparator处理数据,过程中会受到网络延迟、背压等多种因素影响造成数据乱序。在进行窗口处

    2024年02月13日
    浏览(47)
  • 7.2、如何理解Flink中的水位线(Watermark)

    目录 0、版本说明 1、什么是水位线? 2、水位线使用场景? 3、设计水位线主要为了解决什么问题? 4、怎样在flink中生成水位线? 4.1、自定义标记 Watermark 生成器 4.2、自定义周期性 Watermark 生成器 4.3、内置Watermark生成器 - 有序流水位线生成器 4.4、内置Watermark生成器 - 乱序流

    2024年02月08日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包