深入理解Flink IntervalJoin源码

这篇具有很好参考价值的文章主要介绍了深入理解Flink IntervalJoin源码。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

IntervalJoin基于connect实现,期间会生成对应的IntervalJoinOperator。

@PublicEvolving
public <OUT> SingleOutputStreamOperator<OUT> process(
    ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
    TypeInformation<OUT> outputType) {
    Preconditions.checkNotNull(processJoinFunction);
    Preconditions.checkNotNull(outputType);

    // 检查用户自定义Function
    final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);

    // 构建IntervalJoin对应的IntervalJoinOperator
    final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
        new IntervalJoinOperator<>(
        lowerBound,
        upperBound,
        lowerBoundInclusive,
        upperBoundInclusive,
        left.getType().createSerializer(left.getExecutionConfig()),
        right.getType().createSerializer(right.getExecutionConfig()),
        cleanedUdf
    );

    // (基于connect实现)使用给定的自定义Function,对每个元素进行连接操作
    return left
        .connect(right)
        // 根据k1、k2,为s1、s2分配k,实际就是构建ConnectedStreams,以便后续构建IntervalJoinOperator对应的Transformation
        .keyBy(keySelector1, keySelector2)
        // 构建IntervalJoinOperator对应的TwoInputTransformation
        .transform("Interval Join", outputType, operator);
}

并且会根据给定的自定义Function构建出对应的TwoInputTransformation,以便能够参与Transformation树的构建。

/**
 * 创建StreamOperator对应的Transformation,以便能参与Transformation树的构建
 */
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(String functionName,
                                                   TypeInformation<R> outTypeInfo,
                                                   TwoInputStreamOperator<IN1, IN2, R> operator) {

    inputStream1.getType();
    inputStream2.getType();

    // 创建IntervalJoinOperator对应的TwoInputTransformation
    TwoInputTransformation<IN1, IN2, R> transform = new TwoInputTransformation<>(
        inputStream1.getTransformation(),
        inputStream2.getTransformation(),
        functionName,
        operator,
        outTypeInfo,
        environment.getParallelism());

    if (inputStream1 instanceof KeyedStream && inputStream2 instanceof KeyedStream) {
        KeyedStream<IN1, ?> keyedInput1 = (KeyedStream<IN1, ?>) inputStream1;
        KeyedStream<IN2, ?> keyedInput2 = (KeyedStream<IN2, ?>) inputStream2;

        TypeInformation<?> keyType1 = keyedInput1.getKeyType();
        TypeInformation<?> keyType2 = keyedInput2.getKeyType();
        if (!(keyType1.canEqual(keyType2) && keyType1.equals(keyType2))) {
            throw new UnsupportedOperationException("Key types if input KeyedStreams " +
                                                    "don't match: " + keyType1 + " and " + keyType2 + ".");
        }

        transform.setStateKeySelectors(keyedInput1.getKeySelector(), keyedInput2.getKeySelector());
        transform.setStateKeyType(keyType1);
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, transform);

    // 将IntervalJoinOperator对应的TwoInputTransformation,添加到Transformation树上
    getExecutionEnvironment().addOperator(transform);

    return returnStream;
}

作为ConnectedStreams,一旦left or right流中的StreamRecord抵达,就会被及时处理:

@Override
public void processElement1(StreamRecord<T1> record) throws Exception {
   /**处理left*/
   processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
}

@Override
public void processElement2(StreamRecord<T2> record) throws Exception {
    /**处理right*/
    processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
}

两者的处理逻辑是相同的:

/**
 * 处理Left和Right中的数据
 */
@SuppressWarnings("unchecked")
private <THIS, OTHER> void processElement(
    final StreamRecord<THIS> record,
    final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
    final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
    final long relativeLowerBound,
    final long relativeUpperBound,
    // 当前Join上的数据是否为left
    final boolean isLeft) throws Exception {

    // 当前left or right的StreamRecord
    final THIS ourValue = record.getValue();
    // 当前left or right的StreamRecord中的时间戳
    final long ourTimestamp = record.getTimestamp();

    if (ourTimestamp == Long.MIN_VALUE) {
        throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
                                 "interval stream joins need to have timestamps meaningful timestamps.");
    }

    // 是否迟到:当前StreamRecord中的时间戳是否小于当前Watermark
    if (isLate(ourTimestamp)) {
        return;
    }

    // 将当前StreamRecord写入到它所对应的“己方MapState”中(left归left,right归right)
    addToBuffer(ourBuffer, ourValue, ourTimestamp);

    /**
      * 遍历当前StreamRecord的“对方MapState”,判断哪个StreamRecord被Join上了
      */
    for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
        // “对方MapState”中的Key,即时间戳
        final long timestamp  = bucket.getKey();

        // 如果遍历到的MapState的这个元素的时间戳不在(以当前StreamRecord的时间戳为基准的)Join的范围内,
        // 说明没Join上,那就跳过本次循环。这是判断哪个StreamRecord是否Join上的核心!
        if (timestamp < ourTimestamp + relativeLowerBound ||
            timestamp > ourTimestamp + relativeUpperBound) {
            continue;
        }

        // 反之,说明已经Join上了,那就取出这个元素的Value,即时间戳所对应的List<BufferEntry<T1>>
        for (BufferEntry<OTHER> entry: bucket.getValue()) {
            // 将Join上的left和right分发下游(回调用户自定义函数中的processElement()方法)
            if (isLeft) {
                collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
            } else {
                collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
            }
        }
    }

    // 经历双层for循环并分发下游后,计算清理时间(当前StreamRecord的时间戳+上界值)
    long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
    // 注册Timer来清理保存在MapState中的过期数据
    if (isLeft) {
        internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
    } else {
        internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
    }
}

先取出当前StreamRecord中的Timestamp检查它是否已经迟到了,判断依据为:当前StreamRecord中的Timestamp是否小于当前Watermark。

/**
 * 判断当前StreamRecord是否迟到:当前StreamRecord中的时间戳是否小于当前Watermark
 */
private boolean isLate(long timestamp) {
    // 获取当前的Watermark
    long currentWatermark = internalTimerService.currentWatermark();
    // 迟到判定条件
    return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
}

接着将当前StreamRecord写入到对应的MapState中。需要注意的是,left和right都有各自的MapState,这个MapState将Timestamp作为Key,将List集合作为Value(考虑到同一时刻可能会有多条数据)

/**
 * 将当前StreamRecord写入到它所对应的MapState中(left归left,right归right)
 */
private static <T> void addToBuffer(
    final MapState<Long, List<IntervalJoinOperator.BufferEntry<T>>> buffer,
    final T value,
    final long timestamp) throws Exception {
    // 先拿着时间戳作为key去MapState中取
    List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
    if (elemsInBucket == null) {
        elemsInBucket = new ArrayList<>();
    }
    // 将StreamRecord包装成BufferEntry(默认未被Join上),add到List集合中
    elemsInBucket.add(new BufferEntry<>(value, false));
    // 将List集合put到MapState中(时间戳作为Key)
    buffer.put(timestamp, elemsInBucket);
}

接着会经历嵌套for循环,判断哪些StreamRecord是满足Join条件的:以当前StreamRecord的Timestamp和指定的上、下界组成时间过滤条件,对当前StreamRecord的“对方MapState”内的每个Timestamp(作为Key)进行比对。

/**
 * 遍历当前StreamRecord的“对方MapState”,判断哪个StreamRecord被Join上了
 */
for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
    // “对方MapState”中的Key,即时间戳
    final long timestamp  = bucket.getKey();

    // 如果遍历到的MapState的这个元素的时间戳不在(以当前StreamRecord的时间戳为基准的)Join的范围内,
    // 说明没Join上,那就跳过本次循环。这是判断哪个StreamRecord是否Join上的核心!
    if (timestamp < ourTimestamp + relativeLowerBound ||
        timestamp > ourTimestamp + relativeUpperBound) {
        continue;
    }

    // 反之,说明已经Join上了,那就取出这个元素的Value,即时间戳所对应的List<BufferEntry<T1>>
    for (BufferEntry<OTHER> entry: bucket.getValue()) {
        // 将Join上的left和right分发下游(回调用户自定义函数中的processElement()方法)
        if (isLeft) {
            collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
        } else {
            collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
        }
    }
}

一旦某个Key符合时间过滤条件,那就将它所对应的List集合(作为Value)取出来,逐条将其发送给下游,本质就是将其交给自定义Function处理

/**
 * 将满足IntervalJoin条件的StreamRecord发送给下游,本质就是将其交给自定义Function处理
 */
private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
    final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);

    collector.setAbsoluteTimestamp(resultTimestamp);
    context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);

    // 将Join上的StreamRecord交给自定义Function,执行开发者的处理逻辑
    userFunction.processElement(left, right, context, collector);
}

整个过滤筛选过程,也是IntervalJoin的核心所在!

最后,会计算保存在MapState中的StreamRecord的过期清理时间,因为StreamRecord不能一直被保存。本质就是基于InternalTimerService注册Timer,触发时间为:当前StreamRecord的Timestamp + 给定的上界值。

// 经历双层for循环并分发下游后,计算清理时间(当前StreamRecord的时间戳+上界值)
long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
// 注册Timer来清理保存在MapState中的过期数据
if (isLeft) {
    internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
} else {
    internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
}

由于IntervalJoinOperator实现了Triggerable接口,因此一旦注册的Timer被触发,就会将对应MapState中对应的Timestamp进行remove文章来源地址https://www.toymoban.com/news/detail-664464.html

/**
 * 基于InternalTimerService注册的Timer,会定时对MapState执行clean操作
 */
@Override
public void onEventTime(InternalTimer<K, String> timer) throws Exception {

    long timerTimestamp = timer.getTimestamp();
    String namespace = timer.getNamespace();

    logger.trace("onEventTime @ {}", timerTimestamp);

    switch (namespace) {
        case CLEANUP_NAMESPACE_LEFT: {
            long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
            logger.trace("Removing from left buffer @ {}", timestamp);
            // clean left
            leftBuffer.remove(timestamp);
            break;
        }
        case CLEANUP_NAMESPACE_RIGHT: {
            long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
            logger.trace("Removing from right buffer @ {}", timestamp);
            // clean right
            rightBuffer.remove(timestamp);
            break;
        }
        default:
            throw new RuntimeException("Invalid namespace " + namespace);
    }
}

到了这里,关于深入理解Flink IntervalJoin源码的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 深入理解 Flink(一)Flink 架构设计原理

    深入理解 Flink 系列文章已完结,总共八篇文章,直达链接: 深入理解 Flink (一)Flink 架构设计原理 深入理解 Flink (二)Flink StateBackend 和 Checkpoint 容错深入分析 深入理解 Flink (三)Flink 内核基础设施源码级原理详解 深入理解 Flink (四)Flink Time+WaterMark+Window 深入分析 深入

    2024年02月02日
    浏览(44)
  • 深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析

    深入理解 Flink 系列文章已完结,总共八篇文章,直达链接: 深入理解 Flink (一)Flink 架构设计原理 深入理解 Flink (二)Flink StateBackend 和 Checkpoint 容错深入分析 深入理解 Flink (三)Flink 内核基础设施源码级原理详解 深入理解 Flink (四)Flink Time+WaterMark+Window 深入分析 深入

    2024年01月24日
    浏览(46)
  • 深入理解 Flink(七)Flink Slot 管理详解

    JobMaster 中封装了一个 DefaultScheduler,在 DefaultScheduler.startSchedulingInternal() 方法中生成 ExecutionGraph 以执行调度。 资源调度的大体流程如下: Register:当 TaskExecutor 启动之后,会向 ResourceManager 注册自己(TaskExecutor)和自己内部的 Slot(TaskManagerSlot)。 Status Report:TaskExecutor 启动之

    2024年01月21日
    浏览(41)
  • 深入理解 Flink(六)Flink Job 提交和 Flink Graph 详解

    深入理解 Flink 系列文章已完结,总共八篇文章,直达链接: 深入理解 Flink (一)Flink 架构设计原理 深入理解 Flink (二)Flink StateBackend 和 Checkpoint 容错深入分析 深入理解 Flink (三)Flink 内核基础设施源码级原理详解 深入理解 Flink (四)Flink Time+WaterMark+Window 深入分析 深入

    2024年01月16日
    浏览(43)
  • 深入理解Flink Mailbox线程模型

    Mailbox线程模型通过引入阻塞队列配合一个Mailbox线程的方式,可以轻松修改StreamTask内部状态的修改。Checkpoint、ProcessingTime Timer的相关操作(Runnable任务),会以Mail的形式保存到Mailbox内的阻塞队列中。StreamTask在invoke阶段的runMailboxLoop时期,就会轮询Mailbox来处理队列中保存的M

    2024年02月12日
    浏览(42)
  • 深入理解 Flink(八)Flink Task 部署初始化和启动详解

    核心入口: 部署 Task 链条:JobMaster -- DefaultScheduler -- SchedulingStrategy -- ExecutionVertex -- Execution -- RPC请求 -- TaskExecutor JobMaster 向 TaskExecutor 发送 submitTask() 的 RPC 请求,用来部署 StreamTask 运行。TaskExecutor 接收到 JobMaster 的部署 Task 运行的 RPC 请求的时候,就封装了一个 Task 抽象,然

    2024年01月17日
    浏览(79)
  • Flink系列之:深入理解ttl和checkpoint,Flink SQL应用ttl案例

    Flink TTL(Time To Live)是一种机制,用于设置数据的过期时间,控制数据在内存或状态中的存活时间。通过设置TTL,可以自动删除过期的数据,从而释放资源并提高性能。 在Flink中,TTL可以应用于不同的组件和场景,包括窗口、状态和表。 窗口:对于窗口操作,可以将TTL应用于

    2024年02月03日
    浏览(57)
  • [AIGC] 深入理解Flink中的窗口、水位线和定时器

    Apache Flink是一种流处理和批处理的混合引擎,它提供了一套丰富的APIs,以满足不同的数据处理需求。在本文中,我们主要讨论Flink中的三个核心机制:窗口(Windows)、水位线(Watermarks)和定时器(Timers)。 在流处理应用中,一种常见的需求是计算某个时间范围内的数据,这

    2024年03月27日
    浏览(59)
  • (增加细粒度资源管理)深入理解flink的task slot相关概念

    之前对flink的task slot的理解太浅了,重新捋一下相关知识点 我们知道,flink中每个TaskManager都是一个 JVM 进程,可以在单独的线程中执行一个或多个 subtask(线程)。 但是TaskManager 的计算资源是有限的,并不是所有任务都可以放在同一个 TaskManager 上并行执行。并行的任务越多

    2024年03月11日
    浏览(41)
  • 【天衍系列 01】深入理解Flink的 FileSource 组件:实现大规模数据文件处理

    Apache Flink 是一个流式处理框架,被广泛应用于大数据领域的实时数据处理和分析任务中。在 Flink 中,FileSource 是一个重要的组件,用于从文件系统中读取数据并将其转换为 Flink 的数据流。本文将深入探讨 FileSource 的工作原理、用法以及与其他数据源的比较。 FileSource 是 Fli

    2024年02月21日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包