解决方案:如何在 Amazon EMR Serverless 上执行纯 SQL 文件?

这篇具有很好参考价值的文章主要介绍了解决方案:如何在 Amazon EMR Serverless 上执行纯 SQL 文件?。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

解决方案:如何在 Amazon EMR Serverless 上执行纯 SQL 文件?,大数据专题,emr,serverless,sql,文件,作业 博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

长久已来,SQL以其简单易用、开发效率高等优势一直是ETL的首选编程语言,在构建数据仓库和数据湖的过程中发挥着不可替代的作用。Hive和Spark SQL也正是立足于这一点,才在今天的大数据生态中牢牢占据着主力位置。在常规的Spark环境中,开发者可以使用spark-sql命令直接执行SQL文件,这是一项看似平平无奇实则非常重要的功能:一方面,这一方式极大地降低了Spark的使用门槛,用户只要会写SQL就可以使用Spark;另一方面,通过命令行驱动SQL文件的执行可以极大简化SQL作业的提交工作,使得作业提交本身被“代码化”,为大规模工程开发和自动化部署提供了便利。

但遗憾的是,Amazon EMR Serverless 未能针对执行SQL文件提供原生支持,用户只能在Scala/Python代码中嵌入SQL语句,这对于倚重纯SQL开发数仓或数据湖的用户来说并不友好。为此,我们专门开发了一组用于读取、解析和执行SQL文件的工具类,借助这组工具类,用户可以在 Amazon EMR Serverless 上直接执行SQL文件,本文将详细介绍一下这一方案。

1. 方案设计


鉴于在Spark编程环境中执行SQL语句的方法是:`spark.sql("...")`,我们可以设计一个通用的作业类,该类在启动时会根据传入的参数读取指定位置上的SQL文件,然后拆分成单条SQL并调用`spark.sql("...")`执行。为了让作业类更加灵活和通用,还可以引入通配符一次加载并执行多个SQL文件。此外,ETL作业经常需要根据作业调度工具生成的时间参数去执行相应的批次,这些参数同样会作用到SQL中,所以,作业类还应允许用户在SQL文件中嵌入自定义变量,并在提交作业时以参数形式为自定义变量赋值。基于这种设计思路,我们开发了一个项目,实现了上述功能,项目地址为:
项目名称 项目地址
Amazon EMR Serverless Utilities https://github.com/bluishglc/emr-serverless-utils

项目中的com.github.emr.serverless.SparkSqlJob类即为通用的SQL作业类,该类接受两个可选参数,分别是:

参数 说明 取值示例
–sql-files 指定要执行的SQL文件路径,支持Java文件系统通配符,可指定多个文件一起执行 s3://my-spark-sql-job/sqls/insert-into-*.sql
–sql-params K1=V1,K2=V2,...形式为SQL文件中定义的${K1},${K2},…形式的变量设值 CUST_CITY=NEW YORK,ORD_DATE=2008-07-15

该方案具备如下特性:

① 允许单一SQL文件包含多条SQL语句
② 允许在SQL文件中使用${K1},${K2},…的形式定义变量,并在执行作业时使用K1=V1,K2=V2,...形式的参数进行变量赋值
③ 支持Java文件系统通配符,可一次执行多个SQL文件

下面,我们将分别在AWS控制台和命令行两种环境下介绍并演示如何使用该项目的工具类提交纯SQL作业。

2. 实操演示

2.1. 环境准备


在EMR Serverless上提交作业时需要准备一个“EMR Serverless Application”和一个“EMR Serverless Job Execution Role”,其中后者应具有S3和Glue Data Catalog的读写权限。Application可以在EMR Serverless控制台(EMR Studio)上通过向导轻松创建(全默认配置即可),Execution Role可以使用 [《CDC一键入湖:当 Apache Hudi DeltaStreamer 遇见 Serverless Spark》](https://blog.csdn.net/bluishglc/article/details/132011197) 一文第5节提供的脚本快速创建。

接下来要准备提交作业所需的Jar包和SQL文件。首先在S3上创建一个存储桶,本文使用的桶取名:my-spark-sql-job(当您在自己的环境中操作时请注意替换桶名),然后从 [ 此处 ] 下载编译好的 emr-serverless-utils.jar包并上传至s3://my-spark-sql-job/jars/目录下:

解决方案:如何在 Amazon EMR Serverless 上执行纯 SQL 文件?,大数据专题,emr,serverless,sql,文件,作业

在演示过程中还将使用到5个SQL示例文件,从 [ 此处 ] 下载解压后上传至s3://my-spark-sql-job/sqls/目录下:

解决方案:如何在 Amazon EMR Serverless 上执行纯 SQL 文件?,大数据专题,emr,serverless,sql,文件,作业

2.2. 在控制台上提交纯SQL文件作业

2.2.1. 执行单一SQL文件

打开EMR Serverless的控制台(EMR Studio),在选定的EMR Serverless Application下提交一个如下的Job:

解决方案:如何在 Amazon EMR Serverless 上执行纯 SQL 文件?,大数据专题,emr,serverless,sql,文件,作业

① Script location:设定为此前上传的Jar包路径 s3://my-spark-sql-job/jars/emr-serverless-utils-1.0.jar
② Main class:设定为 com.github.emr.serverless.SparkSqlJob
③ Script arguments:设定为 ["--sql-files","s3://my-spark-sql-job/sqls/drop-tables.sql"]

至于其他选项,无需特别设定,保持默认配置即可,对于在生产环境中部署的作业,您可以结合自身作业的需要灵活配置,例如Spark Driver/Executor的资源分配等。需要提醒的是:通过控制台创建的作业默认会启用Glue Data Catalog(即:Additional settings -> Metastore configuration -> Use AWS Glue Data Catalog 默认是勾选的),为了方便在Glue和Athena中检查SQL脚本的执行结果,建议您不要修改此项默认配置。

上述配置描述了这样一项工作:以s3://my-spark-sql-job/jars/emr-serverless-utils-1.0.jar中的com.github.emr.serverless.SparkSqlJob作为主类,提起一个Spark作业。其中["--sql-files","s3://my-spark-sql-job/sqls/drop-tables.sql"]是传递给SparkSqlJob的参数,用于告知作业所要执行的SQL文件位置。本次作业执行的SQL文件只有三条简单的DROP TABLE语句,是一个基础示例,用以展示工具类执行单一文件内多条SQL语句的能力。

2.2.2. 执行带自定义参数的SQL文件

接下来要演示的是工具类的第二项功能:执行带自定义参数的SQL文件。新建或直接复制上一个作业(在控制台上选定上一个作业,依次点击 Actions -> Clone job),然后将“Script arguments”的值设定为:
["--sql-files","s3://my-spark-sql-job/sqls/create-tables.sql","--sql-params","APP_S3_HOME=s3://my-spark-sql-job"]

如下图所示:

解决方案:如何在 Amazon EMR Serverless 上执行纯 SQL 文件?,大数据专题,emr,serverless,sql,文件,作业

这次的作业设定除了使用--sql-files参数指定了SQL文件外,还通过--sql-params参数为SQL中出现的用户自定义变量进行了赋值。根据此前的介绍,APP_S3_HOME=s3://my-spark-sql-job是一个“Key=Value”字符串,其含义是将值s3://my-spark-sql-job赋予了变量APP_S3_HOME,SQL中所有出现${APP_S3_HOME}的地方都将被s3://my-spark-sql-job所替代。查看create-tables.sql文件,在建表语句的LOCATION部分可以发现自定义变量${APP_S3_HOME}

CREATE EXTERNAL TABLE IF NOT EXISTS ORDERS (
    ... ...
)
... ...
LOCATION '${APP_S3_HOME}/data/orders/';

SparkSqlJob读取该SQL文件时,会根据键值对字符串APP_S3_HOME=s3://my-spark-sql-job将SQL文件中所有的${APP_S3_HOME}替换为s3://my-spark-sql-job,实际执行的SQL将变为:

CREATE EXTERNAL TABLE IF NOT EXISTS ORDERS (
    ... ...
)
... ...
LOCATION 's3://my-spark-sql-job/data/orders/';

提交作业并执行完毕后,可登录Athena控制台,查看数据表是否创建成功。

2.2.3. 使用通配符执行多个文件

有时候,我们需要批量执行一个文件夹下的所有SQL文件,或者使用通配符选择性的执行部分SQL文件,`SparkSqlJob`使用了[Java文件系统通配符](https://docs.oracle.com/javase/8/docs/api/java/nio/file/FileSystem.html#getPathMatcher-java.lang.String-)来支持这类需求。下面的作业就演示了通配符的使用方法,同样是新建或直接复制上一个作业,然后将“Script arguments”的值设定为:
["--sql-files","s3://my-spark-sql-job/sqls/insert-into-*.sql"]

如下图所示:

解决方案:如何在 Amazon EMR Serverless 上执行纯 SQL 文件?,大数据专题,emr,serverless,sql,文件,作业

这次作业的--sql-files参数使用了路径通配符,insert-into-*.sql将同时匹配insert-into-orders.sqlinsert-into-customers.sql两个SQL文件,它们将分别向ORDERSCUSTOMERS两张表插入多条记录。执行完毕后,可以可登录Athena控制台,查看数据表中是否有数据产生。

2.2.4. 一个复合示例

最后,我们来提交一个更有代表性的复合示例:文件通配符 + 用户自定义参数。再次新建或直接复制上一个作业,然后将“Script arguments”的值设定为:
["--sql-files","s3://my-spark-sql-job/sqls/select-*.sql","--sql-params","APP_S3_HOME=s3://my-spark-sql-job,CUST_CITY=NEW YORK,ORD_DATE=2008-07-15"]

如下图所示:

![emr-serverless-snapshot-4.jpg-150.8kB][6]

本次作业的--sql-files参数使用路径通配符select-*.sql匹配select-tables.sql文件,该文件中存在三个用户自定义变量,分别是${APP_S3_HOME}${CUST_CITY}${ORD_DATE}

CREATE EXTERNAL TABLE ORDERS_CUSTOMERS
    ... ...
    LOCATION '${APP_S3_HOME}/data/orders_customers/'
AS SELECT
    ... ...
WHERE
    C.CUST_CITY = '${CUST_CITY}' AND
    O.ORD_DATE = CAST('${ORD_DATE}' AS DATE);

--sql-params参数为这三个自定义变量设置了取值,分别是:APP_S3_HOME=s3://my-spark-sql-jobCUST_CITY=NEW YORKORD_DATE=2008-07-15,于是上述SQL将被转化为如下内容去执行:

CREATE EXTERNAL TABLE ORDERS_CUSTOMERS
    ... ...
    LOCATION 's3://my-spark-sql-job/data/orders_customers/'
AS SELECT
    ... ...
WHERE
    C.CUST_CITY = 'NEW YORK' AND
    O.ORD_DATE = CAST('2008-07-15' AS DATE);

至此,通过控制台提交纯SQL文件作业的所有功能演示完毕。

2.3. 通过命令行提交纯SQL文件作业


实际上,很多EMR Serverless用户并不在控制台上提交自己的作业,而是通过AWS CLI提交,这种方式方式多见于工程代码或作业调度中。所以,我们再来介绍一下如何通过命令行提交纯SQL文件作业。

本文使用命令行提交EMR Serverless作业的方式遵循了《最佳实践:如何优雅地提交一个 Amazon EMR Serverless 作业?》一文给出的最佳实践。首先,登录一个安装了AWS CLI并配置有用户凭证的Linux环境(建议使用Amazon Linux2),先使用命令sudo yum -y install jq安装操作json文件的命令行工具:jq(后续脚本会使用到它),然后完成如下前期准备工作:

① 创建或选择一个作业专属工作目录和S3存储桶
② 创建或选择一个EMR Serverless Execution Role
③ 创建或选择一个EMR Serverless Application

接下来将所有环境相关变量悉数导出(请根据您的AWS账号和本地环境替换命令行中的相应值):

export APP_NAME='change-to-your-app-name'
export APP_S3_HOME='change-to-your-app-s3-home'
export APP_LOCAL_HOME='change-to-your-app-local-home'
export EMR_SERVERLESS_APP_ID='change-to-your-application-id'
export EMR_SERVERLESS_EXECUTION_ROLE_ARN='change-to-your-execution-role-arn'

以下是一份示例:

export APP_NAME='my-spark-sql-job'
export APP_S3_HOME='s3://my-spark-sql-job'
export APP_LOCAL_HOME='/home/ec2-user/my-spark-sql-job'
export EMR_SERVERLESS_APP_ID='00fbfel40ee59k09'
export EMR_SERVERLESS_EXECUTION_ROLE_ARN='arn:aws:iam::123456789000:role/EMR_SERVERLESS_ADMIN'

《最佳实践:如何优雅地提交一个 Amazon EMR Serverless 作业?》一文提供了多个操作Job的通用脚本,都非常实用,本文也会直接复用这些脚本,但是由于我们需要多次提交且每次的参数又有所不同,为了便于使用和简化行文,我们将原文中的部分脚本封装为一个Shell函数,取名为submit-spark-sql-job

submit-spark-sql-job() {
    sqlFiles="$1"
    sqlParams="$2"
    cat << EOF > $APP_LOCAL_HOME/start-job-run.json
{
    "name":"my-spark-sql-job",
    "applicationId":"$EMR_SERVERLESS_APP_ID",
    "executionRoleArn":"$EMR_SERVERLESS_EXECUTION_ROLE_ARN",
    "jobDriver":{
        "sparkSubmit":{
        "entryPoint":"$APP_S3_HOME/jars/emr-serverless-utils-1.0.jar",
        "entryPointArguments":[
            $([[ -n "$sqlFiles" ]] && echo "\"--sql-files\", \"$sqlFiles\"")
            $([[ -n "$sqlParams" ]] && echo ",\"--sql-params\", \"$sqlParams\"")
        ],
         "sparkSubmitParameters":"--class com.github.emr.serverless.SparkSqlJob --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
        }
   },
   "configurationOverrides":{
        "monitoringConfiguration":{
            "s3MonitoringConfiguration":{
                "logUri":"$APP_S3_HOME/logs"
            }
        }
   }
}
EOF
    jq . $APP_LOCAL_HOME/start-job-run.json
    export EMR_SERVERLESS_JOB_RUN_ID=$(aws emr-serverless start-job-run \
        --no-paginate --no-cli-pager --output text \
        --name my-spark-sql-job \
        --application-id $EMR_SERVERLESS_APP_ID \
        --execution-role-arn $EMR_SERVERLESS_EXECUTION_ROLE_ARN \
        --execution-timeout-minutes 0 \
        --cli-input-json file://$APP_LOCAL_HOME/start-job-run.json \
        --query jobRunId)
    now=$(date +%s)sec
    while true; do
        jobStatus=$(aws emr-serverless get-job-run \
                        --no-paginate --no-cli-pager --output text \
                        --application-id $EMR_SERVERLESS_APP_ID \
                        --job-run-id $EMR_SERVERLESS_JOB_RUN_ID \
                        --query jobRun.state)
        if [ "$jobStatus" = "PENDING" ] || [ "$jobStatus" = "SCHEDULED" ] || [ "$jobStatus" = "RUNNING" ]; then
            for i in {0..5}; do
                echo -ne "\E[33;5m>>> The job [ $EMR_SERVERLESS_JOB_RUN_ID ] state is [ $jobStatus ], duration [ $(date -u --date now-$now +%H:%M:%S) ] ....\r\E[0m"
                sleep 1
            done
        else
            printf "The job [ $EMR_SERVERLESS_JOB_RUN_ID ] is [ $jobStatus ]%50s\n\n"
            break
        fi
    done
}

该函数接受两个位置参数:

① 第一位置上的参数用于指定SQL文件路径,其值会传递给SparkSqlJob--sql-files
② 第二位置上的参数用于指定SQL文件中的用户自定义变量,其值会传递给SparkSqlJob--sql-params

函数中使用的Jar包和SQL文件与《2.1. 环境准备》一节准备的Jar包和SQL文件一致,所以使用脚本提交作业前同样需要完成2.1节的环境准备工作。接下来,我们就使用该函数完成与2.2节一样的操作。

2.3.1. 执行单一SQL文件

本节操作与2.2.1节完全一致,只是改用了命令行方式实现,命令如下:
submit-spark-sql-job "$APP_S3_HOME/sqls/drop-tables.sql"
2.3.2. 执行带自定义参数的SQL文件

本节操作与2.2.2节完全一致,只是改用了命令行方式实现,命令如下:
submit-spark-sql-job "$APP_S3_HOME/sqls/create-tables.sql" "APP_S3_HOME=$APP_S3_HOME"
2.3.3. 使用通配符执行多个文件

本节操作与2.2.3节完全一致,只是改用了命令行方式实现,命令如下:
submit-spark-sql-job "$APP_S3_HOME/sqls/insert-into-*.sql"
2.3.4. 一个复合示例

本节操作与2.2.4节完全一致,只是改用了命令行方式实现,命令如下:
submit-spark-sql-job "$APP_S3_HOME/sqls/select-tables.sql" "APP_S3_HOME=$APP_S3_HOME,CUST_CITY=NEW YORK,ORD_DATE=2008-07-15"

3. 在源代码中调用工具类


尽管在Spark编程环境中可以使用`spark.sql(...)`形式直接执行SQL语句,但是,从前文示例中可以看出 [emr-serverless-utils](https://github.com/bluishglc/emr-serverless-utils) 提供的SQL文件执行能力更便捷也更强大一些,所以,最后我们简单介绍一下如何在源代码中调用相关的工具类获得上述SQL文件的处理能力。具体做法非常简单,你只需要:

① 将emr-serverless-utils-1.0.jar加载到你的类路径中
② 声明隐式类型转换
③ 在spark上直接调用execSqlFile()文章来源地址https://www.toymoban.com/news/detail-653488.html


# 初始化SparkSession及其他操作
...

# 声明隐式类型转换
import com.github.emr.serverless.SparkSqlSupport._

# 在spark上直接调用execSqlFile()
spark.execSqlFile("s3://YOUR/XXX.sql")

# 在spark上直接调用execSqlFile()
spark.execSqlFile("s3://YOUR/XXX.sql", "K1=V1,K2=V2,...")

# 其他操作
...

到了这里,关于解决方案:如何在 Amazon EMR Serverless 上执行纯 SQL 文件?的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Amazon SageMaker:搭建企业级AI模型的完整解决方案

    人工智能仍处于科技浪潮之巅… 随着智能芯片、大数据和云计算的发展,深度学习技术得到进一步升级。以 ChatGPT 为首的AIGC技术大放异彩:AI绘画、AI作曲、AI编程、AI写作…一系列AI产品赋能生产;边缘计算、联邦学习、多智能体等技术逐渐从学术界走向工业界,提高生产效

    2023年04月15日
    浏览(50)
  • 如何在Amazon EMR上使用RAPIDS加速Apache Spark流水线

    : [Amazon Web Services re:Invent 2023, Rapids Spark, Accelerate Apache Spark Pipelines, Amazon Emr, Rapids, Nvidia, Gpu Acceleration] 本文字数: 1000, 阅读完需: 5 分钟 如视频不能正常播放,请前往bilibili观看本视频。 https://www.bilibili.com/video/BV1uw41187VA RAPIDS加速器可以为Amazon EMR上的Apache Spark数据处理流

    2024年02月04日
    浏览(48)
  • 浪潮信息龙蜥联合实验室领衔成立 Serverless SIG 打造标准化开源解决方案

    近日, 浪潮信息龙蜥联合实验室 在龙蜥社区领衔成立 Serverless SIG(服务器无感知计算 SIG),并举行了首届 Serverless SIG MeetUp,活动由浪潮信息龙蜥联合实验室主办,来自浪潮信息、天津大学、阿里云、Intel、中国联通等多位资深专家, 分享了不同业务场景下的 Serverless 案例实

    2023年04月26日
    浏览(52)
  • Linux Crontab定时执行脚本不执行,但手动执行脚本正常原因及解决方案

    使用rsync作为备份工具,实现电子文件的增量备份,写了一个脚本,在linux命令行调用 sh shell.sh进行测试,成功实现了电子文件的备份功能,然后在Crontab增加了定时任务,每天凌晨1点进行同步。 后来发现文件同步失败了,后来排查发现应该是脚本虽然被执行,但是没有成功,

    2024年02月08日
    浏览(54)
  • Linux Crontab定时执行脚本出错,但手动执行脚本正常原因及解决方案

    实际开发场景 需要开发一个Flink监控程序,初步使用shell脚本进行监控,如果发现失败了,则自动重新运行Flink命令行参数进行重启。 遇到的问题 编写好shell脚本后,在linux命令行调用 sh shell.sh进行测试,成功实现监控和重启功能。于是利用crontab对脚本进行定时调度监控。 后

    2024年02月15日
    浏览(51)
  • hbase命令输错时无法执行情形及解决方案

    1、命令中缺少另一半单引号,如下图红框所示,此时在下一行命令补上另一半单引号后按enter键; 2、命令中缺少另一半双引号,如下图红框所示,此时在下一行命令补上另一半双引号后按enter键; 3、命令中缺少大括号或其他语法错误,导致命令行中“:0”变成“1、1*、2、

    2024年04月12日
    浏览(31)
  • 找不到vcruntime140_1.dll无法执行的问题解决方案

    随着技术的不断进步,人们越来越依赖电脑来处理日常工作。时常在安装或运行一些软件的时候,我们可能会碰到一些提示信息,其中的“找不到vcruntime140_1.dll无法执行”就是很常见的一种。今天我们就来探讨一下这个问题的原因和解决方案。 一.vcruntime140_1.dll问题的原因 通

    2024年02月05日
    浏览(44)
  • 国内企业出海首选的免费开源生产执行管理系统(MES)解决方案

    Odoo制造执行系统 (MES) 系统的创新型实时解决方案,可帮助您了解最新生产数据 准确获取各地生产设施的数据对于短期业务执行以及长期战略规划都极为重要。为此,Odoo 提供了基于条码扫描仪的传统界面,以及支持互动的创新型平板电脑应用。条码扫描仪可加快数据录入速

    2024年02月04日
    浏览(55)
  • Python+Selenium程序执行完,chrome浏览器自动关闭解决方案

    因为把driver = webdriver.Chrome()放在了函数内部,在函数执行完毕之后,程序内所有的步骤都结束了,关于这段程序的进程也就结束了,浏览器包含在内,所以才会自动退出。 设置全局变量,即把打开浏览器的操作放在函数外部,函数执行完毕,浏览器就不会关闭 关闭浏览器代

    2024年02月16日
    浏览(77)
  • Python中使用execjs执行JavaScript代码:方法与常见错误解决方案

     简介和背景:          execjs 库的作用和重要性是在Python中执行JavaScript代码。它允许开发者在Python环境下调用JavaScript逻辑和功能,从而实现Python与JavaScript之间的交互。通过 execjs ,Python开发者可以利用JavaScript的强大功能和现有库,拓展Python应用的能力,实现跨语言的灵

    2024年02月10日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包