深入理解Flink Mailbox线程模型

这篇具有很好参考价值的文章主要介绍了深入理解Flink Mailbox线程模型。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

整体设计

Mailbox线程模型通过引入阻塞队列配合一个Mailbox线程的方式,可以轻松修改StreamTask内部状态的修改。Checkpoint、ProcessingTime Timer的相关操作(Runnable任务),会以Mail的形式保存到Mailbox内的阻塞队列中。StreamTask在invoke阶段的runMailboxLoop时期,就会轮询Mailbox来处理队列中保存的Mail,Mail处理完毕后才会对DataStream上的数据元素执行处理逻辑。

MailboxProcessor的能力就是负责拉取、处理Mail,以及执行MailboxDefaultAction(默认动作,即processInput()方法中对DataStream上的普通消息的处理逻辑,包括:处理Event、barrier、Watermark等)

/**
 * 开始轮询Mailbox内的Mail,Checkpoint和ProcessingTime Timer的触发事件会以Runnable的形式(作为Mail)添加到Mailbox的队列中,等待“Mailbox线程”去处理
 */
public void runMailboxLoop() throws Exception {

    // 获取最新的TaskMailbox:主要用于存储提交的Mail,并提供获取接口。
    // TaskMailbox有2个队列:
    //        1.queue:阻塞队列,通过ReentrantLock控制队列中的读写操作
    //        2.batch:非阻塞队列,调用createBatch()方法会将queue中的Mail转存到batch中,这样读操作就能通过tryTakeFromBatch()方法从batch队列中批量获取Mail,且只能被Mailbox线程消费
    final TaskMailbox localMailbox = mailbox;

    // 检查当前线程是否为Mailbox线程,即StreamTask运行时所在的主线程
    Preconditions.checkState(
        localMailbox.isMailboxThread(),
        "Method must be executed by declared mailbox thread!");

    // 确认Mailbox的状态:必须为OPEN
    assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";

    // 创建MailboxController实例:可以控制Mailbox的循环、临时暂停和恢复MailboxDefaultAction(默认动作)
    final MailboxController defaultActionContext = new MailboxController(this);

    /**
     * 核心:事件循环
     * processMail()方法会检测Mailbox中是否还有Mail需要处理,新Mail会(在ReentrantLock的保护下)被添加到queue队列并转存到batch队列中。
     * MailboxProcessor处理完batch队列中的全部Mail后(执行作为Mail的Runnable#run()方法),才会进入到while循环内,执行MailboxDefaultAction的默认动作,
     * 即调用StreamTask#processInput()方法,对读取到的数据(Event、Barrier、Watermark等)进行处理
     */
    while (processMail(localMailbox)) {
        mailboxDefaultAction.runDefaultAction(defaultActionContext); // lock is acquired inside default action as needed
    }
}

可以看出,对Mail和MailboxDefaultAction的处理,是由唯一的Mailbox线程负责的。

processMail

在while轮询时,首先会processMail

/**
 * 处理Mailbox中的Mail:Checkpoint、ProcessingTime Timer的触发事件,会以Runnable的形式作为Mail保存在Mailbox的queue队列中,
 * 并在ReentrantLock的保护下,将queue队列中的新Mail转移到batch队列中。MailboxProcessor会根据queue队列、batch队列内的Mail情况,
 * 决定处理Mail or processInput。只有当TaskMailbox内的所有Mail全都处理完毕后,MailboxProcessor才会去processInput()
 */
private boolean processMail(TaskMailbox mailbox) throws Exception {

    /**
     * 新Mail写入queue队列,TaskMailbox会将queue队列中的新Mail转移到batch队列中,MailboxProcessor会根据queue队列、batch队列内的Mail情况,
     * 判断执行Mail的run() or processInput()。只有当TaskMailbox内的所有Mail全部处理完成后,MailboxProcessor才会去processInput()。
     */
    if (!mailbox.createBatch()) {
        return true;
    }

    Optional<Mail> maybeMail;
    /**
     * 能走到这,说明queue队列中的Mail已被全部转移至batch队列。现在要从batch队列中获取到Mail并执行(它作为Runnable的run()方法),
     * 直到batch队列中的所有Mail全都处理完毕
      */
    while (isMailboxLoopRunning() && (maybeMail = mailbox.tryTakeFromBatch()).isPresent()) {
        maybeMail.get().run();
    }

    /**如果默认操作处于Unavailable状态,那就先阻塞住,直到它重新回归available状态*/
    while (isDefaultActionUnavailable() && isMailboxLoopRunning()) {
        mailbox.take(MIN_PRIORITY).run();
    }

    // 返回Mailbox是否还在Loop
    return isMailboxLoopRunning();
}

很核心的一个点就是Mailbox要去createBatch,TaskMailboxImpl提供了具体的实现逻辑。Mailbox引入了2个队列,新Mail被add到Mailbox内的queue队列中(此过程受ReentrantLock保护)。同时为了减少读取queue队列时的同步开销,Mailbox还构建了一个batch队列专门用来后续消费(避免加锁操作)。

/**
 * 对Deque<Mail>队列的读写,通过ReentrantLock加以保护
 */
private final ReentrantLock lock = new ReentrantLock();

/**
 * Internal queue of mails.
 * 使用Deque(内部队列)保存所有的Mail
 */
@GuardedBy("lock")
private final Deque<Mail> queue = new ArrayDeque<>();
/**
 * 为了减少读取queue队列所造成的同步开销,TaskMailbox会创建一个batch队列,queue队列中的Mail会被转移到batch队列中,
 * 有效避免了后续消费时的加锁操作
 */
private final Deque<Mail> batch = new ArrayDeque<>();


@Override
public boolean createBatch() {
    checkIsMailboxThread();
    /**
     * 如果queue队列中没有新Mail,那就要看batch队列是否为空。
     * 1.如果batch也是空的(Mailbox里已经没有任何Mail了,需要去processInput()了),那processMail()也会return true,
     * MailboxProcessor就会进入到while循环内部,执行processInput()来处理DataStream上的数据;
     * 2.如果batch不空,说明MailboxProcessor还需要继续processMail(),即取出Mail执行它(作为Runnable)的run()方法;
     * 由此可见,Mailbox中的batch队列中的Mail最终一定会被Mailbox线程消耗殆尽(轮询、处理),然后才会去processInput()
     */
    if (!hasNewMail) { // 只要queue队列里还有Mail,hasNewMail就为true
        return !batch.isEmpty();
    }
    /**能走到这说明queue队列中仍有新Mail,接下来需要将它的新Mail向batch队列转移,该过程受ReentrantLock保护*/
    final ReentrantLock lock = this.lock;
    // 获取锁
    lock.lock();
    try {
        Mail mail;
        /**每次循环都将queue队列中的First Mail,转移到batch队列中,直至queue队列被消耗殆尽。此时一定return true*/
        while ((mail = queue.pollFirst()) != null) {
            batch.addLast(mail);
        }
        // 此时queue队列内的所有Mail都被转移到batch队列中了,queue中没有新Mail了
        hasNewMail = false;
        // 此时根据batch队列是否为空,MailboxProcessor会判断执行Mail的run() or processInput()
        return !batch.isEmpty();
    } finally {
        // 最终释放锁
        lock.unlock();
    }
}

如果Mailbox内的queue队列中仍有新Mail,那就在ReentrantLock的加持下将queue内的Mail全都转移到batch队列中;如果Mailbox内的queue队列中没有新Mail,那就看batch队列的情况了。决断权交给外层的MailboxProcessor,总的来看:

  • 如果batch队列中有Mail,MailboxProcessor会从Mailbox内的batch队列中逐个pollFirst,然后执行(它作为Runnable#run()方法),直到batch队列中的所有Mail全都被“消耗殆尽”为止
  • 如果batch队列中没有Mail,MailboxProcessor此时就没有Mail可处理了,那就直接processInput

1.Checkpoint Tigger

对Checkpoint的触发,是通过MailboxExecutor向Mailbox提交Mail的

/**
 * 触发执行StreamTask中的Checkpoint操作:异步的通过MailboxExecutor,将“执行Checkpoint”的请求封装成Mail后,
 * 提交到TaskMailbox中,最终由MailboxProcessor来处理
 */
@Override
public Future<Boolean> triggerCheckpointAsync(
      CheckpointMetaData checkpointMetaData,
      CheckpointOptions checkpointOptions,
      boolean advanceToEndOfEventTime) {

   // 通过MailboxExecutor,将“触发执行Checkpoint”的具体逻辑封装成Mail,提交到Mailbox中,后期会被MailboxProcessor执行
   return mailboxProcessor.getMainMailboxExecutor().submit(
         // 触发Checkpoint的具体逻辑
         () -> triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime),
         "checkpoint %s with %s",
      checkpointMetaData,
      checkpointOptions);
}

triggerCheckpoint操作会被封装成Mail,添加到Mailbox中等待被处理。

@Override
public void execute(
    @Nonnull final RunnableWithException command,
    final String descriptionFormat,
    final Object... descriptionArgs) {
    try {
        mailbox.put(new Mail(command, priority, actionExecutor, descriptionFormat, descriptionArgs));
    } catch (IllegalStateException mbex) {
        throw new RejectedExecutionException(mbex);
    }
}

当然,Checkpoint的完成操作,也是同样的套路。

2.ProcessingTime Timer Trigger

/**
 * 借助Mailbox线程模型,由MailboxExecutor负责将"ProcessingTime Timer触发的消息"封装成Mail提交到TaskMailbox中,后续由MailboxProcessor处理
 */
public ProcessingTimeService getProcessingTimeService(int operatorIndex) {
    Preconditions.checkState(timerService != null, "The timer service has not been initialized.");
    MailboxExecutor mailboxExecutor = mailboxProcessor.getMailboxExecutor(operatorIndex);
    // 通过MailboxExecutor将Mail提交到Mailbox中等待处理
    return new ProcessingTimeServiceImpl(timerService, callback -> deferCallbackToMailbox(mailboxExecutor, callback));
}


private ProcessingTimeCallback deferCallbackToMailbox(MailboxExecutor mailboxExecutor, ProcessingTimeCallback callback) {
    return timestamp -> {
        mailboxExecutor.execute(
            () -> invokeProcessingTimeCallback(callback, timestamp),
            "Timer callback for %s @ %d",
            callback,
            timestamp);
    };
}

processInput

StreamInputProcessor会对输入的数据进行处理、输出,包含:StreamTaskInput + OperatorChain + DataOutput。每次processInput都相当于是在处理一个有界流(外层MailboxProcessor在不断地的轮询),处理完DataStream上的StreamRecord后,会返回InputStatus的枚举值,根据InputStatus值来决定下一步该“何去何从”。

/**
 * StreamTask的执行逻辑:处理输入的数据,返回InputStatus状态,并根据InputStatus决定是否需要结束当前Task。
 * 该方法会通过MailboxProcessor调度、执行(作为MailboxProcessor的默认动作),底层调用StreamInputProcessor#processInput()方法
 */
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
    /**
     * 核心:借助StreamInputProcessor完成数据的读取,并交给算子处理,处理完毕后会返回InputStatus。
     * 每次触发,相当于处理一个有界流,在外层Mailbox拉取Mail才是while循环无限拉取
     */
    InputStatus status = inputProcessor.processInput();
    /**
     * case 1:上游如果还有数据 && RecordWriter是可用的,立即返回。意为:继续处理!
     */
    if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {
        return;
    }
    /**
     * case 2:当状态为END_OF_INPUT,说明本批次的有界流数据已经处理完毕,
     * 通过MailboxCollector来告诉Mailbox
     */
    if (status == InputStatus.END_OF_INPUT) {
        controller.allActionsCompleted();
        return;
    }
    /**
     * case 3:当前有界流中没有数据,但未来可能会有。此时处理线程会被挂起:直到有新的可用数据到来 && RecordWriter可用
     * 此时会先临时暂停对MailboxDefaultAction的处理,等啥时候又有新数据了,再重新恢复MailboxDefaultAction的处理。
     */
    CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);
    // 通过MailboxCollector让Mailbox线程暂时停止对MailboxDefaultAction的处理
    MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();
    // 等啥时候又有了input、output,RecordWriter也变得可用了以后,再重新继续执行默认操作
    jointFuture.thenRun(suspendedDefaultAction::resume);
}

MailboxController是MailboxDefaultAction和Mailbox之间交互的桥梁,在StreamTask处理DataStream元素的过程中,会利用MailboxController将处理状态及时通知给Mailbox。如果这批有界流处理完毕,就会通过MailboxController通知Mailbox(本质就是向Mailbox发送一个Mail),进行下一轮的处理。

private void sendControlMail(RunnableWithException mail, String descriptionFormat, Object... descriptionArgs) {
    mailbox.putFirst(new Mail(
        mail,
        Integer.MAX_VALUE /*not used with putFirst*/,
        descriptionFormat,
        descriptionArgs));
}

兼容SourceStreamTask

作为DataStream Source是专门用来生产无界流数据的,并不能穿插兼顾Mailbox内Mail的检测。如果仅有一个线程生产无界流数据的话,那将永远无法检测Mailbox内的Mail。作为StreamTask的子类,SourceStreamTask会额外启动另一个独立的LegacySourceFunctionThread线程来执行SourceFunction中的循环(生产无界流),Mailbox线程(主线程)依然负责处理Mail和默认操作。

/**
 * 专门为Source源生产数据的线程
 */
private class LegacySourceFunctionThread extends Thread {

    private final CompletableFuture<Void> completionFuture;

    LegacySourceFunctionThread() {
        this.completionFuture = new CompletableFuture<>();
    }

    @Override
    public void run() {
        try {
            // CheckpointLock保证线程安全
            headOperator.run(getCheckpointLock(), getStreamStatusMaintainer(), operatorChain);
            completionFuture.complete(null);
        } catch (Throwable t) {
            // Note, t can be also an InterruptedException
            completionFuture.completeExceptionally(t);
        }
    }

    public void setTaskDescription(final String taskDescription) {
        setName("Legacy Source Thread - " + taskDescription);
    }

    CompletableFuture<Void> getCompletionFuture() {
        return isFailing() && !isAlive() ? CompletableFuture.completedFuture(null) : completionFuture;
    }
}

负责为Source生产无界流数据的LegacySourceFunctionThread线程启动后,不管是启动成功 or 出现异常,都会封装对应的Mail并发送给Mailbox,而Mailbox线程的processMail一直在等待处理Mail。

/**
 * SourceStreamTask中,一个Thread负责专门生产无界流,另一个MailBox Thread处理Checkpoint、ProcessingTime Timer等事件Mail
 */
@Override
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {

    /**
     * 通过MailboxDefaultAction.Controller告诉Mailbox:让MailboxThread先暂停处理MailboxDefaultAction。
     * TaskMailbox收到该消息后,就会在processMail()中一直等待并处理Mail(在MailboxThread中会一直处理Mail)
     */
    controller.suspendDefaultAction();

    /**启动LegacySourceFunctionThread线程:专门生产Source无界流数据的,和MailboxThread线程一起运行*/
    sourceThread.setTaskDescription(getName());
    sourceThread.start();
    /**LegacySourceFunctionThread线程启动后,会通知Mailbox,Mailbox会在processMail()中一直等待并处理mail(不会返回,即Mailbox线程会一直处理mail)*/
    sourceThread.getCompletionFuture().whenComplete((Void ignore, Throwable sourceThreadThrowable) -> {
        /**LegacySourceFunctionThread线程启动过程中发生的任何异常、以及启动成功,都会以Mail的形式发送给Mailbox*/
        if (isCanceled() && ExceptionUtils.findThrowable(sourceThreadThrowable, InterruptedException.class).isPresent()) {
            mailboxProcessor.reportThrowable(new CancelTaskException(sourceThreadThrowable));
        } else if (!isFinished && sourceThreadThrowable != null) {
            mailboxProcessor.reportThrowable(sourceThreadThrowable);
        } else {
            mailboxProcessor.allActionsCompleted();
        }
    });
}

Mailbox主线程和LegacySourceFunctionThread线程线程都在运行,通过CheckpointLock锁来保证线程安全。文章来源地址https://www.toymoban.com/news/detail-650376.html

到了这里,关于深入理解Flink Mailbox线程模型的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 深入理解 Flink(五)Flink Standalone 集群启动源码剖析

    深入理解 Flink 系列文章已完结,总共八篇文章,直达链接: 深入理解 Flink (一)Flink 架构设计原理 深入理解 Flink (二)Flink StateBackend 和 Checkpoint 容错深入分析 深入理解 Flink (三)Flink 内核基础设施源码级原理详解 深入理解 Flink (四)Flink Time+WaterMark+Window 深入分析 深入

    2024年02月02日
    浏览(43)
  • 深入理解Flink IntervalJoin源码

    IntervalJoin基于connect实现,期间会生成对应的IntervalJoinOperator。 并且会根据给定的自定义Function构建出对应的TwoInputTransformation,以便能够参与Transformation树的构建。 作为ConnectedStreams,一旦left or right流中的StreamRecord抵达,就会被及时处理: 两者的处理逻辑是相同的: 先取出当

    2024年02月12日
    浏览(36)
  • 深入理解 Flink(三)Flink 内核基础设施源码级原理详解

    深入理解 Flink 系列文章已完结,总共八篇文章,直达链接: 深入理解 Flink (一)Flink 架构设计原理 深入理解 Flink (二)Flink StateBackend 和 Checkpoint 容错深入分析 深入理解 Flink (三)Flink 内核基础设施源码级原理详解 深入理解 Flink (四)Flink Time+WaterMark+Window 深入分析 深入

    2024年01月24日
    浏览(35)
  • 深入理解 Flink(八)Flink Task 部署初始化和启动详解

    核心入口: 部署 Task 链条:JobMaster -- DefaultScheduler -- SchedulingStrategy -- ExecutionVertex -- Execution -- RPC请求 -- TaskExecutor JobMaster 向 TaskExecutor 发送 submitTask() 的 RPC 请求,用来部署 StreamTask 运行。TaskExecutor 接收到 JobMaster 的部署 Task 运行的 RPC 请求的时候,就封装了一个 Task 抽象,然

    2024年01月17日
    浏览(71)
  • Flink系列之:深入理解ttl和checkpoint,Flink SQL应用ttl案例

    Flink TTL(Time To Live)是一种机制,用于设置数据的过期时间,控制数据在内存或状态中的存活时间。通过设置TTL,可以自动删除过期的数据,从而释放资源并提高性能。 在Flink中,TTL可以应用于不同的组件和场景,包括窗口、状态和表。 窗口:对于窗口操作,可以将TTL应用于

    2024年02月03日
    浏览(50)
  • Flink 深入理解任务执行计划,即Graph生成过程(源码解读)

    我们先看一下,Flink 是如何描述作业的执行计划的。以这个 DataStream 作业为例,Flink 会基于它先生成一个 StreamGraph。这是一个有向无环图,图中的节点对应着计算逻辑,图中的边则对应着数据的分发方式。 Flink 会根据节点的并行度以及他们之间的连边方式,把一些计算节点进

    2024年02月22日
    浏览(41)
  • [AIGC] 深入理解Flink中的窗口、水位线和定时器

    Apache Flink是一种流处理和批处理的混合引擎,它提供了一套丰富的APIs,以满足不同的数据处理需求。在本文中,我们主要讨论Flink中的三个核心机制:窗口(Windows)、水位线(Watermarks)和定时器(Timers)。 在流处理应用中,一种常见的需求是计算某个时间范围内的数据,这

    2024年03月27日
    浏览(54)
  • (增加细粒度资源管理)深入理解flink的task slot相关概念

    之前对flink的task slot的理解太浅了,重新捋一下相关知识点 我们知道,flink中每个TaskManager都是一个 JVM 进程,可以在单独的线程中执行一个或多个 subtask(线程)。 但是TaskManager 的计算资源是有限的,并不是所有任务都可以放在同一个 TaskManager 上并行执行。并行的任务越多

    2024年03月11日
    浏览(38)
  • 深入理解Java GSS(含kerberos认证及在hadoop、flink案例场景举例)

    在当今的信息安全环境下,保护敏感数据和网络资源的安全至关重要。 Kerberos 认证协议作为一种强大的网络身份验证解决方案,被广泛应用于许多大型分布式系统中,如: Hadoop 。而 Java GSS ( Generic Security Services )作为 Java 提供的通用安全服务,与 Kerberos 认证密切相关。 本

    2024年02月08日
    浏览(39)
  • 【天衍系列 01】深入理解Flink的 FileSource 组件:实现大规模数据文件处理

    Apache Flink 是一个流式处理框架,被广泛应用于大数据领域的实时数据处理和分析任务中。在 Flink 中,FileSource 是一个重要的组件,用于从文件系统中读取数据并将其转换为 Flink 的数据流。本文将深入探讨 FileSource 的工作原理、用法以及与其他数据源的比较。 FileSource 是 Fli

    2024年02月21日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包