学习文档:《Flink 官方文档 - 部署 - 命令行界面 - 提交 PyFlink 作业》
学习笔记如下:
当前,用户可以通过 CLI 提交 PyFlink 作业。对于通过 flink run
提交的 Python 作业,Flink 会执行 python 命令。因此,在启动 Python 作业前,需要先确定当前环境中的 python 命令指向 3.7+ 版本的 Python。
示例:执行一个 PyFlink 作业
$ ./bin/flink run --python examples/python/table/word_count.py
通过 --pyFiles
参数,可以添加 Python 的文件依赖,这个文件可以是 Python 文件、Python 包或其他文件。
示例:执行一个 PyFlink 作业并添加依赖文件
$ ./bin/flink run \ --python examples/python/table/word_count.py \ --pyFiles file:///user.txt,hdfs:///$namenode_address/username.txt
通过 --jarFile
参数,可以添加 Python 的 Jar 包依赖。在 --jarFile
参数中添加的 Jar 包将会上传到集群。
实例:执行一个 PyFlink 作业并添加 Jar 包依赖
$ ./bin/flink run \ --python examples/python/table/word_count.py \ --jarfile <jarFile>
通过 --pyModule
参数,可以以 module 模式执行 PyFlink 任务。
示例:使用 module 模式执行 PyFlink 任务
$ ./bin/flink run \ --pyModule word_count \ --pyFiles examples/python/table
通过 --jobmanager
参数,可以将 PyFlink 作业提交到指定的 JobManager。
示例:将 PyFlink 任务提交到运行在
<jobmanagerHost>
的指定 JobManager$ ./bin/flink run \ --jobmanager <jobmanagerHost>:8081 \ --python examples/python/table/word_count.py
通过 --target
参数,可以使用 YARN Cluster in Per-Job Mode 执行 PyFlink 作业。
示例:将 PyFlink 作业提交 YARN Cluster in Per-Job Mode
$ ./bin/flink run \ --target yarn-per-job --python examples/python/table/word_count.py
可以使用 run-application -t yarn-application
命令将 PyFlink 作业提交到 YARN cluster in Application Mode。其中,通过 -pyarch
指定的存档文件将通过 blob 服务器分发到 TaskManagers,其中文件大小限制为 2GB。如果存档文件的大小超过 2GB,则可以先将其上传到分布式文件系统,然后使用命令行选项 -pyarch
指定路径。
示例:将 PyFlink 作业提交到 YARN cluster in Application Mode
$ ./bin/flink run-application -t yarn-application \ -Djobmanager.memory.process.size=1024m \ -Dtaskmanager.memory.process.size=1024m \ -Dyarn.application.name=<ApplicationName> \ -Dyarn.ship-files=/path/to/shipfiles \ -pyarch shipfiles/venv.zip \ -pyclientexec venv.zip/venv/bin/python3 \ -pyexec venv.zip/venv/bin/python3 \ -pyfs shipfiles \ -pym word_count
在这个任务中,假定 Python 依赖位于
/path/to/shipfiles
中;例如,其中应该包含venv.zip
和word_count.py
。当它在 YARN Application Mode 下的 JobManager 执行作业时,
-pyarch
和pyfs
中指定的路径是相对于 shipfiles 的路径,shipfiles
是已发送文件的目录名称。
可以使用 run-application -target kubernetes-application
命令将 PyFlink 作业提交到 native Kubernetes cluster,这需要一个已经安装了 PyFlink 的 Docker 镜像。
示例:将 PyFlink 作业提交到 native Kubernetes cluster文章来源:https://www.toymoban.com/news/detail-836748.html
$ ./bin/flink run-application \ --target kubernetes-application \ --parallelism 8 \ -Dkubernetes.cluster-id=<ClusterId> \ -Dtaskmanager.memory.process.size=4096m \ -Dkubernetes.taskmanager.cpu=2 \ -Dtaskmanager.numberOfTaskSlots=4 \ -Dkubernetes.container.image.ref=<PyFlinkImageName> \ --pyModule word_count \ --pyFiles /opt/flink/examples/python/table/word_count.py
在 run
命令行 run-application
命令中支持的参数清单:文章来源地址https://www.toymoban.com/news/detail-836748.html
-
-py
/--python
:Python 程序入口脚本,其依赖可以通过--pyFiles
参数添加。 -
-pym
/--pyModule
:Module 模式的 Python 程序入口,这个参数必须与--pyFiles
同时使用。 -
-pyfs
/--pyFiles
:为作业附加自定义文件,这些文件将被添加到本地客户端以及远端 Python UDF 的PYTHONPATH
中;其中后缀为.zip
的文件会被提取出来并添加到PYTHONPATH
中;多个文件之间可以试用,
分隔。例如--pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip
。 -
-pyarch
/--pyArchives
:为作业添加 Python 存档文件;这些存档文件会被提取到 Python UDF Worker 的工作目录中。例如--pyArchives file:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutable py37.zip/py37/bin/python
。 -
-pyclientexec
/--pyClientExecutable
:在提交 Python 任务时,以及编译 Python UDF 时使用的 Python 解释器路径。例如--pyArchives file:///tmp/py37.zip --pyClientExecutable py37.zip/py37/python
。 -
-pyexec
/--pyExecutable
:执行 Python UDF 的 Python 解释器路径。例如--pyExecutable /usr/local/bin/python3
。 -
-pyreq
/--pyRequirements
:指定作业所需第三方模块的requirements.txt
文件;这些文件将被安装并添加到 Python UDF 的PYTHONPATH
中。可以选择指定一个目录,用于存储这些依赖的安装包,如果指定了目录,则使用#
作为分隔符。例如--pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir
.
到了这里,关于Flink|《Flink 官方文档 - 部署 - 命令行界面 - 提交 PyFlink 作业》学习笔记的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!