聊聊Flink必知必会(五)

这篇具有很好参考价值的文章主要介绍了聊聊Flink必知必会(五)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

  1. 聊聊Flink的必知必会(三)
  2. 聊聊Flink必知必会(四)

从源码中,根据关键的代码,梳理一下Flink中的时间与窗口实现逻辑。

WindowedStream

对数据流执行keyBy()操作后,再调用window()方法,就会返回WindowedStream,表示分区后又加窗的数据流。如果数据流没有经过分区,直接调用window()方法则会返回AllWindowedStream

如下:

// 构造函数
public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {
    this.input = input;
    this.builder =
    new WindowOperatorBuilder<>(
    windowAssigner,
    windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()),
    input.getExecutionConfig(),
    input.getType(),
    input.getKeySelector(),
    input.getKeyType());
}
        
// KeyedStream类型,表示被加窗的输入流。
private final KeyedStream<T, K> input;

// 用于构建WindowOperator,最终会生成windowAssigner,Evictor,Trigger
private final WindowOperatorBuilder<T, K, W> builder;

在这里面还涉及到一些窗口的基本计算算子,比如reduce,aggregate,apply,process,sum等等.

窗口相关模型的实现

Window

Window类是Flink中对窗口的抽象。它是一个抽象类,包含抽象方法maxTimestamp(),用于获取属于该窗口的最大时间戳。

TimeWindow类是其子类。包含了窗口的start,end,offset等时间概念字段,这里会计算窗口的起始时间:

// 构造函数
public TimeWindow(long start, long end) {
    this.start = start;
    this.end = end;
}

// timestamp:获取窗口启动时的第一个时间戳epoch毫秒
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    final long remainder = (timestamp - offset) % windowSize;
    // handle both positive and negative cases
    if (remainder < 0) {
        return timestamp - (remainder + windowSize);
    } else {
        return timestamp - remainder;
    }
}

WindowAssigner

WindowAssigner表示窗口分配器,用来把元素分配到零个或多个窗口(Window对象)中。它是一个抽象类,其中重要的抽象方法为assignWindows()方法,用来给元素分配窗口。

Flink有多种类型的窗口,如Tumbling Window、Sliding Window等。各种类型的窗口又分为基于事件时间或处理时间的窗口。WindowAssigner的实现类就对应着具体类型的窗口。

SlidingEventTimeWindows是WindowAssigner的另一个实现类,表示基于事件时间的Sliding Window。它有3个long类型的字段size、slide和offset,分别表示窗口的大小、滑动的步长和窗口起始位置的偏移量。它对assignWindows()方法的实现如下:

@Override
public Collection<TimeWindow> assignWindows(
        Object element, long timestamp, WindowAssignerContext context) {
        // Long.MIN_VALUE is currently assigned when no timestamp is present
    if (timestamp > Long.MIN_VALUE) {
        if (staggerOffset == null) {
            staggerOffset =
                    windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
        }
        long start =
                TimeWindow.getWindowStartWithOffset(
                        timestamp, (globalOffset + staggerOffset) % size, size);
        // 返回构建好起止时间的TimeWindow
        return Collections.singletonList(new TimeWindow(start, start + size));
    } else {
        throw new RuntimeException(
                "Record has Long.MIN_VALUE timestamp (= no timestamp marker). "
                        + "Is the time characteristic set to 'ProcessingTime', or did you forget to call "
                        + "'DataStream.assignTimestampsAndWatermarks(...)'?");
    }
}

设置窗口触发器Trigger

@Override
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
    return EventTimeTrigger.create();
}

WindowAssigner与其主要实现类的关系如下:

这些类的含义分别如下

  • GlobalWindows:将所有元素分配进同一个窗口的全局窗口分配器。
  • SlidingEventTimeWindows:基于事件时间的滑动窗口分配器。
  • SlidingProcessingTimeWindows:基于处理时间的滑动窗口分配器。
  • TumblingEventTimeWindows:基于事件时间的滚动窗口分配器。
  • TumblingProcessingTimeWindows:基于处理时间的滚动窗口分配器。
  • EventTimeSessionWindows:基于事件时间的会话窗口分配器。
  • ProcessingTimeSessionWindows:基于处理时间的会话窗口分配器。

Trigger

Trigger表示窗口触发器。它是一个抽象类,主要定义了下面3个方法用于确定窗口何时触发计算:

// 每个元素到来时触发
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
// 处理时间的定时器触发时
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
// 事件时间的定时器触发时调用
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;

这3个方法的返回结果为TriggerResult对象。TriggerResult是一个枚举类,包含两个boolean类型的字段fire和purge,分别表示窗口是否触发计算和窗口内的元素是否需要清空。

CONTINUE(false, false),
FIRE_AND_PURGE(true, true),
FIRE(true, false),
PURGE(false, true);

TriggerResult(boolean fire, boolean purge) {
    this.purge = purge;
    this.fire = fire;
}

窗口触发器的实现由用户根据业务需求自定义。Flink默认基于事件时间的触发器为EventTimeTrigger,其三个方法处理如下

@Override
public TriggerResult onElement(
        Object element, long timestamp, TimeWindow window, TriggerContext ctx)
        throws Exception {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        // 如果水印已经超过窗口,则立即触发
        return TriggerResult.FIRE;
    } else {
        // 注册事件时间定时器
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }
}

@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
    return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
}

/*
 * 处理时间,窗口不触发计算也不清空内部元素。
 */
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
        throws Exception {
    return TriggerResult.CONTINUE;
}

Trigger与其主要实现类的继承关系

这些类的含义如下

  • CountTrigger:元素数达到设置的个数时触发计算的触发器。
  • DeltaTrigger:基于DeltaFunction和设置的阈值触发计算的触发器。
  • EventTimeTrigger:基于事件时间的触发器。
  • ProcessingTimeTrigger:基于处理时间的触发器。
  • PurgingTrigger:可包装其他触发器的清空触发器。
  • ContinuousEventTimeTrigger:基于事件时间并按照一定的时间间隔连续触发计算的触发器。
  • ContinuousProcessingTimeTrigger:基于处理时间并按照一定的时间间隔连续触发计算的触发器。

windowOperator

WindowedStream的构造函数中,会生成WindowOperatorBuilder,该类可以返回WindowOperator,这两个类负责窗口分配器、窗口触发器和窗口剔除器这些组件在运行时的协同工作。

对于WindowOperator,除了窗口分配器和窗口触发器的相关字段,可以先了解下面两个字段。

// StateDescriptor类型,表示窗口状态描述符。
private final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;

// 表示窗口的状态,窗口内的元素都在其中维护。
private transient InternalAppendingState<K, W, IN, ACC, ACC> windowState;

窗口中的元素并没有保存在Window对象中,而是维护在windowState中。windowStateDescriptor则是创建windowState所需用到的描述符。

当有元素到来时,会调用WindowOperator的processElement()方法:

public void processElement(StreamRecord<IN> element) throws Exception {
    // 分配窗口
    final Collection<W> elementWindows = windowAssigner.assignWindows(
        element.getValue(), element.getTimestamp(), windowAssignerContext);
            ...
        if (windowAssigner instanceof MergingWindowAssigner) { // Session Window的情况
            ...
        } else {
            for (W window: elementWindows) { // 非Session Window的情况
                ...
                // 将Window对象设置为namespace并添加元素到windowState中
                windowState.setCurrentNamespace(window);
                windowState.add(element.getValue());
                triggerContext.key = key;
                triggerContext.window = window;
                // 获取TriggerResult,确定接下来是否需要触发计算或清空窗口
                TriggerResult triggerResult = triggerContext.onElement(element);
                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    // 触发计算
                    emitWindowContents(window, contents);
                }
                if (triggerResult.isPurge()) {
                    // 清空窗口
                    windowState.clear();
                }
                ...
            }
        }
    ...
}

在处理时间或事件时间的定时器触发时,会调用WindowOperator的onProcessingTime()方法或onEventTime()方法,其中的逻辑与onElement()方法的大同小异。

Watermarks

水位线(watermark)是选用事件时间来进行数据处理时特有的概念。它的本质就是时间戳,从上游流向下游,表示系统认为数据中的事件时间在该时间戳之前的数据都已到达。

Flink中,Watermark类表示水位。

/** Creates a new watermark with the given timestamp in milliseconds. */
public Watermark(long timestamp) {
    this.timestamp = timestamp;
}

watermark的生成有两种方式,这里不赘述,主要讲述下基于配置的策略生成watermark的方式。如下的代码是比较常见的配置:

// 分配事件时间与水印
.assignTimestampsAndWatermarks(
        // forBoundedOutOfOrderness 会根据事件的时间戳和允许的最大乱序时间生成水印。
        // Duration 设置了最大乱序时间为1秒。这意味着 Flink 将允许在这1秒的时间范围内的事件不按照事件时间的顺序到达,这个时间段内的事件会被认为是"有序的"。
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(1))
        // 设置事件时间分配器,从Event对象中提取时间戳作为事件时间
        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event element, long recordTimestamp) {
                return element.timestamp;
            }
        }));

在Flink内部,会根据配置的策略调用BoundedOutOfOrdernessWatermarks生成watermark。该类的代码如下:

public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {

    /** The maximum timestamp encountered so far. */
    private long maxTimestamp;

    /** The maximum out-of-orderness that this watermark generator assumes. */
    private final long outOfOrdernessMillis;

    /**
     * Creates a new watermark generator with the given out-of-orderness bound.
     *
     * @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
     */
    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");

        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();

        // start so that our lowest watermark would be Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // ------------------------------------------------------------------------

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        // 每条数据都会更新最大值
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // 发送 watermark 逻辑
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}

onEvent决定每次事件都会取得最大的事件时间更新;onPeriodicEmit则是周期性的更新并传递到下游。

AbstractStreamOperator

WatermarkGenerator接口的调用是在AbstractStreamOperator抽象类的子类TimestampsAndWatermarksOperator中。其生命周期open函数与每个数据到来的处理函数processElement,如下:

@Override
public void open() throws Exception {
    super.open();

    timestampAssigner = watermarkStrategy.createTimestampAssigner(this::getMetricGroup);
    watermarkGenerator =
            emitProgressiveWatermarks
                    ? watermarkStrategy.createWatermarkGenerator(this::getMetricGroup)
                    : new NoWatermarksGenerator<>();

    wmOutput = new WatermarkEmitter(output);

    watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
    if (watermarkInterval > 0 && emitProgressiveWatermarks) {
        final long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    }
}

@Override
public void processElement(final StreamRecord<T> element) throws Exception {
    final T event = element.getValue();
    final long previousTimestamp =
            element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE;
    // 从分配器中提取事件时间戳
    final long newTimestamp = timestampAssigner.extractTimestamp(event, previousTimestamp);

    element.setTimestamp(newTimestamp);
    output.collect(element);
    // 调用水印生成器
    watermarkGenerator.onEvent(event, newTimestamp, wmOutput);
}

从方法的入参可以看出来 flink 算子间的数据流动是 StreamRecord 对象。它对数据的处理逻辑是什么都不做直接向下游发送,然后调用 onEvent 记录最大时间戳,也就是说:flink 是先发送数据再生成 watermark,watermark 永远在生成它的数据之后。

总结

上面的一系列相关代码,只是冰山一角,暂时只是把关键涉及到的部分捋了一下。最后画个图,展示其大致思路。

参考:

Flink Watermark 源码解析文章来源地址https://www.toymoban.com/news/detail-746477.html

到了这里,关于聊聊Flink必知必会(五)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • MySql必知必会

    Buffer Pool基本概念 Buffer Pool:缓冲池,简称BP。其作用是用来缓存表数据与索引数据,减少磁盘IO操作,提升效率。 Buffer Pool由 缓存数据页(Page) 和 对缓存数据页进行描述的 控制块 组成, 控制块中存储着对应缓存页的所属的 表空间、数据页的编号、以及对应缓存页在Buffer Poo

    2024年01月22日
    浏览(65)
  • MySQL必知必会(初级篇)

    数据库 (DataBase,DB),是统一管理的、长期存储在计算机内的、有组织的相关数据的集合。特点是数据见联系密切、冗余度小、独立性高、易扩展,并且可以为各类用户共享。 MySQL :是一个关系型数据库管理系统,由瑞典MySQL AB 公司开发,属于 Oracle 旗下产品。MySQL 是最流行的

    2023年04月08日
    浏览(50)
  • 《SQL 必知必会》全解析

    不要哀求,学会争取。若是如此,终有所获。 原文:https://mp.weixin.qq.com/s/zbOqyAtsWsocarsFIGdGgw 你是否还在烦恼 SQL 该从何学起,或者学了 SQL 想找个地方练练手?好巧不巧,最近在工作之余登上牛客,发现了牛客不知道啥时候上线了SQL 必知必会的练习题。 《SQL 必知必会》作为麻

    2024年02月08日
    浏览(49)
  • ChatGPT入门必知必会

    更多文章欢迎关注公众号: stackoveriow 2023年是真正意义上的AI之年,因为ChatGPT 2007年,iPhone开启了智能手机时代, 2023年,我们迎来了人工智能时代,我们正处于历史的大转折点上,这也许是启蒙运动级别的思想和社会转折,工业革命级别的生产和生活转折 。继22年12月份从GP

    2023年04月18日
    浏览(123)
  • 抽象语法树AST必知必会

    打开前端项目中的 package.json,会发现众多工具已经占据了我们开发日常的各个角落,例如 JavaScript 转译、CSS 预处理、代码压缩、ESLint、Prettier 等。这些工具模块大都不会交付到生产环境中,但它们的存在于我们的开发而言是不可或缺的。 有没有想过这些工具的功能是如何实

    2024年02月16日
    浏览(50)
  • 《MySQL 必知必会》课程笔记(三)

    创建和修改数据表,是数据存储过程中的重要⼀环。 我们不仅需要把表创建出来,还需要正确地设置限定条件,这样才能确保数据的一致性和完整性。 同时,表中的数据会随着业务需求的变化而变化,添加和修改相应的字段也是常见的操作。 首先,我们要知道 MySQL 创建表的

    2024年02月03日
    浏览(45)
  • 必知必会Java命令-jps

    你好,我是阿光。 最近想着把工作中使用过的java命令都梳理一下,方便日后查阅。虽然这类文章很多,但自己梳理总结后,还是会有一些新的收获。这也是这篇笔记的由来。 今天先聊聊 jps 命令。 jps 命令是JDK提供的一个工具,用于查看目标系统上的Java进程基本信息(进程

    2024年02月05日
    浏览(43)
  • 【数据库】索引必知必会

    数据库中索引(Index)是一种帮助快速查找数据的数据结构,可以把它理解为书的目录,通过索引能够快速找到数据所在位置。 使用索引可以加快数据查找的效率,这是创建索引的最主要原因。 场景的索引数据结构有:Hash表(通过hash算法快速定位数据,但不适合范围查询,

    2023年04月20日
    浏览(54)
  • 缓存中间件Redis必知必会

    作者: 逍遥Sean 简介:一个主修Java的Web网站游戏服务器后端开发者 主页:https://blog.csdn.net/Ureliable 觉得博主文章不错的话,可以三连支持一下~ 如有需要我的支持,请私信或评论留言! 前言: 本文是对redis的基本用法操作的整理。 如果需要在linux环境中搭建一个redis服务参考

    2024年02月11日
    浏览(50)
  • Python必知必会 os 模块详解

    ❤️ 作者简介 :大家好我是小鱼干儿♛是一个热爱编程、热爱算法的大三学生,蓝桥杯国赛二等奖获得者 🐟 个人主页 :https://blog.csdn.net/qq_52007481 ⭐ 个人社区 :【小鱼干爱编程】 🔥 算法专栏 :算法竞赛进阶指南 💯 刷题网站 :市面上的刷题网站有很多如何选择一个适

    2024年02月03日
    浏览(54)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包