JAVA代码实现Spark任务的提交

这篇具有很好参考价值的文章主要介绍了JAVA代码实现Spark任务的提交。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Spark的任务提交可以通过在Spark客户端上调用shell脚本将spark任务提交到yarn上执行。

$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 4g \
    --executor-memory 2g \
    --executor-cores 1 \
    --queue thequeue \
    examples/jars/spark-examples*.jar \
    10

在某些场景下,无法直接去调用shell脚本或者需要监控任务的执行结果情况。这样的话,尝试通过JAVA语言、SparkLauncher实现Spark任务的提交和执行结果的获取。

以下的例子以Spark On Yarn的模式来设计的。文章来源地址https://www.toymoban.com/news/detail-574069.html

  1. 要求运行这个类的机器上拥有Spark客户端
  2. 需要被提交的Spark任务的jar(也可以预先提交到HDFS)上
  3. 要求运行机器拥有hadoop机器的配置文件,yarn的配置文件
  4. 要求程序指导javahome的路径
package com.donny.bigdata.surveillance.components.spark;

import com.donny.bigdata.surveillance.conf.Spark2Config;
import com.donny.bigdata.surveillance.conf.YarnConfig;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.*;

/**
 * @author 1792998761@qq.com
 * @date 2023/3/30 18:00
 */
public class SparkOnYarnHelper {
    private static final Logger LOGGER = LogManager.getLogger(SparkOnYarnHelper.class);

    /**
     * 客户端上的hadoop配置文件根目录
     */
    private static String hadoopConfDir;
    /**
     * 客户端上的yarn配置文件根目录
     */
    private static String yarnConfDir;
    /**
     * 客户端上的javaHome
     */
    private static String javaHome;
    /**
     * 客户端上的Spark配置文件根目录
     */
    private static String sparkHome;
    /**
     * 提交到yarn上的Spark任务的jar的Path
     */
    private static String appReSourcePath = System.getProperty("user.dir");
    /**
     * Spark任务的jar的入口函数
     */
    private static String appMainClass;
    /**
     * 运行spark任务的用户
     */
    public static String hadoopUserName;
    /**
     * 获取yarn上执行结果finalStatus=UNDEFINED时的重试次数
     */
    private static Integer retryMaxCount;

    /**
     * 初始化环境
     */
    public static void init() {
        hadoopConfDir = Spark2Config.getString("spark2.hadoop_conf_dir", "/usr/hdp/2.6.5.0-292/hadoop/conf");
        yarnConfDir = Spark2Config.getString("spark2.yarn_conf_dir", "/usr/hdp/2.6.5.0-292/hadoop/conf");
        javaHome = Spark2Config.getString("spark2.java_home", "/data/java");
        sparkHome = Spark2Config.getString("spark2.spark_home", "/data/hdp/current/spark2-client");
        appReSourcePath = appReSourcePath + Spark2Config.getString("spark2.app_resource_path", "/plugins/spark2-monitor-1.0.jar");
        appMainClass = Spark2Config.getString("spark2.app_main_class", "com.donny.base.monitor.Spark2Monitor");
        hadoopUserName = Spark2Config.getString("hadoop_user_name", "spark");
        retryMaxCount = Spark2Config.getInt("spark2.yarn_report_retry", 3);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("hadoopConfDir=[{}].", hadoopConfDir);
            LOGGER.debug("yarnConfDir=[{}].", hadoopConfDir);
            LOGGER.debug("javaHome=[{}].", javaHome);
            LOGGER.debug("sparkHome=[{}].", sparkHome);
            LOGGER.debug("appReSourcePath=[{}].", appReSourcePath);
            LOGGER.debug("appMainClass=[{}].", appMainClass);
            LOGGER.debug("hadoopUserName=[{}].", hadoopUserName);
            LOGGER.debug("retryMaxCount[{}].", retryMaxCount);
        }
    }

    /**
     * 将任务提交到yarn
     *
     * @param hiveTableName Spark任务中使用的hive表的表名
     * @return 执行结果字符串
     */
    public static String submitJobToYarn(String hiveTableName) {
        String result;
        LOGGER.info(" Spark2 job is starting... ");
        HashMap<String, String> env = new HashMap<>(4);
        env.put("HADOOP_CONF_DIR", hadoopConfDir);
        env.put("JAVA_HOME", javaHome);
        env.put("YARN_CONF_DIR", yarnConfDir);
        env.put("HADOOP_USER_NAME", hadoopUserName);
        // 控制SparkAppHandle监听,直达状态isFinal
        CountDownLatch countDownLatch = new CountDownLatch(1);
        SparkAppHandle handle = null;
        try {
            handle = new SparkLauncher(env)
                    .setSparkHome(sparkHome)
                    .setAppResource(appReSourcePath)
                    .setMainClass(appMainClass)
                    .setMaster("yarn")
                    .setDeployMode("client")
                    .setConf(SparkLauncher.DRIVER_MEMORY, "512M")
                    .setConf(SparkLauncher.EXECUTOR_MEMORY, "512M")
                    .setConf(SparkLauncher.EXECUTOR_CORES, "1")
                    .setConf("spark.default.parallelism", "10")
                    .addAppArgs(hiveTableName)
                    .setVerbose(true).startApplication(new SparkAppHandle.Listener() {
                        @Override
                        public void stateChanged(SparkAppHandle handle) {
                            if (handle.getState().isFinal()) {
                                countDownLatch.countDown();
                            }
                            LOGGER.info("SparkApp state: {}.", handle.getState().toString());
                        }

                        @Override
                        public void infoChanged(SparkAppHandle handle) {
                            LOGGER.info("SparkApp infoChanged: {}.", handle.getState().toString());
                        }
                    });
        } catch (IOException e) {
            LOGGER.error("SparkLauncher IOException.", e);
        }
        LOGGER.info(" Spark2 job is running... ");
        try {
            // 阻塞到spark任务结束
            countDownLatch.await();
        } catch (InterruptedException e) {
            LOGGER.error("countDownLatch InterruptedException.", e);
        }
        LOGGER.info(" Spark2 job is over. ");

        if (null != handle) {
            result = getJobResult(handle.getAppId());
        } else {
            result = " Spark2 Execution Exception.";
        }
        return result;
    }

    /**
     * 获取Yarn上的任务最终结果
     *
     * @param appId yarn上的应用ID 例如application_1678883677607_0210
     * @return Spark2 job's execution result
     */
    private static String getJobResult(String appId) {
        LOGGER.info(" spark appId is {}.", appId);
        if (null == appId || "".equals(appId)) {
            return " Spark2 Execution Exception, ApplicationId is null.";
        }

        String result = " Spark2 Execution result Obtaining... ";
        String[] as = appId.split("_");
        ApplicationId applicationId = ApplicationId.newInstance(Long.parseLong(as[1]), Integer.parseInt(as[2]));
        YarnClient client = YarnClient.createYarnClient();
        Configuration conf = new Configuration();
        for (String key : YarnConfig.PROPS.stringPropertyNames()) {
            conf.set(key, YarnConfig.PROPS.getProperty(key));
        }
        client.init(conf);
        client.start();
        LOGGER.info(" YarnClient is started.");
        ApplicationReport applicationReport = null;

        ScheduledExecutorService yarnReportService = new ScheduledThreadPoolExecutor(1,
                new BasicThreadFactory.Builder().namingPattern("yarn-report-%d").daemon(true).build());
        int retryCount = 0;

        // 默认重试3次,每次延迟1s执行。
        while (retryCount < retryMaxCount) {
            ScheduledFuture<ApplicationReport> scheduledFuture = yarnReportService.schedule(new YarnReport(client, applicationId), 1, TimeUnit.SECONDS);
            boolean f = true;
            while (f) {
                if (scheduledFuture.isDone()) {
                    try {
                        applicationReport = scheduledFuture.get();
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("YarnReport[FinalApplicationStatus]={}.", applicationReport.getFinalApplicationStatus());
                        }
                    } catch (InterruptedException | ExecutionException e) {
                        yarnReportService.shutdown();
                        LOGGER.error("YarnReport Exception.", e);
                    } finally {
                        f = false;
                    }
                }
            }

            if (null != applicationReport) {
                String finalStatus = applicationReport.getFinalApplicationStatus().toString();
                if (FinalApplicationStatus.UNDEFINED.toString().equals(finalStatus)) {
                    retryCount++;
                    result = "Spark2 job's finalStatus is UNDEFINED. ";
                    LOGGER.info("Spark2 job's finalStatus=UNDEFINED, retryCount=[{}].", retryCount);
                } else {
                    result = "Spark2 job's finalStatus is " + finalStatus + ".";
                    break;
                }
            } else {
                retryCount++;
                result = "Spark2 job's execution result is null. ";
                LOGGER.info("Spark2 job's finalStatus=null, retryCount=[{}].", retryCount);
            }
        }

        if (!yarnReportService.isShutdown()) {
            yarnReportService.shutdown();
        }

        try {
            client.close();
        } catch (IOException e) {
            LOGGER.error("YarnClient close IOException.", e);
        }
        return result;
    }

    static class YarnReport implements Callable<ApplicationReport> {
        private static final Logger LOGGER = LogManager.getLogger(YarnReport.class);

        final YarnClient client;
        final ApplicationId applicationId;

        YarnReport(YarnClient client, ApplicationId applicationId) {
            this.applicationId = applicationId;
            this.client = client;
        }

        @Override
        public ApplicationReport call() throws Exception {
            ApplicationReport report = client.getApplicationReport(applicationId);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("report={}", report.toString());
            }
            return report;
        }
    }

}

到了这里,关于JAVA代码实现Spark任务的提交的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark 提交任务参数设置关于(线程,shuffle,序列化)

    是在使用 Apache Spark 时,为了设置 Java 虚拟机(JVM)的堆栈大小而使用命令行选项。 -Xss 是 Java 虚拟机的一个选项,用于设置线程的堆栈大小。在这个命令行选项中, -Xss6m 表示将线程的堆栈大小设为 6MB。这个选项的作用是为了避免在运行 Spark 任务时出现堆栈溢出的错误。

    2024年02月02日
    浏览(571)
  • spark streaming如何在kerberos认证的集群上提交任务

    集群有kerberos认证,spark批处理任务提交后正常运行,spark streaming/structed streaming 任务提交后运行不了,报 java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS] 的错误,其中streaming任务里面有用到sparkSQL 找到多种处理方式,在一一尝试之

    2024年02月20日
    浏览(35)
  • 使用Java代码远程提交flink任务

    导入依赖 参数格式参考: {     \\\"jarPath\\\":\\\"C:\\\\flink-1.13.5\\\\examples\\\\streaming\\\\WordCount.jar\\\",     \\\"parallelism\\\":1,     \\\"entryPointClassName\\\":\\\"org.apache.flink.streaming.examples.wordcount.WordCount\\\" }

    2024年02月11日
    浏览(44)
  • 4 | Java Spark实现 WordCount

    简单的 Java Spark 实现 WordCount 的教程,它将教您如何使用 Apache Spark 来统计文本文件中每个单词的出现次数。 首先,确保您已经安装了 Apache Spark 并设置了运行环境。您需要准备一个包含文本内容的文本文件,以便对其进行 WordCount 分析。

    2024年02月10日
    浏览(42)
  • 用idea工具scala 和 Java开发 spark案例:WordCount

    目录 一 环境准备 二 scala代码编写 三 java 代码编写         创建一个 maven 工程         添加下列依赖         原本就下载过这些依赖的没必要再下一遍,可以用之前的,比如 json,mysql,mysq 这里版本是 mysql 5 ,不一样的注意修改                  首先准备好数据,即

    2024年02月07日
    浏览(61)
  • spark 经典demo 的 scala 和 java 实现

    💐💐扫码关注公众号,回复 spark 下载geekbang 原价 90 元 零基础入门 Spark 学习资料💐💐 要先对文件中的单词做统计计数,然后再打印出频次最高的 5 个单词,江湖人称“Word Count”wikiOfSpark.txt 文件下载地址:这里 scala 实现 java实现  为了限制机动车保有量,从 2011 年

    2024年04月27日
    浏览(37)
  • Scala第二十章节(Akka并发编程框架、Akka入门案例、Akka定时任务代码实现、两个进程间通信的案例以及简易版spark通信框架案例)

    章节目标 理解Akka并发编程框架简介 掌握Akka入门案例 掌握Akka定时任务代码实现 掌握两个进程间通信的案例 掌握简易版spark通信框架案例 1. Akka并发编程框架简介 1.1 Akka概述 Akka是一个用于构建高并发、分布式和可扩展的基于事件驱动的应用工具包。Akka是使用scala开发的库,

    2024年04月11日
    浏览(45)
  • spring boot java项目整合Scala&Spark,接口api调用方式调用scala代码,配置分享

    版本说明: spring boot: 2.5.9 jdk:1.8 spark:2.4.5 sclala:2.11.12 首先你需要有一个完美的spring boot项目(java版本)能成功运行,这就不赘述了,按照网上的自己搭建吧,然后重要的来了,我捣鼓了两天时间,各样的报错见过了,网上的处理方法要嘛是不全,要嘛是没有用,各种办

    2024年02月10日
    浏览(52)
  • Spark—通过Java、Scala API实现WordCount案例的基本操作

    实验原理 Spark的核心就是RDD,所有在RDD上的操作会被运行在Cluster上,Driver程序启动很多Workers,Workers在(分布式)文件系统中读取数据后转化为RDD(弹性分布式数据集),然后对RDD在内存中进行缓存和计算。 而RDD有两种类型的操作 ,分别是Action(返回values)和Transformations(

    2024年02月15日
    浏览(43)
  • Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作

    现有一电商网站数据文件,名为buyer_favorite1,记录了用户对商品的收藏数据,数据以“t”键分割,数据内容及数据格式如下: 项目环境说明 开启hadoop集群,zookeeper服务,开启kafka服务。再另开启一个窗口,在/apps/kafka/bin目录下创建一个topic。 1、新创一个文件folder命名为li

    2024年02月13日
    浏览(56)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包