SparkLaunch提交Spark任务到Yarn集群

这篇具有很好参考价值的文章主要介绍了SparkLaunch提交Spark任务到Yarn集群。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


1.提交Spark任务的方式

  1. 通过Spark-submit 提交任务
  2. 通过Yarn REST Api提交Spark任务
  3. 通过Spark Client Api 的方式提交任务
  4. 通过SparkLaunch 自带API提交任务
  5. 基于Livy的方式提交任务,可参考我的另一篇文章 Apache Livy 安装部署使用示例

上面的几种方式提交任务各自有对应的优缺点,不再进行赘述,下面要介绍的是通过SparkLaunch 的方式提交到集群中,并异步获取任务的执行状态进行更新到运行记录表中,从而实现Saprk任务的提交和状态获取。


2.SparkLaunch 官方接口

通过官方文档可以了解到SaprkLaunch 对应的方法:SparkLaunch
SparkLaunch 主要有两个接口:

  1. SparkAppHandle 主要负责Spark任务提交到集群上面
  2. SparkAppHandle.Listerner 主要是用来监控Spark的运行状态

SparkLaunch提交Spark任务到Yarn集群

可以查看SparkLaunch 类对应的方法主要用到的方法如下所示:


  //设置配置文件地址
  public SparkLauncher setPropertiesFile(String path);

  //设置App 名称
  public SparkLauncher setAppName(String appName);
  
  //设置 Master
  public SparkLauncher setMaster(String master);

  //设置 部署模式
  public SparkLauncher setDeployMode(String mode) ;

  //设置 Jar包运行主类
  public SparkLauncher setMainClass(String mainClass);

  //设置 Spark 相关参数,需要以spark. 开头
  public SparkLauncher addSparkArg(String name, String value);

  //设置 Main函数的参数
  public SparkLauncher addAppArgs(String... args);

  // 启动Saprk任务的提交
  public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) 
  。。。。。。

上面的一些方法主要用于设置Spark所需要的相关参数,比如资源相关参数、Jar包相关参数、部署模式等,调用startApplication 方法会镇长的取创建一个任务提交的实现,通过下面的两个方法就能够使得任务可以正常提交到Yarn 上面

 sparkLauncher.startApplication(new SparkAppHandle.Listener() {
 	  
 	  // 任务运行状态改变的时候触发的操作
      @Override
      public void stateChanged(SparkAppHandle handle) {

      }
      
 	  // 日志状态出现改变的时候触发的操作
      @Override
      public void infoChanged(SparkAppHandle handle) {

      }
  });

3.任务提交流程及实战

总结上面的方法可以得到任务提交的总体流程:

  1. 创建SparkLaunch
  2. 初始化相关参数信息、资源参数、配置参数、Jar包参数
  3. 调用startApplication 启动任务的提交
  4. 调用stateChanged 捕获状态信息的改变并作出相应的操作

接下来给出完整的任务提交的相关伪代码:文章来源地址https://www.toymoban.com/news/detail-428002.html

    /**
     * 发起任务提交
     *
     * @param sparkApplicationParam Spark 相关配置参数
     * @param otherConfigParams     其他配置参数
     * @param mainParams            main 方法配置参数
     */
    private void launch(DmpRunInfo dmpRunInfo, SparkApplicationParam sparkApplicationParam, Map<String,
            String> otherConfigParams, String[] mainParams) {
        // 初始化SparkLauncher
        SparkLauncher launcher = new SparkLauncher()
                .setSparkHome(sparkApplicationParam.getSparkHome())
                .setAppResource(sparkApplicationParam.getMainJarPath())
                .setMainClass(sparkApplicationParam.getMainClass())
                .setMaster(sparkApplicationParam.getMaster())
                .setDeployMode(sparkApplicationParam.getDeployMode())
                .setAppName(sparkApplicationParam.getAppName())
                .setConf("spark.driver.memeory", sparkApplicationParam.getDriverMemory())
                .setConf("spark.executor.memory", sparkApplicationParam.getExecutorMemory())
                .setConf("spark.executor.cores", sparkApplicationParam.getExecutorCores())
                // spark.yarn.archive 配置的HDFS地址
                .setConf("spark.yarn.archive", SparkParamConstants.SPARK_YARN_ARCHIVE)
                .setConf("spark.yarn.queue", SparkParamConstants.SPARK_PARAM_YARN_QUEUE)
                .setVerbose(true);
        // 禁用输出到本地日志方式
        // .redirectError(new File(otherConfigParams.get("SPARK_ERROR_LOG_DIR")))
        // .redirectOutput(new File(otherConfigParams.get("SPARK_OUT_LOG_DIR")))

        /**
         * 设置其他的参数时候需要使用[spark.] 开头的key ,否则spark 解析不出来
         */
        if (otherConfigParams != null && otherConfigParams.size() > 0) {
            logger.info("开始设置spark job 运行参数");
            for (Map.Entry<String, String> conf : otherConfigParams.entrySet()) {
                logger.info("{}:{}", conf.getKey(), conf.getValue());
                launcher.setConf(conf.getKey(), conf.getValue());
            }
        }

        if (mainParams.length != 0) {
            logger.info("开始设置Spark Job Main 方法的参数 {}", Arrays.toString(mainParams));
            launcher.addAppArgs(mainParams);
        }
        logger.info("参数设置完成,开始提交Spark任务");
        // 线程池的方式运行任务
        executor.execute(() -> {
            try {

                // 线程计数
                CountDownLatch countDownLatch = new CountDownLatch(1);
                SparkAppHandle sparkAppHandle = launcher.startApplication(new SparkAppHandle.Listener() {
                    @Override
                    public void stateChanged(SparkAppHandle handle) {
                        // 修改运行状态
                        。。。。。。。。。

                        if (handle.getAppId() != null) {
                            // 设置运行ID 到运行记录中
                            logger.info("{} stateChanged :{}", handle.getAppId(), handle.getState().toString());
                        } else {
                            logger.info("stateChanged :{}", handle.getState().toString());
                        }
                        // 更新状态
                        。。。。。。。。
                        // 失败告警发送到群功能
                        if (SparkAppHandle.State.FAILED.toString().equals(handle.getState().toString())) {
                            // 失败告警
                            。。。。。。。。。。。。
                        }

                        // Job 状态完成之后退出线程
                        if (handle.getState().isFinal()) {
                            countDownLatch.countDown();
                        }
                    }

                    @Override
                    public void infoChanged(SparkAppHandle handle) {
                       // do something

                    }
                });

                logger.info("The task is executing, current is get application id before,please wait ........");
                String applicationId = null;
                while (!SparkAppHandle.State.RUNNING.equals(sparkAppHandle.getState())) {
                    applicationId = sparkAppHandle.getAppId();
                    if (applicationId != null) {
                        logger.warn("handle current state is {}, appid is {}",
                                sparkAppHandle.getState().toString(), applicationId);
                        break;
                    }
                }
                logger.warn("handle current state is {}, appid is {}",
                        sparkAppHandle.getState().toString(), applicationId);
                countDownLatch.await();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        });
    }

到了这里,关于SparkLaunch提交Spark任务到Yarn集群的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包