最佳实践:如何优雅地提交一个 Amazon EMR Serverless 作业?

这篇具有很好参考价值的文章主要介绍了最佳实践:如何优雅地提交一个 Amazon EMR Serverless 作业?。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

自Amazon EMR推出Serverlesss形态以来,得益于开箱即用和零运维的优质特性,越来越多的EMR用户开始尝试EMR Serverless。在使用过程中,一个常被提及的问题是:我们应该如何在EMR Serverless上提交Spark/Hive作业?本文我们将分享一些这方面的最佳实践,帮助大家以一种更优雅的方式使用这项服务。

一份通俗易懂的讲解最好配一个形象生动的例子,本文选择《CDC一键入湖:当 Apache Hudi DeltaStreamer 遇见 Serverless Spark》一文介绍的DeltaStreamer作业作为讲解示例,因为这个作业既有一定的通用性又足够复杂,可以涵盖大多数EMR Serverless作业遇到的场景,更重要的是,该作业的提交方式遵循了本文要介绍的各项最佳实践(本文是其姊妹篇)。不了解Apache Hudi的读者不必担心,本文的关注点在于如何提交EMR Serverless作业本身,而非DeltaStreamer的技术细节,所以不会影响到您阅读此文。

参考范本

首先,我们整理一下提交DeltaStreamer CDC作业的几项关键操作,下文会以这些脚本为例,介绍蕴含其中的各项最佳实践。

1. 导出环境相关变量

export APP_NAME='apache-hudi-delta-streamer'
export APP_S3_HOME='s3://apache-hudi-delta-streamer'
export APP_LOCAL_HOME='/home/ec2-user/apache-hudi-delta-streamer'
export EMR_SERVERLESS_APP_ID='00fbfel40ee59k09'
export EMR_SERVERLESS_EXECUTION_ROLE_ARN='arn:aws:iam::123456789000:role/EMR_SERVERLESS_ADMIN'

2. 创建作业专属工作目录和S3存储桶

mkdir -p $APP_LOCAL_HOME
aws s3 mb $APP_S3_HOME

3. 准备作业描述文件

cat << EOF > $APP_LOCAL_HOME/start-job-run.json
{
    "name":"apache-hudi-delta-streamer",
    "applicationId":"$EMR_SERVERLESS_APP_ID",
    "executionRoleArn":"$EMR_SERVERLESS_EXECUTION_ROLE_ARN",
    "jobDriver":{
        "sparkSubmit":{
        "entryPoint":"/usr/lib/hudi/hudi-utilities-bundle.jar",
        "entryPointArguments":[
            "--continuous",
            "--enable-sync",
            "--table-type", "COPY_ON_WRITE",
            "--op", "UPSERT",
            "--target-base-path", "$APP_S3_HOME/data/mysql-server-3/inventory/orders",
            "--target-table", "orders",
            "--min-sync-interval-seconds", "60",
            "--source-class", "org.apache.hudi.utilities.sources.debezium.MysqlDebeziumSource",
            "--source-ordering-field", "_event_origin_ts_ms",
            "--payload-class", "org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload",
            "--hoodie-conf", "bootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS",
            "--hoodie-conf", "schema.registry.url=$SCHEMA_REGISTRY_URL",
            "--hoodie-conf", "hoodie.deltastreamer.schemaprovider.registry.url=${SCHEMA_REGISTRY_URL}/subjects/osci.mysql-server-3.inventory.orders-value/versions/latest",
            "--hoodie-conf", "hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer",
            "--hoodie-conf", "hoodie.deltastreamer.source.kafka.topic=osci.mysql-server-3.inventory.orders",
            "--hoodie-conf", "auto.offset.reset=earliest",
            "--hoodie-conf", "hoodie.datasource.write.recordkey.field=order_number",
            "--hoodie-conf", "hoodie.datasource.write.partitionpath.field=order_date",
            "--hoodie-conf", "hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor",
            "--hoodie-conf", "hoodie.datasource.write.hive_style_partitioning=true",
            "--hoodie-conf", "hoodie.datasource.hive_sync.database=inventory",
            "--hoodie-conf", "hoodie.datasource.hive_sync.table=orders",
            "--hoodie-conf", "hoodie.datasource.hive_sync.partition_fields=order_date"
        ],
         "sparkSubmitParameters":"--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.jars=$(aws s3 ls $APP_S3_HOME/jars/ | grep -o '\S*\.jar$'| awk '{print "'"$APP_S3_HOME/jars/"'"$1","}' | tr -d '\n' | sed 's/,$//')"
        }
   },
   "configurationOverrides":{
        "monitoringConfiguration":{
            "s3MonitoringConfiguration":{
                "logUri":"$APP_S3_HOME/logs"
            }
        }
   }
}
EOF
jq . $APP_LOCAL_HOME/start-job-run.json

4. 提交作业

export EMR_SERVERLESS_JOB_RUN_ID=$(aws emr-serverless start-job-run \
    --no-paginate --no-cli-pager --output text \
    --name apache-hudi-delta-streamer \
    --application-id $EMR_SERVERLESS_APP_ID \
    --execution-role-arn $EMR_SERVERLESS_EXECUTION_ROLE_ARN \
    --execution-timeout-minutes 0 \
    --cli-input-json file://$APP_LOCAL_HOME/start-job-run.json \
    --query jobRunId)

5. 监控作业

now=$(date +%s)sec
while true; do
    jobStatus=$(aws emr-serverless get-job-run \
                    --no-paginate --no-cli-pager --output text \
                    --application-id $EMR_SERVERLESS_APP_ID \
                    --job-run-id $EMR_SERVERLESS_JOB_RUN_ID \
                    --query jobRun.state)
    if [ "$jobStatus" = "PENDING" ] || [ "$jobStatus" = "SCHEDULED" ] || [ "$jobStatus" = "RUNNING" ]; then
        for i in {0..5}; do
            echo -ne "\E[33;5m>>> The job [ $EMR_SERVERLESS_JOB_RUN_ID ] state is [ $jobStatus ], duration [ $(date -u --date now-$now +%H:%M:%S) ] ....\r\E[0m"
            sleep 1
        done
    else
        echo -ne "The job [ $EMR_SERVERLESS_JOB_RUN_ID ] is [ $jobStatus ]\n\n"
        break
    fi
done

6. 检查错误

JOB_LOG_HOME=$APP_LOCAL_HOME/log/$EMR_SERVERLESS_JOB_RUN_ID
rm -rf $JOB_LOG_HOME && mkdir -p $JOB_LOG_HOME
aws s3 cp --recursive $APP_S3_HOME/logs/applications/$EMR_SERVERLESS_APP_ID/jobs/$EMR_SERVERLESS_JOB_RUN_ID/ $JOB_LOG_HOME >& /dev/null
gzip -d -r -f $JOB_LOG_HOME >& /dev/null
grep --color=always -r -i -E 'error|failed|exception' $JOB_LOG_HOME

最佳实践 (1):提取环境相关信息集中配置,提升脚本可移植性

※ 此项最佳实践参考《参考范本:1. 导出环境相关变量》

在EMR Serverless的作业脚本中经常会出现与AWS账号和本地环境有关的信息,例如资源的ARN,各种路径等,当我们要在不同环境(如开发、测试或生产)中提交作业时,就需要查找和替换这些环境相关的信息。为了让脚本具备良好的可移植性,推荐的做法是将这些信息抽离出来,以全局变量的形式集中配置,这样,当在一个新环境(新的AWS账号或服务器)中提交作业时,只需修改这些变量即可,而不是具体的脚本。

最佳实践 (2):为作业创建专用的工作目录和S3存储桶

※ 此项最佳实践参考《参考范本:2. 创建作业专属工作目录和S3存储桶》

为一个作业或应用程序创建专用的工作目录和S3存储桶是一个良好的规范和习惯。一方面,将本作业/应用的所有“资源”,包括:脚本、配置文件、依赖包、日志以及产生的数据统一存放在有利于集中管理和维护,如果要在Linux和S3上给作业赋予读写权限,操作起来了也会简单一些。

最佳实践 (3):使用作业描述文件规避字符转义问题

※ 此项最佳实践参考《参考范本:3. 准备作业描述文件》

我们通常见到的EMR Serverless作业提交示例是将作业描述以字符串参数形式传递给命令行的,就像下面这样:

aws emr-serverless start-job-run \
    --application-id $EMR_SERVERLESS_APP_ID \
    --execution-role-arn $EMR_SERVERLESS_EXECUTION_ROLE_ARN \
    --job-driver '{
        "sparkSubmit": {
          "entryPoint": "s3://us-east-1.elasticmapreduce/emr-containers/samples/wordcount/scripts/wordcount.py",
          "entryPointArguments": ["s3://my-job-bucket/output"]
        }
    }'

这种方式只能应对简单的作业提交,当作业中包含大量参数和变量时,很容易出现单引号、双引号、美元符等特殊字符的转义问题,由于这里牵涉shell字符串和json字符串的双重嵌套和解析,所以会非常麻烦。此时在命令行中给出作业描述是很不明智的,更好的做法是:使用cat命令联合heredoc来创建作业描述文件,然后在命令行中以--cli-input-json file://xxx.json形式将作业描述传递给命令行:

# 生成作业描述文件
cat << EOF > xxx.json
    ... ...
    ... ...
    ... ...   
EOF
# 使用作业描述文件提交作业
aws emr-serverless start-job-run ... --cli-input-json file://xxx.json ...

这是一个非常重要的技巧,使用这种形式提交作业有如下两个好处:

  • 在cat + heredoc中编辑的文本为原生字符串,不用考虑字符转义问题

  • 在cat + heredoc中可嵌入shell变量、函数调用和if…else等结构体,实现“动态”构建作业描述文件

最佳实践 (4):在作业描述文件中嵌入shell变量和脚本片段,实现“动态”构建

※ 此项最佳实践参考《参考范本:3. 准备作业描述文件》

如上所述,采用cat + heredoc编辑作业描述文件后,可以在编辑文件的过程中嵌入shell变量、函数调用和if...else...等复合结构体,使得我们可以动态构建作业描述文件,这是非常重要的一个能力。在《参考范本:3. 准备作业描述文件》中有一个很好的例证,就是“动态拼接依赖Jar包的路径”:

--conf spark.jars=$(aws s3 ls $APP_S3_HOME/jars/ | grep -o '\S*\.jar$'| awk '{print "'"$APP_S3_HOME/jars/"'"$1","}' | tr -d '\n' | sed 's/,$//')

这是在构建作业描述文件start-job-run.json的过程中通过$(....)嵌入的一段shell脚本,这段脚本遍历了指定目录下的jar文件并拼接成一个字符串输出出来,而输出的字符串会在嵌入脚本的地方变成文本的一部分,我们还可以在编辑文本时调用shell函数,嵌入if...else...whilecase等多重复合逻辑结构,让作业描述文件可以根据不同的参数和条件动态生成期望的内容,这种灵活性足以让开发者应对任何复杂的情况。

最佳实践 (5):使用jq校验并格式化作业描述文件

※ 此项最佳实践参考《参考范本:3. 准备作业描述文件》

jq是一个处理json文件的命令行工具,对于AWS CLI来说,jq可以说是一个“最佳伴侣”。原因是使用AWS CLI创建资源时,除了传入常规参数之外,还可以通过--cli-input-json参数传入一个json文件来描述所要创建的资源。当创建的资源配置过于复杂时,json文件的优势就会凸显出来,就像我们参考范本中的这个EMR Serverless Job一样。所以,使用AWS CLI时经常有编辑和操作json文件的需求,此时jq就成为了一个强有力的辅助工具。在参考范本中,我们仅仅使用jq打印了一下生成的作业描述文件:

jq . $APP_LOCAL_HOME/start-job-run.json

这一步操作有两个作用:一是利用jq校验了json文件,这能帮助排查文件中的json格式错误,二是jq输出的json经过了格式化和语法着色,更加易读。

其实jq在AWS CLI上还有更多高级应用,只是在我们的参考范本中并没有体现出来。在某些情况下,我们可以通过jq直接检索和编辑作业描述文件,将jq和使用cat + heredoc的json编辑方式结合起来,可以创建更加复杂和动态化的作业描述文件。

最佳实践 (6):可复用的依赖Jar包路径拼接脚本

※ 此项最佳实践参考《参考范本:3. 准备作业描述文件》

拼接依赖Jar包路径几乎是每个作业都要解决的问题,手动拼接虽然可行,但费力且容易出错。过去在本地环境中,我们可以使用:--jars $(echo /path/*.jar | tr ' ' ',')这种简洁而优雅的方式拼接Jar包路径。但是EMR Serverless作业的依赖Jar包是存放在S3上的,这此,我们针对性地编写了一段可复用的脚本来拼接位于S3指定目录下的Jar包路径,供大家参考(请注意替换脚本中出现的两处文件夹路径):

aws s3 ls $APP_S3_HOME/jars/ | grep -o '\S*\.jar$'| awk '{print "'"$APP_S3_HOME/jars/"'"$1","}' | tr -d '\n' | sed 's/,$//'

最佳实践 (7):可复用的作业监控脚本

※ 此项最佳实践参考《参考范本:5. 监控作业》

使用命令行提交EMR Serverless作业后,用户可以转到AWS控制台上查看作业的状态,但是对开发者来说,这种切换会分散注意力,最完美的方式莫过于提交作业后继续在命令行窗口监控作业状态,直到其失败或成功运行。为此,《参考范本:5. 监控作业》给出了一种实现,可复用于所有EMR Serverless作业,供大家参考。

最佳实践 (8):可复用的日志错误信息检索脚本

※ 此项最佳实践参考《参考范本:6. 检查错误》

在日常开发中,“提交作业报错 -> 查看日志中的报错信息 -> 修改代码重新提交”是一个反复迭代的过程,在EMR Serverless中,用户需要切换到AWS控制台查看错误日志,并且有时日志量会非常大,在控制台上查看效率很低。一种更高效的做法是:将存放于S3上的日志文件统一下载到本地并解压,然后使用grep命令快速检索日志中含有error, failed, exception等关键字的行,然后再打开具体文件仔细查看。将这些动作脚本化后,我们就能得到一段可复用的日志错误信息检索脚本,对于调试和排查错误有很大的帮助。为此,《参考范本:6. 检查错误》给出了一种实现,可复用于所有EMR Serverless作业,供大家参考。文章来源地址https://www.toymoban.com/news/detail-640642.html

到了这里,关于最佳实践:如何优雅地提交一个 Amazon EMR Serverless 作业?的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【腾讯云 TDSQL-C Serverless 产品体验】基于TDSQL-C Serverless最佳实践助力企业降本增效

    随着公司的业务快速发展,数据库中的数据量猛增,访问性能也变慢了,单台MySQL实例无法应对和满足大规模数据管理和请求访问,导致数据库性能下降,成为瓶颈。 关系型数据本身就比较容易形成系统瓶颈,无论是从单机存储容量、连接数、处理能力都有限。 当单表的数据

    2024年02月08日
    浏览(46)
  • Amazon EMR Hudi 性能调优——Clustering

    随着数据体量的日益增长,人们对 Hudi 的查询性能也提出更多要求,除了 Parquet 存储格式本来的性能优势之外,还希望 Hudi 能够提供更多的性能优化的技术途径,尤其当对 Hudi 表进行高并发的写入,产生了大量的小文件之后,又需要使用 Presto/Trino 对 Hudi 表进行高吞吐的即席

    2024年02月12日
    浏览(46)
  • 使用 Amazon EMR 构建您的数据分析平台

    众所周知,在现如今大数据时代,数据越来越重要。据Gartner最新趋势分析,数据分析将成为创新起源与企业核心能力。同时国际数据公司IDC和数据存储公司希捷的一份报告表示,我国产生的数据量将从2019年的约9.4ZB增至2025年的48.6ZB。 面对如此愈加繁杂和庞大的数据,很多公

    2023年04月08日
    浏览(39)
  • 【程序员英语 代码提交】C++工程师的代码提交艺术:git commit 时 精确表达与最佳实践

    在软件开发的世界里,代码提交(Code Commit)不仅仅是一个简单的行为,它是一种艺术,一种传达你工作的方式。当一个C++工程师提交代码时,他们不只是在保存代码的当前状态,而是在向整个团队传达一个信息:这段代码是什么,为什么要这样做,以及它是如何改变项目的

    2024年02月02日
    浏览(69)
  • Scaling data processing with Amazon EMR at the speed of market volatility

    Good evening everyone. Thanks for joining with us. My name is Meenakshi Shankaran. I\\\'m a senior big data architect with AWS. For the past three years, I have Sat Kumar Sami, Director of Technology FINRA with me and we are here to speak about scaling EMR at the speed of market volatility. And before we get started, I have two questions: How many of you have w

    2024年02月03日
    浏览(38)
  • 使用 Amazon Redshift Serverless 和 Toucan 构建数据故事应用程序

    这是由 Toucan 的解决方案工程师 Django Bouchez 与亚马逊云科技共同撰写的特约文章。 带有控制面板、报告和分析的商业智能(BI,Business Intelligence)仍是最受欢迎的数据和分析使用场景之一。它为业务分析师和经理提供企业的过去状态和当前状态的可视化,帮助领导者制定将影

    2024年02月12日
    浏览(39)
  • re:Invent 产品体验分享:Amazon ElastiCache Serverless 缓存即时扩展功能与感受

    授权说明:本篇文章授权活动官方亚马逊云科技文章转发、改写权,包括不限于在亚马逊云科技开发者社区、 知乎、自媒体平台、第三方开发者媒体等亚马逊云科技官方渠道)。 2023年的亚马逊云科技 re:Invent 全球大会无疑是一场吸引科技人员、IT行业从业者以及众多专业人士

    2024年02月04日
    浏览(35)
  • [ 云计算 | AWS 实践 ] Java 如何重命名 Amazon S3 中的文件和文件夹

    本文收录于【#云计算入门与实践 - AWS】专栏中,收录 AWS 入门与实践相关博文。 本文同步于个人公众号:【 云计算洞察 】 更多关于云计算技术内容敬请关注:CSDN【#云计算入门与实践 - AWS】专栏。 本系列已更新博文: [ 云计算 | AWS 实践 ] Java 应用中使用 Amazon S3 进行存储桶

    2024年02月08日
    浏览(59)
  • 在 Java 中优雅地移除字符串最后一个字符:不同级别程序员的实践指南

    引言: 处理字符串是编程中非常常见的任务。本文将详细介绍四种在 Java 中优雅地移除字符串最后一个字符的方法,并针对不同级别的程序员进行讨论。我们将从简单的方法入手,逐步介绍更高级的技术,以帮助程序员根据自己的水平和需求选择最合适的解决方案。 这是一

    2024年02月13日
    浏览(58)
  • Elastic Stack最佳实践系列:Beats->ES,一个更轻型的架构选择

    说起开源的日志分析系统,ELK几乎无人不晓,这个生态并非是Elastic特意而为,毕竟Elasticsearch的初心是分布式的搜索引擎,被广泛用作日志系统纯粹一个“美丽的意外”,这是社区使用者推动而成。而现在各大云厂商推广自己的日志服务时,也往往将各种指标对标于ELK,可见

    2023年04月12日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包