Flink流批一体计算(9):Flink Python

这篇具有很好参考价值的文章主要介绍了Flink流批一体计算(9):Flink Python。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

使用Python依赖

使用自定义的Python虚拟环境

方式一:在集群中的某个节点创建Python虚拟环境

方式二:在本地开发机创建Python虚拟环境

使用JAR包

使用数据文件


使用Python依赖

通过以下场景为您介绍如何使用Python依赖:

  • 使用自定义的Python虚拟环境
  • 使用第三方Python包
  • 使用JAR包
  • 使用数据文件

使用自定义的Python虚拟环境

方式一:在集群中的某个节点创建Python虚拟环境
set -e

# 创建Python的虚拟环境。

python3.6 -m venv venv

# 激活Python虚拟环境。

source venv/bin/activate

# 准备Python虚拟环境。

pip install --upgrade pip

# 安装PyFlink依赖。

pip install "apache-flink==1.13.0"

# 退出Python虚拟环境。

deactivate

该命令执行完成后,会生成一个名为venv的目录,即为Python 3.6的虚拟环境。您也可以修改上述脚本,安装其他版本的Python虚拟环境。

为了使用该Python虚拟环境,您可以选择将该Python虚拟环境分发到集群的所有节点上,也可以在提交PyFlink作业的时候,指定使用该Python虚拟环境。

以下命令显示了不同的PyFlink作业提交用例:

  • 执行 PyFlink job:
$ ./bin/flink run --python examples/python/table/batch/word_count.py
  • 使用pyFiles和--pyModule中指定的主入口模块运行PyFlink作业:
./bin/flink run \
--pyModule batch.word_count \
--pyFiles examples/python/table/batch
  • 在特定主机<jobmanagerHost>运行的JobManager上提交PyFlink作业(相应地调整命令):
$ ./bin/flink run \
 --jobmanager <jobmanagerHost>:8081 \
 --python examples/python/table/batch/word_count.py
  • 在yarn集群上以Per-Job 模式运行PyFlink job:
$ ./bin/flink run \
--target yarn-per-job
--python examples/python/table/batch/word_count.py
方式二:在本地开发机创建Python虚拟环境
set -e

# 下载Python 3.7 miniconda.sh脚本。

wget "https://repo.continuum.io/miniconda/Miniconda3-py37_4.9.2-Linux-x86_64.sh" -O "miniconda.sh"

# 为Python 3.7 miniconda.sh脚本添加执行权限。

chmod +x miniconda.sh

# 创建Python的虚拟环境。

./miniconda.sh -b -p venv

# 激活Conda Python虚拟环境。

source venv/bin/activate ""

# 安装PyFlink依赖。

pip install "apache-flink==1.13.0"

# 退出Conda Python虚拟环境。

conda deactivate

# 删除缓存的包。

rm -rf venv/pkgs

# 将准备好的Conda Python虚拟环境打包。

zip -r venv.zip venv

该命令执行完成后,会生成一个名为venv.zip的文件,即为Python 3.7的虚拟环境。您也可以修改上述脚本,安装其他版本的Python虚拟环境,或者在虚拟环境中安装所需的第三方Python包。

使用JAR包

如果您的Flink Python作业中使用了Java类,例如作业中使用了Connector或者Java自定义函数时,则需要指定Connector或者Java自定义函数所在的JAR包。

引用Java UDF或外部连接器的PyFlink作业。--jarfile中指定的JAR文件将上载到集群。

$ ./bin/flink run \
--python examples/python/table/batch/word_count.py \
--jarfile <jarFile>

使用数据文件

如果您的Flink Python作业中需要访问数据文件,例如模型文件等,则可以通过Python Archives的方式来访问。文章来源地址https://www.toymoban.com/news/detail-521199.html

  • 执行 PyFlink job,增加 source和资源文件 ,--pyFiles中指定的文件将被添加到PYTHONPATH中,因此在Python代码中可用。
$ ./bin/flink run \
--python examples/python/table/batch/word_count.py \
--pyFiles file:///user.txt,hdfs:///$namenode_address/username.txt
                    

到了这里,关于Flink流批一体计算(9):Flink Python的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink流批一体计算(4):Flink功能模块

    目录 Flink功能架构 Flink输入输出 Flink功能架构 Flink是分层架构的分布式计算引擎,每层的实现依赖下层提供的服务,同时提供抽象的接口和服务供上层使用。 Flink 架构可以分为4层,包括Deploy部署层、Core核心层、API层和Library层 部署层:主要涉及Flink的部署模式。Flink支持多种

    2024年02月10日
    浏览(49)
  • Flink流批一体计算(5):部署运行模式

    目录 集群运行模式 1.local模式 2.standalone模式 3.Flink on YARN模式 本地模式 Standalone 模式 Flink on Yarn 模式 集群运行模式 类似于 Spark , Flink 也有各种运行模式,其中主要支持三种: local 模式、 standalone 模式以及 Flink on YARN 模式。 每种模式都有特定的使用场景,接下来一起了解一

    2024年02月10日
    浏览(41)
  • flink重温笔记(五):Flink 流批一体 API 开发——物理分区(下)

    前言 :今天是学习 flink 的第五天啦! 主要学习了物理分区较难理解的部分,在这个部分的三个分区的学习中, rescale partition 和 forward partition 其原理可以归类 pointwise 模式,其他的 partition 其原理可以归类 all_to_all 模式,而比较有趣的是 custom partitioning,这个可以进行根据值

    2024年02月19日
    浏览(43)
  • flink重温笔记(四):Flink 流批一体 API 开发——物理分区(上)

    前言:今天是学习flink的第四天啦!学习了物理分区的知识点,这一次学习了前4个简单的物理分区,称之为简单分区篇! Tips:我相信自己会越来会好的,明天攻克困难分区篇,加油! 3. 物理分区 3.1 Global Partitioner 该分区器会将所有的数据都发送到下游的某个算子实例(subta

    2024年02月19日
    浏览(38)
  • Flink流批一体计算(10):PyFlink Tabel API

    简述 PyFlink 是 Apache Flink 的 Python API ,你可以使用它构建可扩展的批处理和流处理任务,例如实时数据处理管道、大规模探索性数据分析、机器学习( ML )管道和 ETL 处理。 如果你对 Python 和 Pandas 等库已经比较熟悉,那么 PyFlink 可以让你更轻松地利用 Flink 生态系统的全部功

    2024年02月11日
    浏览(40)
  • Flink流批一体计算(16):PyFlink DataStream API

    目录 概述 Pipeline Dataflow 代码示例WorldCount.py 执行脚本WorldCount.py 概述 Apache Flink 提供了 DataStream API,用于构建健壮的、有状态的流式应用程序。它提供了对状态和时间细粒度控制,从而允许实现高级事件驱动系统。 用户实现的Flink程序是由Stream和Transformation这两个基本构建块组

    2024年02月11日
    浏览(45)
  • 流批一体计算引擎-4-[Flink]消费kafka实时数据

    Python3.6.9 Flink 1.15.2消费Kafaka Topic PyFlink基础应用之kafka 通过PyFlink作业处理Kafka数据 PyFlink需要特定的Python版本,Python 3.6, 3.7, 3.8 or 3.9。 1.3.1 python3和pip3的配置 一、系统中安装了多个版本的python3 。 二、环境变量path作用顺序 三、安装Pyflink 1.3.2 配置Flink Kafka连接 (1)在https://mvnr

    2024年02月06日
    浏览(41)
  • flink重温笔记(六):Flink 流批一体 API 开发—— 数据输出 sink

    前言:今天是学习 flink 的第七天啦!学习了 flink 中 sink(数据槽) 部分知识点,这一部分只要是解决数据处理之后,数据到哪里去的问题,我觉得 flink 知识点虽然比较难理解,但是代码跑通后,逻辑还是比较有趣的! Tips:毛爷爷说过:“宜将剩勇追穷寇,不可沽名学霸王

    2024年02月21日
    浏览(42)
  • 流批一体计算引擎-7-[Flink]的DataStream连接器

    参考官方手册DataStream Connectors 一、预定义的Source和Sink 一些比较基本的Source和Sink已经内置在Flink里。 1、预定义data sources支持从文件、目录、socket,以及collections和iterators中读取数据。 2、预定义data sinks支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 sock

    2023年04月08日
    浏览(39)
  • flink重温笔记(三):Flink 流批一体 API 开发——Transformation 重要算子操作

    前言:今天是学习 flink 第三天啦,学习了高级 api 开发中11 中重要算子,查找了好多资料理解其中的原理,以及敲了好几个小时代码抓紧理解原理。 Tips:虽然学习进度有点慢,希望自己继续努力,不断猜想 api 原理,通过敲代码不断印证自己的想法,转码大数据之路一定会越

    2024年02月19日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包