Flink 源码学习|Watermark 与 WatermarkGenerator

这篇具有很好参考价值的文章主要介绍了Flink 源码学习|Watermark 与 WatermarkGenerator。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

上游文档:

  • Flink|《Flink 官方文档 - 应用开发 - DataStream API - 事件时间 - 生成 Watermark》学习笔记
  • Flink|《Flink 官方文档 - 应用开发 - DataStream API - 事件时间 - 内置 Watermark 生成器》学习笔记
  • Flink|《Flink 官方文档 - 概念透析 - 及时流处理》学习笔记

Watermark

Watermark 是在各个算子生成的、用于标记当前数据流事件时间的对象。当 Watermark 到达后,就意味着该数据流原则上将 不会 再到达比 Watermark 的事件时间更小的消息,即在 Watermark 后到达的事件时间更小的消息视作延迟消息。

首先,让我们来看一下 Watermark 类的源码。

源码org.apache.flink.api.common.eventtime.Watermark【Github】

package org.apache.flink.api.common.eventtime;

import org.apache.flink.annotation.Public;

import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;

@Public
public final class Watermark implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Thread local formatter for stringifying the timestamps. */
    private static final ThreadLocal<SimpleDateFormat> TS_FORMATTER =
            ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"));

    // ------------------------------------------------------------------------

    /** The watermark that signifies end-of-event-time. */
    public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);

    // ------------------------------------------------------------------------

    /** The timestamp of the watermark in milliseconds. */
    private final long timestamp;

    /** Creates a new watermark with the given timestamp in milliseconds. */
    public Watermark(long timestamp) {
        this.timestamp = timestamp;
    }

    /** Returns the timestamp associated with this Watermark. */
    public long getTimestamp() {
        return timestamp;
    }

    /**
     * Formats the timestamp of this watermark, assuming it is a millisecond timestamp. The returned
     * format is "yyyy-MM-dd HH:mm:ss.SSS".
     */
    public String getFormattedTimestamp() {
        return TS_FORMATTER.get().format(new Date(timestamp));
    }

    // ------------------------------------------------------------------------

    @Override
    public boolean equals(Object o) {
        return this == o
                || o != null
                        && o.getClass() == Watermark.class
                        && ((Watermark) o).timestamp == this.timestamp;
    }

    @Override
    public int hashCode() {
        return Long.hashCode(timestamp);
    }

    @Override
    public String toString() {
        return "Watermark @ " + timestamp + " (" + getFormattedTimestamp() + ')';
    }
}

可以看到,Watermark 类主要就是用来存储当前 watermark 的毫秒级时间戳,具体地:

  • 使用时间戳构造实例化 Watermark 对象,Watermark 对象在实例化后,不能修改其存储的时间戳
  • 提供 long getTimestamp()String getFormattedTimestamp() 两种查询 Watermark 对象时间戳的方法

WatermarkGenerator

接着,我们来看 watermark 的生成接口 WatermarkGenerator 的源码。

源码org.apache.flink.api.common.eventtime.WatermarkGenerator【Github】

package org.apache.flink.api.common.eventtime;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;

/**
 * The {@code WatermarkGenerator} generates watermarks either based on events or periodically (in a
 * fixed interval).
 *
 * <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the {@code
 * AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
 */
@Public
public interface WatermarkGenerator<T> {

    /**
     * Called for every event, allows the watermark generator to examine and remember the event
     * timestamps, or to emit a watermark based on the event itself.
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * Called periodically, and might emit a new watermark, or not.
     *
     * <p>The interval in which this method is called and Watermarks are generated depends on {@link
     * ExecutionConfig#getAutoWatermarkInterval()}.
     */
    void onPeriodicEmit(WatermarkOutput output);
}

WatermarkGernator 接口,既可以基于消息,也可以基于周期。WatermarkGenerator 接口有两个方法:

  • void onEvent(T event, long eventTimestamp, WatermarkOutput output):这个方法会在每个消息到达时被调用一次,其参数 event 为消息本身,参数 eventTimestamp 为消息的事件时间,output 为接收生成的 watermark 的对象。
  • void (WatermarkOutput output):这个方法会被周期性地调用,其参数 output 为接收生成的 watermark 的对象。

在实现 WatermarkGenerator 接口时,既可以在 onEvent 方法中生成 watermark,也可以在 onPeriodicEmit 方法中生成 watermark。因此,基于 WatermarkGenerator 接口,可以实现 标记生成周期性生成 两种 watermark 生成器。

下面,我们来看 Flink 内置的几个 watermark 生成器。

Flink 内置的 watermark 生成器

called for every event, allows the watermark generator to examine and rememb,Flink,flink,watermark,生成器

在这里,我们仅介绍 Flink 的 flink-core 项目中如下内置的 watermark 生成器:

  • NoWatermarksGenerator:不生成 watermark 的生成器
  • BoundedOutOfOrdernessWatermarks:固定延迟时间的周期性 watermark 生成器
  • AscendingTimestampsWatermarks:零延迟时间的周期性 watermark 生成器

不生成 watermark 的生成器:NoWatermarksGenerator

最简单的,不生成任何 watermark 的生成器。在实现上,在 onEvent 方法和 onPeriodicEmit 方法中均不生成 watermark。

源码org.apache.flink.api.common.eventtime.NoWatermarksGenerator【Github】

package org.apache.flink.api.common.eventtime;

import org.apache.flink.annotation.Public;

@Public
public final class NoWatermarksGenerator<E> implements WatermarkGenerator<E> {

    @Override
    public void onEvent(E event, long eventTimestamp, WatermarkOutput output) {}

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {}
}

固定延迟时间的周期性 watermark 生成器:BoundedOutOfOrdernessWatermarks

当输入数据流中消息的事件时间不完全有序,但是对于绝大部分元素,滞后时间通常不会超过一个固定的时间长度时,我们可以通过在当前最大事件时间的基础上减去一个固定延迟时间,来生成 watermark。Flink 内置的 watermark 生成器 BoundedOutOfOrdernessWatermarks 实现了这种功能。

源码org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks【Github】

package org.apache.flink.api.common.eventtime;

import org.apache.flink.annotation.Public;

import java.time.Duration;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {

    /** The maximum timestamp encountered so far. */
    private long maxTimestamp;

    /** The maximum out-of-orderness that this watermark generator assumes. */
    private final long outOfOrdernessMillis;

    /**
     * Creates a new watermark generator with the given out-of-orderness bound.
     *
     * @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
     */
    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");

        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();

        // start so that our lowest watermark would be Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // ------------------------------------------------------------------------

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}

可以看到,在 BoundedOutOfOrdernessWatermarks 类中:

  • 使用固定的延迟时间 maxOutOfOrderness 来实例化
  • 使用示例属性 maxTimestamp 存储当前所有消息的最大事件时间,当每个消息到达时,onEvent 方法被调用,并更新 maxTimestamp 属性
  • 周期性地生成 watermark,当 onPeriodicEmit 方法被周期性地调用时,会根据当前的最大事件时间以及固定延迟时间来生成 watermark

零延迟时间的周期性 watermark 生成器:AscendingTimestampsWatermarks

当数据源中消息的事件时间单调递增时,当前事件时间(同时也是最大事件时间)就可以充当 watermark,因为后续到达的消息的事件时间一定不会比当前事件时间小。例如,当只读取一个 Kafka 分区,并使用 Kafka 的消息时间戳作为事件时间时,则可以保证事件时间的单调递增。

此时的 watermark 生成规则,就相当于是延迟为 0 的 “固定延迟时间的周期性生成器”。Flink 内置的 watermark 生成器 AscendingTimestampsWatermarks 实现了这个功能。

源码org.apache.flink.api.common.eventtime.AscendingTimestampsWatermarks【Github】

package org.apache.flink.api.common.eventtime;

import org.apache.flink.annotation.Public;

import java.time.Duration;

@Public
public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {

    /** Creates a new watermark generator with for ascending timestamps. */
    public AscendingTimestampsWatermarks() {
        super(Duration.ofMillis(0));
    }
}

在实现上,AscendingTimestampsWatermarks 继承了 BoundedOutOfOrdernessWatermarks,并将延迟指定为 0。文章来源地址https://www.toymoban.com/news/detail-827794.html

到了这里,关于Flink 源码学习|Watermark 与 WatermarkGenerator的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【FLink】水位线(Watermark)

    目录 1、关于时间语义 1.1事件时间 1.2处理时间​编辑 2、什么是水位线 2.1 顺序流和乱序流 2.2乱序数据的处理 2.3 水位线的特性 3 、水位线的生成 3.1 生成水位线的总体原则 3.2 水位线生成策略 3.3 Flink内置水位线 3.3.1 有序流中内置水位线设置 3.4.2 断点式水位线生成器(Punc

    2024年02月21日
    浏览(45)
  • Flink之Watermark水印、水位线

    在Apache Flink中,Watermark(水印)是一种用于处理事件时间(eventtime)的时间指示器。它模拟了事件流中事件时间进展的概念。 事件时间是指事件实际发生的时间,在分布式流处理中经常用于处理无序事件流。然而,由于网络延迟、乱序事件的到达以及分布式处理的特点,事件

    2024年02月08日
    浏览(46)
  • 1分钟理解Flink中Watermark机制

    本文隶属于专栏《董工的1000个大数据技术体系》摘要,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 目录 前言 一、watermark是什么? 二、乱序数据处理

    2024年02月15日
    浏览(40)
  • 深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析

    深入理解 Flink 系列文章已完结,总共八篇文章,直达链接: 深入理解 Flink (一)Flink 架构设计原理 深入理解 Flink (二)Flink StateBackend 和 Checkpoint 容错深入分析 深入理解 Flink (三)Flink 内核基础设施源码级原理详解 深入理解 Flink (四)Flink Time+WaterMark+Window 深入分析 深入

    2024年01月24日
    浏览(47)
  • 【Flink】Flink 中的时间和窗口之水位线(Watermark)

    这里先介绍一下什么是 时间语义 , 时间语义 在Flink中是一种很重要的概念,下面介绍的 水位线 就是基于 时间语义 来讲的。 在Flink中我们提到的时间语义一般指的是 事件时间 和 处理时间 : 处理时间(Processing Time) ,一般指执行处理操作的系统时间,也就是Flink的窗口算子

    2024年02月07日
    浏览(52)
  • Flink详解系列之五--水位线(watermark)

    1、概念 在Flink中,水位线是一种衡量Event Time进展的机制,用来处理实时数据中的乱序问题的,通常是水位线和窗口结合使用来实现。 从设备生成实时流事件,到Flink的source,再到多个oparator处理数据,过程中会受到网络延迟、背压等多种因素影响造成数据乱序。在进行窗口处

    2024年02月13日
    浏览(47)
  • 7.2、如何理解Flink中的水位线(Watermark)

    目录 0、版本说明 1、什么是水位线? 2、水位线使用场景? 3、设计水位线主要为了解决什么问题? 4、怎样在flink中生成水位线? 4.1、自定义标记 Watermark 生成器 4.2、自定义周期性 Watermark 生成器 4.3、内置Watermark生成器 - 有序流水位线生成器 4.4、内置Watermark生成器 - 乱序流

    2024年02月08日
    浏览(46)
  • 常见遍历方法 for循环、forEach、map、filter、find、findIndex、some、every

    来自于远古的遍历方式,并且涵盖多种手段,例如for in 和for of。 for循环 中使用break和continue语句(终止和跳过本次循环): for of 用来遍历数组也是可以的 for of 中也可以用break和continue for in 也可以遍历数组,但不推荐 for in 中也可以用break和continue forEach是ES5中操作数组的一种

    2024年02月08日
    浏览(53)
  • 【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延迟数据的处理)介绍及示例 - 完整版

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月02日
    浏览(54)
  • 【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延迟数据的处理)介绍及示例(1) - 介绍

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月01日
    浏览(56)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包