Flink源码之JobManager启动流程

这篇具有很好参考价值的文章主要介绍了Flink源码之JobManager启动流程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

从启动命令flink-daemon.sh中可以看出StandaloneSession入口类为org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint, 从该类的main方法会进入ClusterEntrypoint::runCluster中, 该方法中会创建出主要服务和组件。

StandaloneSessionClusterEntrypoint::main
ClusterEntrypoint::runClusterEntrypoint
ClusterEntrypoint::startCluster
ClusterEntrypoint::runCluster

private void runCluster(Configuration configuration, PluginManager pluginManager)
        throws Exception {
    synchronized (lock) {
        initializeServices(configuration, pluginManager); //初始化服务

        // write host information into configuration
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

        final DispatcherResourceManagerComponentFactory
                dispatcherResourceManagerComponentFactory =
                        createDispatcherResourceManagerComponentFactory(configuration);
		//创建核心组件
        clusterComponent =
                dispatcherResourceManagerComponentFactory.create(
                        configuration,
                        ioExecutor,
                        commonRpcService,
                        haServices,
                        blobServer,
                        heartbeatServices,
                        metricRegistry,
                        executionGraphInfoStore,
                        new RpcMetricQueryServiceRetriever(
                                metricRegistry.getMetricQueryServiceRpcService()),
                        this);
	...ignore code
    }
}

可以看出关键代码是调用initializeServices以及创建Cluster Component。

protected void initializeServices(Configuration configuration, PluginManager pluginManager)
        throws Exception {

    LOG.info("Initializing cluster services.");

    synchronized (lock) {
        rpcSystem = RpcSystem.load(configuration);

        commonRpcService =
                RpcUtils.createRemoteRpcService(
                        rpcSystem,
                        configuration,
                        configuration.getString(JobManagerOptions.ADDRESS),
                        getRPCPortRange(configuration),
                        configuration.getString(JobManagerOptions.BIND_HOST),
                        configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));

        JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));

        // update the configuration used to create the high availability services
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

        ioExecutor =
                Executors.newFixedThreadPool(
                        ClusterEntrypointUtils.getPoolSize(configuration),
                        new ExecutorThreadFactory("cluster-io"));
        haServices = createHaServices(configuration, ioExecutor, rpcSystem);
        blobServer = new BlobServer(configuration, haServices.createBlobStore());
        blobServer.start();
        heartbeatServices = createHeartbeatServices(configuration);
        metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem);

        final RpcService metricQueryServiceRpcService =
                MetricUtils.startRemoteMetricsRpcService(
                        configuration, commonRpcService.getAddress(), rpcSystem);
        metricRegistry.startQueryService(metricQueryServiceRpcService, null);

        final String hostname = RpcUtils.getHostname(commonRpcService);

        processMetricGroup =
                MetricUtils.instantiateProcessMetricGroup(
                        metricRegistry,
                        hostname,
                        ConfigurationUtils.getSystemResourceMetricsProbingInterval(
                                configuration));

        executionGraphInfoStore =
                createSerializableExecutionGraphStore(
                        configuration, commonRpcService.getScheduledExecutor());
    }
}

在initializeServices中首先创建commonRpcService,这个RPCService实例是JobManager提供RPC服务的核心,可以看出它会有个地址和监听端口号,commonRpcService可将继承自Gateway的服务实例包装成AkkaActor对外提供RPC服务,比如ResourceManager、Dispatcher。此外还创建了其他服务:

haService: 可通过HAService获取ResourceManager/Dispatcher/RestEndpoint的地址,同时也提供选主服务,组件启动时需向HAService注册,如果被选主成功,则会调用监听器的grandLeadership回调函数
BlobServer: 可用来提供存储大对象存储服务
heartbeatServices:为组件间传递心跳信息
metricRegistry:提供metric上报和查询服务,监听端口不同,新建了一个RpcService专为Metric服务
processMetricGroup:注册系统运行状态信息的Metric,比如GC/Memory/Network运行时状况,添加Metric都是通过一个MetricGroup添加
executionGraphInfoStore:缓存Job执行时信息,比如ExecutionGrap

初始化服务创建完成后,通过DefaultDispatcherResourceManagerComponentFactory:create创建JobManager的三大核心组件:Dispacher/ResourceManager/RestEndpointServer, 都是通过工厂方法创建:

DefaultDispatcherRunnerFactory
StandaloneResourceManagerFactory
SessionRestEndpointFactory

这些组件是JobManager向HAService注册获取leadership后,被ElectionService回调grantLeadership函数中创建出具体组件实例。

RestServer

RestServer并不是一个RPCServer,没有继承RpcGateway,只提供HTTP接口服务,然后将请求转交给Dispatcher处理,它的生成启动流程如下:

SessionRestEndpointFactory::createRestEndpoint
DispatcherRestEndpoint::new
RestServerEndpoint::start //通过Netty启动Rest服务
DispatcherRestEndpoint::initializeHandlers //JobSubmitHeaders、JobSubmitHandler处理客户端提交Job
WebMonitorEndpoint::initializeHandlers //关联Rest请求的Header和Handler
WebMonitorEndpoint::startInternal //竞选leader

ResourceManager

RM生成启动过程是ResourceManagerServiceImpl先竞选leader成功后再创建出具体的ResourceManager

ResourceManagerServiceImpl::start
ResourceManagerServiceImpl::grantLeadership
ResourceManagerServiceImpl::startNewLeaderResourceManager
ResourceManagerServiceImpl::startResourceManagerIfIsLeader//调用start方法
StandaloneResourceManagerFactory::createResourceManager
StandaloneResourceManager::new
StandaloneResourceManager::start

Dispatcher

Dispacher生成启动过程是DefaultDispatcherRunner选主后再创建出具体实例

DefaultDispatcherRunnerFactory::createDispatcherRunner
DefaultDispatcherRunner::create
DispatcherRunnerLeaderElectionLifecycleManager.createFor
DefaultDispatcherRunner::grantLeadership //
DefaultDispatcherRunner::startNewDispatcherLeaderProcess//创建SessionDispatcherLeaderProcess并调用其start方法
DefaultDispatcherRunner::createNewDispatcherLeaderProcess
SessionDispatcherLeaderProcessFactoryFactory::createFactory
SessionDispatcherLeaderProcessFactory::create
SessionDispatcherLeaderProcess::create
SessionDispatcherLeaderProcess::start
AbstractDispatcherLeaderProcess::start
AbstractDispatcherLeaderProcess::startInternal
SessionDispatcherLeaderProcess:onstart
SessionDispatcherLeaderProcess::createDispatcherIfRunning
SessionDispatcherLeaderProcess::createDispatcher
DefaultDispatcherGatewayServiceFactory::create//创建Dispatcher并调用其start方法
SessionDispatcherFactory::createDispatcher
StandaloneDispatcher::new
StandaloneDispatcher::start
Dispatcher::onstart

总结

Flink源码之JobManager启动流程,BigData,flink,大数据
JobManager的启动过程就是创建三大组件RestServer/RM/Dispacher实例初始化的过程,RestSever通过Netty启动HTTP服务,RM/Dispacher被AkkaRpcService包装成AkkaActor提供本地或远程RPC服务,RestServer仅仅是接受请求解析消息后由具体Handler处理,JobGrap提交执行会转发给Dispatcher处理。文章来源地址https://www.toymoban.com/news/detail-637379.html

到了这里,关于Flink源码之JobManager启动流程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink JobManager的高可用配置

    在flink执行中,jobManager是一个负责执行流式应用执行和检查点生成的组件,一旦发生故障,那么其负责的所有应用都会被取消,所以我们需要对JobManager配置高可用的模式 配置JobManager的高可用需要使用到zookeeper,一方面zookeeper可以进行领导的选举工作,这样备用jobmanager就可以

    2024年02月09日
    浏览(39)
  • flink任务内存调优,TaskManager、JobManager内存配置

            Flink是基于java的JVM运行,拥有高效的数据处理能力,但是考虑到用户在 Flink 上运行的应用的多样性,尽管flink框架已经为所有配置项提供合理的默认值,仍无法满足所有情况下的需求。 为了给用户生产提供最大化的价值, Flink 允许用户在整体上以及细粒度上对集

    2024年02月06日
    浏览(47)
  • Apache Hudi初探(二)(与flink的结合)--flink写hudi的操作(JobManager端的提交操作)

    在Apache Hudi初探(一)(与flink的结合)中,我们提到了 Pipelines.hoodieStreamWrite 写hudi文件 ,这个操作真正写hudi是在 Pipelines.hoodieStreamWrite 方法下的 transform(opName(\\\"stream_write\\\", conf), TypeInformation.of(Object.class), operatorFactory) ,具体分析一下写入的过程。 对于 transform(opName(\\\"stream_write\\\", conf), Ty

    2024年02月12日
    浏览(38)
  • Docker中flink-cluster-jobmanager-1运行失败解决方法

    这是flink-cluster-jobmanager-1报错日志 2023-12-07 23:34:26 [ERROR] The execution result is empty. 2023-12-07 23:34:25 Starting Job Manager 2023-12-07 23:34:26 [ERROR] Could not get JVM parameters and dynamic configurations properly. 2023-12-07 23:34:26 [ERROR] Raw output from BashJavaUtils: 2023-12-07 23:34:26 INFO [] - Loading configuration property:

    2024年02月03日
    浏览(37)
  • flink源码分析 - flink命令启动分析

    flink版本: flink-1.12.1 源码位置:  flink-dist/src/main/flink-bin/bin/flink flink命令源码: 首先讲第一段: 工作中,很多人喜欢用符号链接(软连接)去将原始命令链接到一个新的文件。 例如:  将 /home/aaa链接到/opt/soft/flink-1.12.1/bin/flink,  实际使用的时候就可以用 aaa去代替flink命令。 例如

    2024年01月18日
    浏览(40)
  • 说说Flink on yarn的启动流程

    核心流程 FlinkYarnSessionCli 启动的过程中首先会检查Yarn上有没有足够的资源去启动所需要的container,如果有,则 上传一些flink的jar和配置文件到HDFS ,这里主要是启动AM进程和TaskManager进程的相关依赖jar包和配置文件。 接着yarn client会首先向RM 申请一个container来作为ApplicationMas

    2024年02月10日
    浏览(39)
  • 深入理解 Flink(五)Flink Standalone 集群启动源码剖析

    深入理解 Flink 系列文章已完结,总共八篇文章,直达链接: 深入理解 Flink (一)Flink 架构设计原理 深入理解 Flink (二)Flink StateBackend 和 Checkpoint 容错深入分析 深入理解 Flink (三)Flink 内核基础设施源码级原理详解 深入理解 Flink (四)Flink Time+WaterMark+Window 深入分析 深入

    2024年02月02日
    浏览(52)
  • Flink源码之State创建流程

    StreamOperatorStateHandler 在StreamTask启动初始化时通过StreamTaskStateInitializerImpl::streamOperatorStateContext会为每个StreamOperator 创建keyedStatedBackend和operatorStateBackend,在AbstractStreamOperator中有个StreamOperatorStateHandler成员变量,调用AbstractStreamOperator::initializeState方法中会初始化StreamOperatorStateH

    2024年02月12日
    浏览(90)
  • Flink源码之Checkpoint执行流程

    Checkpoint完整流程如上图所示: JobMaster的CheckpointCoordinator向所有SourceTask发送RPC触发一次CheckPoint SourceTask向下游广播CheckpointBarrier SouceTask完成状态快照后向JobMaster发送快照结果 非SouceTask在Barrier对齐后完成状态快照向JobMaster发送快照结果 JobMaster保存SubTask快照结果 JobMaster收到所

    2024年02月11日
    浏览(39)
  • Flink window 源码分析1:窗口整体执行流程

    注:本文源码为flink 1.18.0版本。 其他相关文章: Flink window 源码分析1:窗口整体执行流程 Flink window 源码分析2:Window 的主要组件 Flink window 源码分析3:WindowOperator Flink window 源码分析4:WindowState Window 本质上就是借助状态后端缓存着一定时间段内的数据,然后在达到某些条件

    2024年01月16日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包