Python 编写 Flink 应用程序经验记录(Flink1.17.1)

这篇具有很好参考价值的文章主要介绍了Python 编写 Flink 应用程序经验记录(Flink1.17.1)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

官方API文档

提交作业到集群运行

官方示例

环境

编写一个 Flink Python Table API 程序

执行一个 Flink Python Table API 程序

实例处理Kafka后入库到Mysql

下载依赖

flink-kafka jar

读取kafka数据

写入mysql数据

flink-mysql jar


官方API文档

https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/python/overview/

https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/datastream/kafka/

提交作业到集群运行

#! /usr/bin/env python
# -*- coding: utf-8 -*-

# /opt/test_flink.py
if __name__ == "__main__":
    print("这是一个简单的测试用例")

flink 安装目录下的 examples 目录里面已经提供了一些测试案例,我们也可以直接拿它来做实验。

提交至集群

./bin/flink run -py 代码文件

通过 flink run 即可运行应用程序,由于 flink 既可运行 Java 程序、也可以运行 Python 程序,所以这里我们需要指定 -py 参数,表示运行的是 py 文件。但默认情况下解释器使用的 python2,当然如果你终端输入 python 进入的就是 python3 的话则当我没说,要是我们想指定 flink 使用 python3 解释器的话,则需要配置一个环境变量。

export PYFLINK_CLIENT_EXECUTABLE=/usr/bin/python3

下面来测试一下:

./bin/flink run -py /opt/test_flink.py

Python 编写 Flink 应用程序经验记录(Flink1.17.1),大数据,python,flink,开发语言

很明显结果是成功的,当然这里面没有涉及到任何与 Flink 有关的内容,只是演示如何提交一个 Python 应用程序。当然 flink run 是同时支持 Java、Python 等语言的。

不管使用哪种 API 进行编程,最终客户端都会生成 JobGraph 提交到 JM 上。但毕竟 Flink 的内核是采用 Java 语言编写的,如果 Python 应用程序变成 JobGraph 对象被提交到 Flink 集群上运行的话,那么 Python 虚拟机和 Java 虚拟机之间一定有某种方式,使得 Python 可以直接动态访问 Java 中的对象、Java 也可以回调 Python 中的对象。没错,实现这一点的便是 py4j。

提交单个 py 文件知道怎么做了,但如果该文件还导入了其它文件该怎么办呢?一个项目中还会涉及到包的存在。其实不管项目里的文件有多少,启动文件只有一个,只需要把这个启动文件提交上去即可。举例说明,当然这里仍不涉及具体和 Flink 相关的内容,先把如何提交程序这一步给走通。因为不管编写的程序多复杂,提交这一步骤是不会变的。

先来看看编写的程序:

Python 编写 Flink 应用程序经验记录(Flink1.17.1),大数据,python,flink,开发语言

flink_test 就是主目录,里面有一个 apps 子目录和一个 main.py 文件,apps 目录里面有三个 py 文件,对应的内容分别如图所示。然后将其提交到 Flink Standalone 集群上运行,命令和提交单个文件是一样的

Python 编写 Flink 应用程序经验记录(Flink1.17.1),大数据,python,flink,开发语言

即使是多文件,提交方式也是相似的,输出结果表明提交成功了。

官方示例

环境

  • Java 11
  • Python 3.7, 3.8, 3.9 or 3.10
python -m pip install apache-flink==1.17.1

编写一个 Flink Python Table API 程序

编写 Flink Python Table API 程序的第一步是创建 TableEnvironment。这是 Python Table API 作业的入口类。

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("parallelism.default", "1")

接下来,我们将介绍如何创建源表和结果表。

t_env.create_temporary_table(
    'source',
    TableDescriptor.for_connector('filesystem')
        .schema(Schema.new_builder()
                .column('word', DataTypes.STRING())
                .build())
        .option('path', input_path)
        .format('csv')
        .build())
tab = t_env.from_path('source')

t_env.create_temporary_table(
    'sink',
    TableDescriptor.for_connector('filesystem')
        .schema(Schema.new_builder()
                .column('word', DataTypes.STRING())
                .column('count', DataTypes.BIGINT())
                .build())
        .option('path', output_path)
        .format(FormatDescriptor.for_format('canal-json')
                .build())
        .build())

你也可以使用 TableEnvironment.execute_sql() 方法,通过 DDL 语句来注册源表和结果表:

my_source_ddl = """
    create table source (
        word STRING
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '{}'
    )
""".format(input_path)

my_sink_ddl = """
    create table sink (
        word STRING,
        `count` BIGINT
    ) with (
        'connector' = 'filesystem',
        'format' = 'canal-json',
        'path' = '{}'
    )
""".format(output_path)

t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)

上面的程序展示了如何创建及注册表名分别为 source 和 sink 的表。 其中,源表 source 有一列: word,该表代表了从 input_path 所指定的输入文件中读取的单词; 结果表 sink 有两列: word 和 count,该表的结果会输出到 output_path 所指定的输出文件中。

接下来,我们介绍如何创建一个作业:该作业读取表 source 中的数据,进行一些变换,然后将结果写入表 sink

最后,需要做的就是启动 Flink Python Table API 作业。上面所有的操作,比如创建源表 进行变换以及写入结果表的操作都只是构建作业逻辑图,只有当 execute_insert(sink_name) 被调用的时候, 作业才会被真正提交到集群或者本地进行执行。

@udtf(result_types=[DataTypes.STRING()])
def split(line: Row):
    for s in line[0].split():
        yield Row(s)

# 计算 word count
tab.flat_map(split).alias('word') \
    .group_by(col('word')) \
    .select(col('word'), lit(1).count) \
    .execute_insert('sink') \
    .wait()

该教程的完整代码如下:

import argparse
import logging
import sys

from pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,
                           DataTypes, FormatDescriptor)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtf

word_count_data = ["To be, or not to be,--that is the question:--",
                   "Whether 'tis nobler in the mind to suffer",
                   "The slings and arrows of outrageous fortune",
                   "Or to take arms against a sea of troubles,",
                   "And by opposing end them?--To die,--to sleep,--",
                   "No more; and by a sleep to say we end",
                   "The heartache, and the thousand natural shocks",
                   "That flesh is heir to,--'tis a consummation",
                   "Devoutly to be wish'd. To die,--to sleep;--",
                   "To sleep! perchance to dream:--ay, there's the rub;",
                   "For in that sleep of death what dreams may come,",
                   "When we have shuffled off this mortal coil,",
                   "Must give us pause: there's the respect",
                   "That makes calamity of so long life;",
                   "For who would bear the whips and scorns of time,",
                   "The oppressor's wrong, the proud man's contumely,",
                   "The pangs of despis'd love, the law's delay,",
                   "The insolence of office, and the spurns",
                   "That patient merit of the unworthy takes,",
                   "When he himself might his quietus make",
                   "With a bare bodkin? who would these fardels bear,",
                   "To grunt and sweat under a weary life,",
                   "But that the dread of something after death,--",
                   "The undiscover'd country, from whose bourn",
                   "No traveller returns,--puzzles the will,",
                   "And makes us rather bear those ills we have",
                   "Than fly to others that we know not of?",
                   "Thus conscience does make cowards of us all;",
                   "And thus the native hue of resolution",
                   "Is sicklied o'er with the pale cast of thought;",
                   "And enterprises of great pith and moment,",
                   "With this regard, their currents turn awry,",
                   "And lose the name of action.--Soft you now!",
                   "The fair Ophelia!--Nymph, in thy orisons",
                   "Be all my sins remember'd."]


def word_count(input_path, output_path):
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    # write all the data to one file
    t_env.get_config().set("parallelism.default", "1")

    # define the source
    if input_path is not None:
        t_env.create_temporary_table(
            'source',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('word', DataTypes.STRING())
                        .build())
                .option('path', input_path)
                .format('csv')
                .build())
        tab = t_env.from_path('source')
    else:
        print("Executing word_count example with default input data set.")
        print("Use --input to specify file input.")
        tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
                                  DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))

    # define the sink
    if output_path is not None:
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('word', DataTypes.STRING())
                        .column('count', DataTypes.BIGINT())
                        .build())
                .option('path', output_path)
                .format(FormatDescriptor.for_format('canal-json')
                        .build())
                .build())
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('print')
                .schema(Schema.new_builder()
                        .column('word', DataTypes.STRING())
                        .column('count', DataTypes.BIGINT())
                        .build())
                .build())

    @udtf(result_types=[DataTypes.STRING()])
    def split(line: Row):
        for s in line[0].split():
            yield Row(s)

    # compute word count
    tab.flat_map(split).alias('word') \
        .group_by(col('word')) \
        .select(col('word'), lit(1).count) \
        .execute_insert('sink') \
        .wait()
    # remove .wait if submitting to a remote cluster, refer to
    # https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster
    # for more details


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)

    word_count(known_args.input, known_args.output)

执行一个 Flink Python Table API 程序

接下来,可以在命令行中运行作业(假设作业名为 word_count.py):

python word_count.py

上述命令会构建 Python Table API 程序,并在本地 mini cluster 中运行。如果想将作业提交到远端集群执行, 可以参考作业提交示例。

最后,你可以得到如下运行结果:

Python 编写 Flink 应用程序经验记录(Flink1.17.1),大数据,python,flink,开发语言

实例处理Kafka后入库到Mysql

下载依赖

flink-kafka jar

wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar

Python 编写 Flink 应用程序经验记录(Flink1.17.1),大数据,python,flink,开发语言

读取kafka数据

#! /usr/bin/env python
# -*- coding: utf-8 -*-

import sys
import logging

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import WatermarkStrategy, Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

from pyflink.common import Row
from pyflink.datastream import FlatMapFunction

def read_kafka():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///D:/安技汇/运营平台/DataManage/flink-sql-connector-kafka-1.17.1.jar")

    source = KafkaSource.builder() \
        .set_bootstrap_servers("172.16.12.128:9092") \
        .set_topics("test") \
        .set_group_id("my-group") \
        .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .build()
        # 从消费组提交的位点开始消费,不指定位点重置策略
        #.set_starting_offsets(KafkaOffsetsInitializer.committed_offsets()) \
        # 从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点
        #.set_starting_offsets(KafkaOffsetsInitializer.committed_offsets(KafkaOffsetResetStrategy.EARLIEST)) \
        # 从时间戳大于等于指定时间戳(毫秒)的数据开始消费
        #.set_starting_offsets(KafkaOffsetsInitializer.timestamp(1657256176000)) \
        # 从最早位点开始消费
        #.set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
        # 从最末尾位点开始消费
        #.set_starting_offsets(KafkaOffsetsInitializer.latest()) \
        #.set_property("partition.discovery.interval.ms", "10000")  # 每 10 秒检查一次新分区
        #.set_property("security.protocol", "SASL_PLAINTEXT") \
        #.set_property("sasl.mechanism", "PLAIN") \
        #.set_property("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")

    kafka_stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
    kafka_stream.print()

    env.execute("Source")


if __name__ == "__main__":
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    read_kafka()

写入mysql数据

flink-mysql jar

没通,待补充。。文章来源地址https://www.toymoban.com/news/detail-720114.html

到了这里,关于Python 编写 Flink 应用程序经验记录(Flink1.17.1)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 为什么选择Go语言编写网络应用程序

    关注公众号【爱发白日梦的后端】分享技术干货、读书笔记、开源项目、实战经验、高效开发工具等,您的关注将是我的更新动力! 作为一名后端开发者,你一定对选择合适的编程语言来编写网络应用程序非常重视。在众多的编程语言中,Go语言(Golang)凭借其独特的特性和

    2024年02月02日
    浏览(73)
  • Flink状态应用测试程序示例

    Flink状态应用测试程序示例 1. 创建执行环境 2. 创建数据流 3. 对数据流进行keyBy()操作 4. 使用RichFlatMapFunction来实现状态的维护 5.执行任务 6.执行结果

    2024年01月25日
    浏览(30)
  • Android应用程序中使用 Gemini Pro AI开发——2年工作经验如何淘汰10年工作经验的Android开发?

    上周,谷歌推出了最强大的基础模型 Gemini 。 Gemini 是多模式的AI——它可以接受文本和图像输入。 谷歌为 Android 开发者引入了一种在设备上,利用最小模型Gemini Nano的方法。此功能可通过 AICore 在部分设备上使用,这是一项处理模型管理、运行时、安全功能等的系统服务,可

    2024年01月18日
    浏览(54)
  • Flink报错:未找到ExecutorFactory来执行应用程序

    Flink报错:未找到ExecutorFactory来执行应用程序 大数据处理是当前互联网时代的核心需求之一。Apache Flink作为一种流式处理引擎,被广泛应用于大规模数据处理和实时分析场景中。然而,在使用Flink时,有时会遇到一些错误和异常。本文将详细探讨一种常见的错误:未找到Exec

    2024年01月17日
    浏览(27)
  • Camera | 4.瑞芯微平台MIPI摄像头应用程序编写

    前面3篇我们讲解了camera的基础概念,MIPI协议,CSI2,常用命令等,本文带领大家入门,如何用c语言编写应用程序来操作摄像头。 Linux下摄像头驱动都是基于v4l2架构,要基于该架构编写摄像头的应用程序,必须先要搞清楚什么是v4l2。 v4l2是video for Linux 2的缩写,是一套Linux内核

    2024年01月18日
    浏览(36)
  • Intellij IDEA编写Spark应用程序的环境配置和操作步骤

    本文介绍如何在win系统中使用IDEA开发spark应用程序,并将其打成jar包上传到虚拟机中的三个Ubuntu系统,然后在分布式环境中运行。 主要步骤包括: 安装Scala插件:在Intellij IDEA中安装Scala插件,并重启IDEA。 创建Maven项目:在Intellij IDEA中创建一个Maven项目,选择Scala语言,并添加

    2024年02月12日
    浏览(35)
  • Building AI-Copilot:构建 LLM 支持的生成应用程序的一些经验教训和模式

    我们正在构建一个用于产品策略和生成创意的实验性人工智能副驾驶,名为“Boba”。一路上,我们学到了一些关于如何构建此类应用程序的有用经验,我们已经根据模式制定了这些应用程序。这些模式允许应用程序帮助用户更有效地与大语言模型 (LLM) 交互,编排提示以获得

    2024年02月14日
    浏览(34)
  • spark-shell(pyspark)单机模式使用和编写独立应用程序

    spark有四种部署方式:Local,Standalone,Spark on Mesos,Spark on yarn。第一个为单机模式,后三个为集群模式。 spark-shell支持python和scala,这里使用python。 1.启动pyspark环境 在spark安装目录下 进入之后,如下图:  2.编写程序 新建代码文件WordCount.py,并编写程序 运行代码:python3 Wor

    2024年04月14日
    浏览(22)
  • 52、Flink的应用程序参数处理-ParameterTool介绍及使用示例

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月03日
    浏览(31)
  • 编写一个 Java 应用程序,该程序中有两个类: Vehicle(用于刻画机动车)和 User(主类)。

    具体要求如下: 1)Vehicle 类有一个 double 类型的变量 speed,用于刻画机动车的速度,一个 int 型变量 power,用于刻画机动车的功率。类中定义了 speedUp(int s)方法,体现机动车有减速功能;定义了 speedDown()方法,体现机动车有减速功能;定义了 setPower(int p)方法,用于设置机动车

    2024年02月05日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包