flink on yarn 远程提交

这篇具有很好参考价值的文章主要介绍了flink on yarn 远程提交。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。



import lombok.extern.slf4j.Slf4j;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.client.cli.GenericCLI;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterInformationRetriever;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Test;

import java.io.File;
import java.lang.reflect.Constructor;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkNotNull;


@Slf4j
public class AppTestV1 {


    @Test
    public void submitJobWithYarnDesc() throws ClusterDeploymentException {
        // hadoop
        String hadoopConfDir = "C:\\Users\\HPN-21-117\\software\\configClusterhadoop\\configurations\\cdh5";
//        String hadoopConfDir = "C:\\Users\\HPN-21-117\\software\\configClusterhadoop\\configurations\\cdh6";
        //flink的本地配置目录,为了得到flink的配置
        String flinkConfDir = "C:\\Users\\HPN-21-117\\software\\flink-1.12.7\\conf";
        //存放flink集群相关的jar包目录
        String flinkLibs = "hdfs://cdh-node0.hypers.com:8020/flink/1.12.7/lib";
        //用户jar
        String userJarPath =  "hdfs://cdh-node0.hypers.com:8020/flink/demo/TopSpeedWindowing.jar";
        String flinkDistJar = "hdfs://cdh-node0.hypers.com:8020/flink/1.12.7/lib/flink-dist_2.12-1.12.7.jar";
//        String flinkDistJar = "hdfs://cdh-node0.hypers.com:8022/flink/lib";
        String[] args = "".split("\\s+");
        String appMainClass = "org.apache.flink.streaming.examples.windowing.TopSpeedWindowing";

        YarnClient yarnClient = YarnUtils.getYarnClient(hadoopConfDir);
        yarnClient.start();

        Configuration flinkConf = GlobalConfiguration.loadConfiguration(flinkConfDir);
        //set run model
        flinkConf.setString(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());
        //set application name
        flinkConf.setString(YarnConfigOptions.APPLICATION_NAME, "onYarnApiSubmitCase");
        //flink on yarn dependency
        flinkConf.set(YarnConfigOptions.PROVIDED_LIB_DIRS, Collections.singletonList(new Path(flinkLibs).toString()));
        flinkConf.set(YarnConfigOptions.FLINK_DIST_JAR, flinkDistJar);
        flinkConf.set(PipelineOptions.JARS, Collections.singletonList(new Path(userJarPath).toString()));
        //设置:资源/并发度
        flinkConf.setInteger(CoreOptions.DEFAULT_PARALLELISM, 1);
        flinkConf.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1G"));
        flinkConf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1G"));
        flinkConf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);


        ClusterSpecification clusterSpecification = new ClusterSpecification
                .ClusterSpecificationBuilder()
                .setMasterMemoryMB(1024)
                .setTaskManagerMemoryMB(1024)
                .setSlotsPerTaskManager(2)
                .createClusterSpecification();

        YarnClusterInformationRetriever ycir = YarnClientYarnClusterInformationRetriever.create(yarnClient);

        YarnConfiguration yarnConf = (YarnConfiguration) yarnClient.getConfig();

        ApplicationConfiguration appConfig = new ApplicationConfiguration(args, appMainClass);

        YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
                flinkConf,
                yarnConf,
                yarnClient,
                ycir,
                false);

        ClusterClientProvider<ApplicationId> applicationCluster =
                yarnClusterDescriptor.deployApplicationCluster( clusterSpecification, appConfig );

        yarnClient.stop();

    }

    @Test
    public  void submitJobWithCliForte() throws Exception {

        System.setProperty("ENV_FLINK_CONF_DIR", "C:\\Users\\HPN-21-117\\software\\flink-1.14.6\\conf");
        System.setProperty("FLINK_CONF_DIR", "C:\\Users\\HPN-21-117\\software\\flink-1.14.6\\conf");

        // 1. find the configuration directory
        final String configurationDirectory = getConfigurationDirectoryFromDir("C:\\Users\\HPN-21-117\\software\\flink-1.14.6\\conf");

        // 2. load the global configuration
        final Configuration configuration =
                GlobalConfiguration.loadConfiguration(configurationDirectory);

        // 3. load the custom command lines
        final List<CustomCommandLine> customCommandLines =
                loadCustomCommandLines(configuration, configurationDirectory);

        try {
            final CliFrontend cli = new CliFrontend(configuration, customCommandLines);

            SecurityUtils.install(new SecurityConfiguration(cli.getConfiguration()));
            String[] args = "run-application -t yarn-application hdfs://cdh-node0.hypers.com:8022/flink/demo/TopSpeedWindowing.jar".split("\\s+");
            int retCode =
                    SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
            System.exit(retCode);
        } catch (Throwable t) {
            final Throwable strippedThrowable =
                    ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
            log.error("Fatal error while running command line interface.", strippedThrowable);
            strippedThrowable.printStackTrace();
            System.exit(31);
        }

    }

    public static List<CustomCommandLine> loadCustomCommandLines(
            Configuration configuration, String configurationDirectory) {
        List<CustomCommandLine> customCommandLines = new ArrayList<>();
        customCommandLines.add(new GenericCLI(configuration, configurationDirectory));

        //	Command line interface of the YARN session, with a special initialization here
        //	to prefix all options with y/yarn.
        final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
        try {
            customCommandLines.add(
                    loadCustomCommandLine(
                            flinkYarnSessionCLI,
                            configuration,
                            configurationDirectory,
                            "y",
                            "yarn"));
        } catch (NoClassDefFoundError | Exception e) {
            final String errorYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";
            try {
                log.info("Loading FallbackYarnSessionCli");
                customCommandLines.add(loadCustomCommandLine(errorYarnSessionCLI, configuration));
            } catch (Exception exception) {
                log.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
            }
        }

        //	Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get
        // the
        //	      active CustomCommandLine in order and DefaultCLI isActive always return true.
        customCommandLines.add(new DefaultCLI());

        return customCommandLines;
    }

    /**
     * Loads a class from the classpath that implements the CustomCommandLine interface.
     *
     * @param className The fully-qualified class name to load.
     * @param params The constructor parameters
     */
    private static CustomCommandLine loadCustomCommandLine(String className, Object... params)
            throws Exception {

        Class<? extends CustomCommandLine> customCliClass =
                Class.forName(className).asSubclass(CustomCommandLine.class);

        // construct class types from the parameters
        Class<?>[] types = new Class<?>[params.length];
        for (int i = 0; i < params.length; i++) {
            checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
            types[i] = params[i].getClass();
        }

        Constructor<? extends CustomCommandLine> constructor = customCliClass.getConstructor(types);

        return constructor.newInstance(params);
    }

    public static String getConfigurationDirectoryFromDir(String env_flink_conf_dir) {

//        String env_flink_conf_dir = System.getenv(flinkConfDir);

        String location = Optional.ofNullable(env_flink_conf_dir)
                .filter(dir -> new File(dir).exists())
                .orElseThrow(() -> new RuntimeException(
                        "The configuration directory '"
                                + env_flink_conf_dir
                                + "', specified in the '"
                                + ConfigConstants.ENV_FLINK_CONF_DIR
                                + "' environment variable, does not exist."));

        return location;
    }

    public static String getConfigurationDirectoryFromEnv() {
        System.setProperty("MY_VAR", "value");

        String env_flink_conf_dir = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);

        String location = Optional.ofNullable(env_flink_conf_dir)
                .filter(dir -> new File(dir).exists())
                .orElseThrow(() -> new RuntimeException(
                        "The configuration directory '"
                                + env_flink_conf_dir
                                + "', specified in the '"
                                + ConfigConstants.ENV_FLINK_CONF_DIR
                                + "' environment variable, does not exist."));

        return location;
    }
}

文章来源地址https://www.toymoban.com/news/detail-691881.html

到了这里,关于flink on yarn 远程提交的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据Flink(五十九):Flink on Yarn的三种部署方式介绍以及注意

    文章目录 Flink on Yarn的三种部署方式介绍以及注意 一、Pre-Job 模式部署作业

    2024年02月13日
    浏览(37)
  • Flink、Yarn架构,以Flink on Yarn部署原理详解

    Apache Flink是一个开源的分布式流处理框架,它可以处理实时数据流和批处理数据。Flink的架构原理是其实现的基础,架构原理可以分为以下四个部分:JobManager、TaskManager、JobGraph、Checkpoint。 JobManager JobManager是Flink集群的控制节点,负责接收用户提交的任务,将任务分配给Task

    2024年02月12日
    浏览(40)
  • [Flink] Flink On Yarn(yarn-session.sh)启动错误

    在Flink上启动 yarn-session.sh时出现 The number of requested virtual cores for application master 1 exceeds the maximum number of virtual cores 0 available in the Yarn Cluster.错误。 版本说明: Hadoop: 3.3.4 Flink:1.17.1 在Flink On Yarn上启动 yarn-session.sh 时出现如下错误: 在yarn-site.xml文件中配置了所有可能相关的参

    2024年02月13日
    浏览(42)
  • Flink on Yarn安装配置

    Apache Flink,作为一个开源的分布式处理引擎,近年来在大数据处理领域崭露头角,其独特的流处理和批处理一体化模型,使得它能够在处理无界和有界数据流时展现出卓越的性能。本文旨在对Flink进行简要的前言性介绍,以及他的安装配置 Apache Flink是一个面向分布式数据流处

    2024年03月24日
    浏览(115)
  • flink on yarn集群部署模式

    介绍 YARN 上部署的过程是:客户端把 Flink 应用提交给 Yarn 的ResourceManager, Yarn 的 ResourceManager 会向 Yarn 的 NodeManager 申请容器。在这些容器上,Flink 会部署JobManager 和 TaskManager 的实例,从而启动集群。Flink 会根据运行在 JobManger 上的作业所需要的 Slot 数量动态分配TaskManager 资源。

    2024年01月23日
    浏览(41)
  • Flink on yarn任务日志怎么看

    1、jobmanager日志         在yarn上可以直接看 2、taskmanager日志          在flink的webui中可以看,但是flink任务失败后,webui就不存在了,那怎么看? 这是jobmanager的地址 hadoop02:19888/jobhistory/logs/hadoop02:45454/container_e03_1684463979345_0028_01_000001/container_e03_1684463979345_0028_01_000001/root 你要

    2024年02月16日
    浏览(68)
  • 说说Flink on yarn的启动流程

    核心流程 FlinkYarnSessionCli 启动的过程中首先会检查Yarn上有没有足够的资源去启动所需要的container,如果有,则 上传一些flink的jar和配置文件到HDFS ,这里主要是启动AM进程和TaskManager进程的相关依赖jar包和配置文件。 接着yarn client会首先向RM 申请一个container来作为ApplicationMas

    2024年02月10日
    浏览(39)
  • Apache Flink连载(二十):Flink On Yarn运行 - Yarn Per-Job模式(弃用)

     🏡 个人主页:IT贫道-CSDN博客  🚩 私聊博主:私聊博主加WX好友,获取更多资料哦~  🔔 博主个人B栈地址:豹哥教你学编程的个人空间-豹哥教你学编程个人主页-哔哩哔哩视频 目录 1. 任务提交命令 2. 任务提交流程

    2024年01月20日
    浏览(41)
  • flink on yarn 中的flink-conf.yaml参数

    在 Flink on YARN 中, flink-conf.yaml 是 Flink 配置文件,用于配置 Flink 应用程序在 YARN 上的运行。通过修改 flink-conf.yaml 文件中的参数,你可以调整 Flink 集群的行为和性能。以下是一些常见的在 flink-conf.yaml 中设置的参数: yarn.application.name : 指定 Flink 应用程序在 YARN 上的名称。

    2024年02月12日
    浏览(41)
  • Apache Flink连载(十八):Flink On Yarn运行原理及环境准备

     🏡 个人主页:IT贫道-CSDN博客  🚩 私聊博主:私聊博主加WX好友,获取更多资料哦~  🔔 博主个人B栈地址:豹哥教你学编程的个人空间-豹哥教你学编程个人主页-哔哩哔哩视频 目录 1. Flink On Yarn运行原理

    2024年02月03日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包