搭建PyFlink环境(2)

这篇具有很好参考价值的文章主要介绍了搭建PyFlink环境(2)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

        😄伙伴们,好久不见!这里是 叶苍ii
        ❀  作为一名大数据博主,我一直致力于分享最新的技术趋势和实战经验。近期,我在参加Flink的顾客营销项目,使用了PyFlink项目进行数据处理和分析。
        ❀  在这个文章合集中,我将与大家分享我的实战经验,探索PyFlink项目的魅力。

1. 安装Apache Flink:

  1. PyFlink是Apache Flink的Python API,因此首先需要安装和配置Apache Flink。
  2. 我们从Apache Flink官方网站(https://flink.apache.org/)下载最新版本的Flink,并按照官方文档提供的指南进行安装和配置。
  3. 这个太慢了,我们使用 国内镜像

1.1.1. Flink版本

目前比较新的版本是 V 1.18.0

我查看了一下文档,总结了Flink 1.7、1.8、1.9和 1.18 新版本 重大区别:

pyflink 安装,Flink/Python专栏,big data,flink,python

Flink 1.7:

  • 引入了基于事件时间的处理模式,使得在流处理中更容易处理乱序事件。
  • 引入了动态表连接,允许在运行时动态地连接和断开表。
  • 引入了Flink SQL的批处理模式,使得可以使用相同的SQL语法进行批处理作业。
  • 引入了对Python Table API的初步支持。

Flink 1.8:

  • 引入了状态后端的概念,允许用户选择将状态存储在不同的后端(如内存、RocksDB等)。
  • 引入了异步快照机制,提高了检查点性能。
  • 引入了对Python DataStream API的支持,使得可以使用Python编写Flink流处理作业。
  • 引入了对Elasticsearch的集成,使得可以直接将数据写入Elasticsearch。

Flink 1.9:

  • 引入了动态表功能,允许在运行时动态地创建、修改和删除表。
  • 引入了对Avro格式的原生支持,使得可以直接读取和写入Avro格式的数据。
  • 引入了对Kubernetes的本地集群部署支持,简化了在Kubernetes上部署Flink作业的过程。
  • 引入了对Python UDF(用户定义函数)的支持,使得可以使用Python编写自定义函数。

Flink 1.18.0:

  • 引入了基于时间特征的处理模式,使得可以更容易地处理事件时间和处理时间之间的转换。
  • 引入了动态表函数,允许在运行时动态地创建、修改和删除表函数。
  • 引入了对Apache Kafka 2.8的原生支持,包括新的Kafka消费者和生产者API。
  • 引入了对Apache Iceberg的集成,使得可以直接读取和写入Iceberg格式的数据。

Flink 1.17.0:

  • 引入了异步I/O线程池,提高了异步操作的性能和可扩展性。
  • 引入了对Python UDF的改进,包括对Python Scalar UDF的支持。
  • 引入了对Apache Hudi的集成,使得可以直接读取和写入Hudi格式的数据。
  • 引入了对Apache Beam的批处理和流处理Runner的支持。

Flink 1.16:

  • 引入了状态后端的改进,包括对RocksDB状态后端的优化和改进。
  • 引入了对Apache ORC的集成,使得可以直接读取和写入ORC格式的数据。
  • 引入了对Apache Calcite的升级,提供更好的SQL优化和查询计划生成。

从1.7版本之后,逐渐的对Python友好,我们这次可以使用的是 1.18.0版本

我们使用 国内镜像下载:Index of /dist/flink

pyflink 安装,Flink/Python专栏,big data,flink,python

pyflink 安装,Flink/Python专栏,big data,flink,python

详细下载地址:

scala : Index of /dist/flink/flink-1.18.0

python: Index of /dist/flink/flink-1.18.0/python

1.1.2. Windows部署Flink

下载完解压 tgz

进入bin 目录,新增两个.bat文件

start-cluster.bat文件 flink.bat文件

::###############################################################################
::  Licensed to the Apache Software Foundation (ASF) under one
::  or more contributor license agreements.  See the NOTICE file
::  distributed with this work for additional information
::  regarding copyright ownership.  The ASF licenses this file
::  to you under the Apache License, Version 2.0 (the
::  "License"); you may not use this file except in compliance
::  with the License.  You may obtain a copy of the License at
::
::      http://www.apache.org/licenses/LICENSE-2.0
::
::  Unless required by applicable law or agreed to in writing, software
::  distributed under the License is distributed on an "AS IS" BASIS,
::  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
::  See the License for the specific language governing permissions and
:: limitations under the License.
::###############################################################################
 
@echo off
setlocal EnableDelayedExpansion
 
SET bin=%~dp0
SET FLINK_HOME=%bin%..
SET FLINK_LIB_DIR=%FLINK_HOME%\lib
SET FLINK_PLUGINS_DIR=%FLINK_HOME%\plugins
SET FLINK_CONF_DIR=%FLINK_HOME%\conf
SET FLINK_LOG_DIR=%FLINK_HOME%\log
 
SET JVM_ARGS=-Xms1024m -Xmx1024m
 
SET FLINK_CLASSPATH=%FLINK_LIB_DIR%\*
 
SET logname_jm=flink-%username%-jobmanager.log
SET logname_tm=flink-%username%-taskmanager.log
SET log_jm=%FLINK_LOG_DIR%\%logname_jm%
SET log_tm=%FLINK_LOG_DIR%\%logname_tm%
SET outname_jm=flink-%username%-jobmanager.out
SET outname_tm=flink-%username%-taskmanager.out
SET out_jm=%FLINK_LOG_DIR%\%outname_jm%
SET out_tm=%FLINK_LOG_DIR%\%outname_tm%
 
SET log_setting_jm=-Dlog.file="%log_jm%" -Dlogback.configurationFile=file:"%FLINK_CONF_DIR%/logback.xml" -Dlog4j.configuration=file:"%FLINK_CONF_DIR%/log4j.properties"
SET log_setting_tm=-Dlog.file="%log_tm%" -Dlogback.configurationFile=file:"%FLINK_CONF_DIR%/logback.xml" -Dlog4j.configuration=file:"%FLINK_CONF_DIR%/log4j.properties"
 
:: Log rotation (quick and dirty)
CD "%FLINK_LOG_DIR%"
for /l %%x in (5, -1, 1) do ( 
SET /A y = %%x+1 
RENAME "%logname_jm%.%%x" "%logname_jm%.!y!" 2> nul
RENAME "%logname_tm%.%%x" "%logname_tm%.!y!" 2> nul
RENAME "%outname_jm%.%%x" "%outname_jm%.!y!"  2> nul
RENAME "%outname_tm%.%%x" "%outname_tm%.!y!"  2> nul
)
RENAME "%logname_jm%" "%logname_jm%.0"  2> nul
RENAME "%logname_tm%" "%logname_tm%.0"  2> nul
RENAME "%outname_jm%" "%outname_jm%.0"  2> nul
RENAME "%outname_tm%" "%outname_tm%.0"  2> nul
DEL "%logname_jm%.6"  2> nul
DEL "%logname_tm%.6"  2> nul
DEL "%outname_jm%.6"  2> nul
DEL "%outname_tm%.6"  2> nul
 
for %%X in (java.exe) do (set FOUND=%%~$PATH:X)
if not defined FOUND (
    echo java.exe was not found in PATH variable
    goto :eof
)
 
echo Starting a local cluster with one JobManager process and one TaskManager process.
 
echo You can terminate the processes via CTRL-C in the spawned shell windows.
 
echo Web interface by default on http://localhost:8081/.
 
start /b java %JVM_ARGS% %log_setting_jm% -cp "%FLINK_CLASSPATH%"; org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint --configDir "%FLINK_CONF_DIR%" > "%out_jm%" 2>&1
start /b java %JVM_ARGS% %log_setting_tm% -cp "%FLINK_CLASSPATH%"; org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir "%FLINK_CONF_DIR%" > "%out_tm%" 2>&1
 
endlocal
 

::###############################################################################
::  Licensed to the Apache Software Foundation (ASF) under one
::  or more contributor license agreements.  See the NOTICE file
::  distributed with this work for additional information
::  regarding copyright ownership.  The ASF licenses this file
::  to you under the Apache License, Version 2.0 (the
::  "License"); you may not use this file except in compliance
::  with the License.  You may obtain a copy of the License at
::
::      http://www.apache.org/licenses/LICENSE-2.0
::
::  Unless required by applicable law or agreed to in writing, software
::  distributed under the License is distributed on an "AS IS" BASIS,
::  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
::  See the License for the specific language governing permissions and
:: limitations under the License.
::###############################################################################
 
@echo off
setlocal
 
SET bin=%~dp0
SET FLINK_HOME=%bin%..
SET FLINK_LIB_DIR=%FLINK_HOME%\lib
SET FLINK_PLUGINS_DIR=%FLINK_HOME%\plugins
 
SET JVM_ARGS=-Xmx512m
 
SET FLINK_JM_CLASSPATH=%FLINK_LIB_DIR%\*
 
java %JVM_ARGS% -cp "%FLINK_JM_CLASSPATH%"; org.apache.flink.client.cli.CliFrontend %*
 
endlocal

————————————————
版权声明:本文为CSDN博主「向嘴子哥哥」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/FrankHsiang/article/details/131701080

1.1.3. Mac部署Flink

1.1.3.1. 安装brew

使用以下指令一键安装brew

/bin/zsh -c "$(curl -fsSL https://gitee.com/cunkai/HomebrewCN/raw/master/Homebrew.sh)"
开始执行Brew自动安装程序
             [cunkai.wang@foxmail.com]
           [2020-08-28 11:26:26][10.15]
       https://zhuanlan.zhihu.com/p/111014448
请选择一个下载镜像,例如中科大,输入1回车。
源有时候不稳定,如果git克隆报错重新运行脚本选择源。cask非必须,有部分人需要。
1、中科大下载源 2、清华大学下载源 3、北京外国语大学下载源 4、腾讯下载源(不显示下载进度) 5、阿里巴巴下载源(缺少cask源)
请输入序号: 1

  你选择了中国科学技术大学下载源

!!!此脚本将要删除之前的brew(包括它下载的软件),请自行备份。
->是否现在开始执行脚本(N/Y)y

--> 脚本开始执行
==> 通过命令删除之前的brew、创建一个新的Homebrew文件夹
(设置开机密码:在左上角苹果图标->系统偏好设置->"用户与群组"->更改密码)
(如果提示This incident will be reported. 在"用户与群组"中查看是否管理员)
请输入开机密码,输入过程不显示,输入完后回车
Password:
开始执行
   ---备份要删除的文件夹到系统桌面....
   ---/usr/local/Homebrew 备份完成
-> 创建文件夹 /usr/local/Homebrew
此步骤成功
   ---备份要删除的文件夹到系统桌面....
   ---/usr/local/Caskroom 备份完成
-> 创建文件夹 /usr/local/Caskroom
此步骤成功
   ---备份要删除的文件夹到系统桌面....
   ---/usr/local/Cellar 备份完成
-> 创建文件夹 /usr/local/Cellar
此步骤成功
   ---备份要删除的文件夹到系统桌面....
   ---/usr/local/var/homebrew 备份完成
-> 创建文件夹 /usr/local/var/homebrew
此步骤成功
-> 创建文件夹 /usr/local/var/homebrew/linked
此步骤成功
git version 2.24.3 (Apple G
1.1.3.2. brew_Flink 部署

方式1:brew 在线安装 Flink Mac本地安装、运行_mac flink 关闭-CSDN博客


#1、查看java版本
java -version
 
#2、安装flink
brew install apache-flink
 
#3、查看flink版本
flink --version
1.1.3.3. 本地安装利用下好的tgz 安装

方式2 ,上述的brew 安装 失败的话 可以本地安装

Flink项目实践 | Flink 单机安装部署

Flink项目实践 | Flink 单机安装部署-腾讯云开发者社区-腾讯云

2. Python环境:

PyFlink是基于Python的API,所以你需要确保在你的机器上已经安装了Python。推荐使用Python 3.x版本,因为PyFlink对Python 3的支持更好。

2.1.1. python下载

我们从Python官方网站 下载并安装适合你操作系统的Python版本。

下载: Python Release Python 3.11.3 | Python.org

pyflink 安装,Flink/Python专栏,big data,flink,python

2.1.2. python配置环境变量

python安装及环境变量配置(mac版)

python安装及环境变量配置(mac版)_mac python 环境变量-CSDN博客

2.1.3. FLink包

  1. PyFlink包:一旦你有了Apache Flink和Python环境,你就可以通过pip或conda等包管理工具安装PyFlink包。运行以下命令即可安装PyFlink:
pip install apache-flink

或者

conda install -c conda-forge pyflink
  1. 配置文件:在安装完成后,你需要根据你的需求进行一些配置。主要的配置文件是flink-conf.yaml,它位于Flink的安装目录下的conf文件夹中。你可以根据需要调整配置项,如并行度、内存分配、检查点设置等。
  2. IDE或编辑器:为了编写和运行PyFlink程序,我们需要选择一个适合的集成开发环境(IDE)或文本编辑器。
  3. 常用的选择包括PyCharm、VS Code、Jupyter Notebook等。确保你在IDE或编辑器中正确配置了Python和PyFlink的环境。

✈    ✈    ✈   ✈   ✈    ✈    ✈   ✈ ✈    ✈    ✈   ✈ ✈    ✈    ✈   ✈ ✈    ✈    ✈   ✈ ✈  

Flink实战(1)-了解Flink

Python专题-pip切换文章来源地址https://www.toymoban.com/news/detail-825669.html

到了这里,关于搭建PyFlink环境(2)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink流批一体计算(17):PyFlink DataStream API之StreamExecutionEnvironment

    目录 StreamExecutionEnvironment Watermark watermark策略简介 使用 Watermark 策略 内置水印生成器 处理空闲数据源 算子处理 Watermark 的方式 创建DataStream的方式 通过list对象创建 ​​​​​​使用DataStream connectors创建 使用Table SQL connectors创建 StreamExecutionEnvironment 编写一个 Flink Python DataSt

    2024年02月11日
    浏览(36)
  • Flink流批一体计算(12):PyFlink Tabel API之构建作业

    目录 1.创建源表和结果表。 创建及注册表名分别为 source 和 sink 的表 使用 TableEnvironment.execute_sql() 方法,通过 DDL 语句来注册源表和结果表 2. 创建一个作业 3. 提交作业Submitting PyFlink Jobs 1.创建源表和结果表。 创建及注册表名分别为 source 和 sink 的表 其中,源表 source 有一列

    2024年02月13日
    浏览(38)
  • Flink流批一体计算(19):PyFlink DataStream API之State

    目录 keyed state Keyed DataStream 使用 Keyed State 实现了一个简单的计数窗口 状态有效期 (TTL) 过期数据的清理 全量快照时进行清理 增量数据清理 在 RocksDB 压缩时清理 Operator State算子状态 Broadcast State广播状态 keyed state Keyed DataStream 使用 keyed state,首先需要为DataStream指定 key(主键)

    2024年02月10日
    浏览(28)
  • Flink流批一体计算(11):PyFlink Tabel API之TableEnvironment

    目录 概述 设置重启策略 什么是flink的重启策略(Restartstrategy) flink的重启策略(Restartstrategy)实战 flink的4种重启策略 FixedDelayRestartstrategy(固定延时重启策略) FailureRateRestartstrategy(故障率重启策略) NoRestartstrategy(不重启策略) 配置State Backends 以及 Checkpointing Checkpoint 启用和配置

    2024年02月13日
    浏览(32)
  • Flink流批一体计算(14):PyFlink Tabel API之SQL查询

    举个例子 查询 source 表,同时执行计算 Table API 查询 Table 对象有许多方法,可以用于进行关系操作。 这些方法返回新的 Table 对象,表示对输入 Table 应用关系操作之后的结果。 这些关系操作可以由多个方法调用组成,例如 table.group_by(...).select(...)。 Table API 文档描述了流和批

    2024年02月12日
    浏览(35)
  • Flink流批一体计算(18):PyFlink DataStream API之计算和Sink

    目录 1. 在上节数据流上执行转换操作,或者使用 sink 将数据写入外部系统。 2. File Sink File Sink Format Types  Row-encoded Formats  Bulk-encoded Formats  桶分配 滚动策略 3. 如何输出结果 Print 集合数据到客户端,execute_and_collect方法将收集数据到客户端内存 将结果发送到DataStream sink conne

    2024年02月11日
    浏览(32)
  • Flink流批一体计算(13):PyFlink Tabel API之SQL DDL

    1. TableEnvironment 创建 TableEnvironment TableEnvironment 是 Table API 和 SQL 集成的核心概念。 TableEnvironment 可以用来: ·创建 Table ·将 Table 注册成临时表 ·执行 SQL 查询 ·注册用户自定义的 (标量,表值,或者聚合) 函数 ·配置作业 ·管理 Python 依赖 ·提交作业执行 创建 source 表 创建 sink

    2024年02月12日
    浏览(33)
  • Flink流批一体计算(15):PyFlink Tabel API之SQL写入Sink

    目录 举个例子 写入Sink的各种情况 1. 将结果数据收集到客户端 2. 将结果数据转换为Pandas DataFrame,并收集到客户端 3. 将结果写入到一张 Sink 表中 4. 将结果写入多张 Sink 表中 举个例子 将计算结果写入给 sink 表 写入Sink的各种情况 1. 将结果数据收集到客户端 你可以使用 TableR

    2024年02月11日
    浏览(34)
  • 0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统

    在 《0基础学习PyFlink——使用PyFlink的SQL进行字数统计》一文中,我们直接执行了Select查询操作,在终端中直接看到了查询结果。 在生产环境,我们往往要将计算结果保存到外部系统中,比如Mysql等。这个时候我们就要使用Sink。 Sink用于将Reduce结果输出到外部系统。它也是通过

    2024年02月08日
    浏览(30)
  • PyFlink核心知识点

    四层 说明 备注 SteamGraph 代码生成的最初的图 表示程序的拓扑结构 JobGraph 将多个符合条件的节点,链接为一个节点 可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗 ExecutionGraph JobGraph的并行化版本 是调度层最核心的数据结构 PhysicalGraph JobManager根据ExecutionGra

    2024年04月27日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包