Flink|《Flink 官方文档 - 部署 - 命令行界面 - 提交 PyFlink 作业》学习笔记

这篇具有很好参考价值的文章主要介绍了Flink|《Flink 官方文档 - 部署 - 命令行界面 - 提交 PyFlink 作业》学习笔记。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

学习文档:《Flink 官方文档 - 部署 - 命令行界面 - 提交 PyFlink 作业》

学习笔记如下:


当前,用户可以通过 CLI 提交 PyFlink 作业。对于通过 flink run 提交的 Python 作业,Flink 会执行 python 命令。因此,在启动 Python 作业前,需要先确定当前环境中的 python 命令指向 3.7+ 版本的 Python。

示例:执行一个 PyFlink 作业

$ ./bin/flink run --python examples/python/table/word_count.py

通过 --pyFiles 参数,可以添加 Python 的文件依赖,这个文件可以是 Python 文件、Python 包或其他文件。

示例:执行一个 PyFlink 作业并添加依赖文件

$ ./bin/flink run \
   --python examples/python/table/word_count.py \
   --pyFiles file:///user.txt,hdfs:///$namenode_address/username.txt

通过 --jarFile 参数,可以添加 Python 的 Jar 包依赖。在 --jarFile 参数中添加的 Jar 包将会上传到集群。

实例:执行一个 PyFlink 作业并添加 Jar 包依赖

$ ./bin/flink run \
   --python examples/python/table/word_count.py \
   --jarfile <jarFile>

通过 --pyModule 参数,可以以 module 模式执行 PyFlink 任务。

示例:使用 module 模式执行 PyFlink 任务

$ ./bin/flink run \
   --pyModule word_count \
   --pyFiles examples/python/table

通过 --jobmanager 参数,可以将 PyFlink 作业提交到指定的 JobManager。

示例:将 PyFlink 任务提交到运行在 <jobmanagerHost> 的指定 JobManager

$ ./bin/flink run \
   --jobmanager <jobmanagerHost>:8081 \
   --python examples/python/table/word_count.py

通过 --target 参数,可以使用 YARN Cluster in Per-Job Mode 执行 PyFlink 作业。

示例:将 PyFlink 作业提交 YARN Cluster in Per-Job Mode

$ ./bin/flink run \
   --target yarn-per-job
   --python examples/python/table/word_count.py

可以使用 run-application -t yarn-application 命令将 PyFlink 作业提交到 YARN cluster in Application Mode。其中,通过 -pyarch 指定的存档文件将通过 blob 服务器分发到 TaskManagers,其中文件大小限制为 2GB。如果存档文件的大小超过 2GB,则可以先将其上传到分布式文件系统,然后使用命令行选项 -pyarch 指定路径。

示例:将 PyFlink 作业提交到 YARN cluster in Application Mode

$ ./bin/flink run-application -t yarn-application \
   -Djobmanager.memory.process.size=1024m \
   -Dtaskmanager.memory.process.size=1024m \
   -Dyarn.application.name=<ApplicationName> \
   -Dyarn.ship-files=/path/to/shipfiles \
   -pyarch shipfiles/venv.zip \
   -pyclientexec venv.zip/venv/bin/python3 \
   -pyexec venv.zip/venv/bin/python3 \
   -pyfs shipfiles \
   -pym word_count

在这个任务中,假定 Python 依赖位于 /path/to/shipfiles 中;例如,其中应该包含 venv.zipword_count.py

当它在 YARN Application Mode 下的 JobManager 执行作业时,-pyarchpyfs 中指定的路径是相对于 shipfiles 的路径,shipfiles 是已发送文件的目录名称。

可以使用 run-application -target kubernetes-application 命令将 PyFlink 作业提交到 native Kubernetes cluster,这需要一个已经安装了 PyFlink 的 Docker 镜像。

示例:将 PyFlink 作业提交到 native Kubernetes cluster

$ ./bin/flink run-application \
   --target kubernetes-application \
   --parallelism 8 \
   -Dkubernetes.cluster-id=<ClusterId> \
   -Dtaskmanager.memory.process.size=4096m \
   -Dkubernetes.taskmanager.cpu=2 \
   -Dtaskmanager.numberOfTaskSlots=4 \
   -Dkubernetes.container.image.ref=<PyFlinkImageName> \
   --pyModule word_count \
   --pyFiles /opt/flink/examples/python/table/word_count.py

run 命令行 run-application 命令中支持的参数清单:文章来源地址https://www.toymoban.com/news/detail-836748.html

  • -py / --python:Python 程序入口脚本,其依赖可以通过 --pyFiles 参数添加。
  • -pym / --pyModule:Module 模式的 Python 程序入口,这个参数必须与 --pyFiles 同时使用。
  • -pyfs / --pyFiles:为作业附加自定义文件,这些文件将被添加到本地客户端以及远端 Python UDF 的 PYTHONPATH 中;其中后缀为 .zip 的文件会被提取出来并添加到 PYTHONPATH 中;多个文件之间可以试用 , 分隔。例如 --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip
  • -pyarch / --pyArchives:为作业添加 Python 存档文件;这些存档文件会被提取到 Python UDF Worker 的工作目录中。例如 --pyArchives file:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutable py37.zip/py37/bin/python
  • -pyclientexec / --pyClientExecutable:在提交 Python 任务时,以及编译 Python UDF 时使用的 Python 解释器路径。例如 --pyArchives file:///tmp/py37.zip --pyClientExecutable py37.zip/py37/python
  • -pyexec / --pyExecutable:执行 Python UDF 的 Python 解释器路径。例如 --pyExecutable /usr/local/bin/python3
  • -pyreq / --pyRequirements:指定作业所需第三方模块的 requirements.txt 文件;这些文件将被安装并添加到 Python UDF 的 PYTHONPATH 中。可以选择指定一个目录,用于存储这些依赖的安装包,如果指定了目录,则使用 # 作为分隔符。例如 --pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir.

到了这里,关于Flink|《Flink 官方文档 - 部署 - 命令行界面 - 提交 PyFlink 作业》学习笔记的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink|《Flink 官方文档 - 部署 - 内存配置 - 调优指南 & 常见问题》学习笔记

    学习文档: 《Flink 官方文档 - 部署 - 内存配置 - 调优指南》 《Flink 官方文档 - 部署 - 内存配置 - 常见问题》 学习笔记如下: 独立部署模式(Standalone Deployment)下的内存配置 通常无需配置进程总内存,因为不管是 Flink 还是部署环境都不会对 JVM 开销进行限制,它只与机器的

    2024年02月19日
    浏览(33)
  • Flink流批一体计算(12):PyFlink Tabel API之构建作业

    目录 1.创建源表和结果表。 创建及注册表名分别为 source 和 sink 的表 使用 TableEnvironment.execute_sql() 方法,通过 DDL 语句来注册源表和结果表 2. 创建一个作业 3. 提交作业Submitting PyFlink Jobs 1.创建源表和结果表。 创建及注册表名分别为 source 和 sink 的表 其中,源表 source 有一列

    2024年02月13日
    浏览(38)
  • flink 单作业模式部署提交作业爆:Trying to access closed classloader. Please check if you store classloaders direc

    指令信息 报错信息:Exception in thread “Thread-5” java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configu

    2024年02月16日
    浏览(33)
  • flink作业提交流程

    目录 作业提交流程 独立模式 YARN模式 会话模式 单作业模式 应用模式 (1) 一般情况下,由客户端(App)通过分发器提供的 REST 接口,将作业提交给JobManager。 (2)由分发器启动 JobMaster,并将作业(包含 JobGraph)提交给 JobMaster。 (3)JobMaster 将 JobGraph 解析为可执行的 Exec

    2024年02月12日
    浏览(34)
  • Flink-----Standalone会话模式作业提交流程

    1.Flink的Slot特点: 均分隔离内存,不隔离CPU 可以共享: 同一个job中,不同算子的子任务才可以共享同一个slot,同时在运行的前提是,属于同一个slot共享组,默认都是“default” 2.Slot的数量 与 并行度 的关系 slot 是一种静态的概念,表示最大的并发上线 并行度是个动态的概念

    2024年02月12日
    浏览(31)
  • 【flink】使用flink-web-ui提交作业报错

    使用WebUI提交作业出现错误。 错误截图:  弹框信息: 在弹框中是无法看到具体错误信息的。 需要去 job-manager/logs中看详细信息: Failed to create checkpoint storage at checkpoint coordinator side 无法在检查点协调器端创建检查点存储  怎么还没有办法创建呢???? 看一下我的StateBa

    2024年02月14日
    浏览(41)
  • Flink|《Flink 官方文档 - 内幕 - 文件系统》学习笔记

    学习文档:内幕 - 文件系统 学习笔记如下: Flink 通过 org.apache.flink.core.fs.FileSystem 实现了文件系统的抽象。这种抽象提供了一组通用的操作,以支持使用各类文件系统。 为了支持众多的文件系统, FileSystem 的可用操作集非常有限。例如,不支持对现有文件进行追加或修改。

    2024年02月03日
    浏览(27)
  • Flink|《Flink 官方文档 - 概念透析 - Flink 架构》学习笔记

    学习文档:概念透析 - Flink 架构 学习笔记如下: 客户端(Client):准备数据流程序并发送给 JobManager(不是 Flink 执行程序的进程) JobManager:协调 Flink 应用程序的分布式执行 ResourceManager:负责 Flink 集群中的资源提供、回收、分配 Dispatcher:提供了用来提交 Flink 应用程序执行

    2024年01月19日
    浏览(39)
  • Flink|《Flink 官方文档 - 概念透析 - 及时流处理》学习笔记

    学习文档:概念透析 - 及时流处理 学习笔记如下: 及时流处理时有状态流处理的扩展,其中时间在计算中起着一定的作用。 及时流的应用场景: 时间序列分析 基于特定时间段进行聚合 对发生时间很重要的事件进行处理 处理时间(processing time) 处理时间的即数据到达各个

    2024年02月03日
    浏览(37)
  • Flink|《Flink 官方文档 - DataStream API - 概览》学习笔记

    学习文档:Flink 官方文档 - DataStream API - 概览 学习笔记如下: Flink 的 DataStream API: 数据里的起始是各种 source,例如消息队列、socket 流、文件等; 对数据流进行转换,例如过滤、更新状态、定义窗口、聚合等; 结果通过 sink 返回,例如可以将数据写入文件或标准输出。 Da

    2024年01月23日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包