Flink流批一体计算(16):PyFlink DataStream API

这篇具有很好参考价值的文章主要介绍了Flink流批一体计算(16):PyFlink DataStream API。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

概述

Pipeline Dataflow

代码示例WorldCount.py

执行脚本WorldCount.py


概述

Apache Flink 提供了 DataStream API,用于构建健壮的、有状态的流式应用程序。它提供了对状态和时间细粒度控制,从而允许实现高级事件驱动系统。

用户实现的Flink程序是由Stream和Transformation这两个基本构建块组成。

Stream是一个中间结果数据,而Transformation是一个操作,它对一个或多个输入Stream进行计算处理,输出一个或多个结果Stream。

当一个Flink程序被执行的时候,它会被映射为Streaming Dataflow。一个Streaming Dataflow是由一组Stream和Transformation Operator组成,它类似于一个DAG图,在启动的时候从一个或多个Source Operator开始,结束于一个或多个Sink Operator。

Flink流批一体计算(16):PyFlink DataStream API,Flink,flink,大数据

 FlinkKafkaConsumer是一个Source OperatorMapKeyByTimeWindowApplyTransformation OperatorRollingSink是一个Sink Operator

Pipeline Dataflow

Flink中,程序是并行和分布式的方式运行。一个Stream可以被分成多个Stream分区(Stream Partitions),一个Operator可以被分成多个Operator Subtask

Flink内部有一个优化的功能,根据上下游算子的紧密程度来进行优化。

紧密度低的算子则不能进行优化,而是将每一个Operator Subtask放在不同的线程中独立执行。一个Operator的并行度,等于Operator Subtask的个数,一个Stream的并行度(分区总数)等于生成它的Operator的并行度。

Flink流批一体计算(16):PyFlink DataStream API,Flink,flink,大数据

 紧密度高的算子可以进行优化,优化后可以将多个Operator Subtask串起来组成一个Operator Chain,实际上就是一个执行链,每个执行链会在TaskManager上一个独立的线程中执行。

Flink流批一体计算(16):PyFlink DataStream API,Flink,flink,大数据

图中上半部分表示的是将Source和map两个紧密度高的算子优化后串成一个Operator Chain,实际上一个Operator Chain就是一个大的Operator的概念。图中的Operator Chain表示一个Operator,keyBy表示一个Operator,Sink表示一个Operator,它们通过Stream连接,而每个Operator在运行时对应一个Task,也就是说图中的上半部分有3个Operator对应的是3个Task。

图中下半部分是上半部分的一个并行版本,对每一个Task都并行化为多个Subtask,这里只是演示了2个并行度,Sink算子是1个并行度。

代码示例WorldCount.py

在本章中,你将学习如何使用 PyFlink 和 DataStream API 构建一个简单的流式应用程序。

编写一个简单的 Python DataStream 作业。

该程序读取一个 csv 文件,计算词频,并将结果写到一个结果文件中。文章来源地址https://www.toymoban.com/news/detail-678479.html

import argparse
import logging
import sys
from pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors import (FileSource, StreamFormat, FileSink, OutputFileConfig,RollingPolicy)


word_count_data = ["To be, or not to be,--that is the question:--",
                   "Whether 'tis nobler in the mind to suffer",
                   "The slings and arrows of outrageous fortune",
                   "Or to take arms against a sea of troubles,",
                   "And by opposing end them?--To die,--to sleep,--",
                   "No more; and by a sleep to say we end",
                   "The heartache, and the thousand natural shocks",
                   "That flesh is heir to,--'tis a consummation",
                   "Devoutly to be wish'd. To die,--to sleep;--",
                   "To sleep! perchance to dream:--ay, there's the rub;",
                   "For in that sleep of death what dreams may come,",
                   "When we have shuffled off this mortal coil,",
                   "Must give us pause: there's the respect",
                   "That makes calamity of so long life;",
                   "For who would bear the whips and scorns of time,",
                   "The oppressor's wrong, the proud man's contumely,",
                   "The pangs of despis'd love, the law's delay,",
                   "The insolence of office, and the spurns",
                   "That patient merit of the unworthy takes,",
                   "When he himself might his quietus make",
                   "With a bare bodkin? who would these fardels bear,",
                   "To grunt and sweat under a weary life,",
                   "But that the dread of something after death,--",
                   "The undiscover'd country, from whose bourn",
                   "No traveller returns,--puzzles the will,",
                   "And makes us rather bear those ills we have",
                   "Than fly to others that we know not of?",
                   "Thus conscience does make cowards of us all;",
                   "And thus the native hue of resolution",
                   "Is sicklied o'er with the pale cast of thought;",
                   "And enterprises of great pith and moment,",
                   "With this regard, their currents turn awry,",
                   "And lose the name of action.--Soft you now!",
                   "The fair Ophelia!--Nymph, in thy orisons",
                   "Be all my sins remember'd."]


def word_count(input_path, output_path):
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    # write all the data to one file
    env.set_parallelism(1)
    # define the source
    if input_path is not None:
        ds = env.from_source(
            source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
                                                       input_path)
                             .process_static_file_set().build(),
            watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
            source_name="file_source"
        )
    else:
        print("Executing word_count example with default input data set.")
        print("Use --input to specify file input.")
        ds = env.from_collection(word_count_data)


    def split(line):
        yield from line.split()
    # compute word count


    ds = ds.flat_map(split) \
           .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
           .key_by(lambda i: i[0]) \
           .reduce(lambda i, j: (i[0], i[1] + j[1]))
    # define the sink
    if output_path is not None:
        ds.sink_to(
            sink=FileSink.for_row_format(
                base_path=output_path,
                encoder=Encoder.simple_string_encoder())
            .with_output_file_config(
                OutputFileConfig.builder()
                .with_part_prefix("prefix")
                .with_part_suffix(".ext")
                .build())
            .with_rolling_policy(RollingPolicy.default_rolling_policy())
            .build()
        )
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        ds.print()

    # submit for execution
    env.execute()



if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)
    word_count(known_args.input, known_args.output)

执行脚本WorldCount.py

python word_count.py

到了这里,关于Flink流批一体计算(16):PyFlink DataStream API的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink流批一体计算(11):PyFlink Tabel API之TableEnvironment

    目录 概述 设置重启策略 什么是flink的重启策略(Restartstrategy) flink的重启策略(Restartstrategy)实战 flink的4种重启策略 FixedDelayRestartstrategy(固定延时重启策略) FailureRateRestartstrategy(故障率重启策略) NoRestartstrategy(不重启策略) 配置State Backends 以及 Checkpointing Checkpoint 启用和配置

    2024年02月13日
    浏览(60)
  • Flink流批一体计算(12):PyFlink Tabel API之构建作业

    目录 1.创建源表和结果表。 创建及注册表名分别为 source 和 sink 的表 使用 TableEnvironment.execute_sql() 方法,通过 DDL 语句来注册源表和结果表 2. 创建一个作业 3. 提交作业Submitting PyFlink Jobs 1.创建源表和结果表。 创建及注册表名分别为 source 和 sink 的表 其中,源表 source 有一列

    2024年02月13日
    浏览(43)
  • Flink流批一体计算(20):DataStream API和Table API互转

    目录 举个例子 连接器 下载连接器(connector)和格式(format)jar 包 依赖管理  如何使用连接器 举个例子 StreamExecutionEnvironment 集成了DataStream API,通过额外的函数扩展了TableEnvironment。 下面代码演示两种API如何互转 TableEnvironment 将采用StreamExecutionEnvironment所有的配置选项。 建

    2024年02月10日
    浏览(41)
  • Flink流批一体计算(14):PyFlink Tabel API之SQL查询

    举个例子 查询 source 表,同时执行计算 Table API 查询 Table 对象有许多方法,可以用于进行关系操作。 这些方法返回新的 Table 对象,表示对输入 Table 应用关系操作之后的结果。 这些关系操作可以由多个方法调用组成,例如 table.group_by(...).select(...)。 Table API 文档描述了流和批

    2024年02月12日
    浏览(42)
  • Flink流批一体计算(13):PyFlink Tabel API之SQL DDL

    1. TableEnvironment 创建 TableEnvironment TableEnvironment 是 Table API 和 SQL 集成的核心概念。 TableEnvironment 可以用来: ·创建 Table ·将 Table 注册成临时表 ·执行 SQL 查询 ·注册用户自定义的 (标量,表值,或者聚合) 函数 ·配置作业 ·管理 Python 依赖 ·提交作业执行 创建 source 表 创建 sink

    2024年02月12日
    浏览(40)
  • Flink流批一体计算(15):PyFlink Tabel API之SQL写入Sink

    目录 举个例子 写入Sink的各种情况 1. 将结果数据收集到客户端 2. 将结果数据转换为Pandas DataFrame,并收集到客户端 3. 将结果写入到一张 Sink 表中 4. 将结果写入多张 Sink 表中 举个例子 将计算结果写入给 sink 表 写入Sink的各种情况 1. 将结果数据收集到客户端 你可以使用 TableR

    2024年02月11日
    浏览(40)
  • 流批一体计算引擎-7-[Flink]的DataStream连接器

    参考官方手册DataStream Connectors 一、预定义的Source和Sink 一些比较基本的Source和Sink已经内置在Flink里。 1、预定义data sources支持从文件、目录、socket,以及collections和iterators中读取数据。 2、预定义data sinks支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 sock

    2023年04月08日
    浏览(40)
  • Flink流批一体计算(1):流批一体和Flink概述

    Apache Flink应运而生 数字化经济革命的浪潮正在颠覆性地改变着人类的工作方式和生活方式,数字化经济在全球经济增长中扮演着越来越重要的角色,以互联网、云计算、大数据、物联网、人工智能为代表的数字技术近几年发展迅猛,数字技术与传统产业的深度融合释放出巨大

    2024年02月10日
    浏览(42)
  • flink重温笔记(四):Flink 流批一体 API 开发——物理分区(上)

    前言:今天是学习flink的第四天啦!学习了物理分区的知识点,这一次学习了前4个简单的物理分区,称之为简单分区篇! Tips:我相信自己会越来会好的,明天攻克困难分区篇,加油! 3. 物理分区 3.1 Global Partitioner 该分区器会将所有的数据都发送到下游的某个算子实例(subta

    2024年02月19日
    浏览(39)
  • flink重温笔记(五):Flink 流批一体 API 开发——物理分区(下)

    前言 :今天是学习 flink 的第五天啦! 主要学习了物理分区较难理解的部分,在这个部分的三个分区的学习中, rescale partition 和 forward partition 其原理可以归类 pointwise 模式,其他的 partition 其原理可以归类 all_to_all 模式,而比较有趣的是 custom partitioning,这个可以进行根据值

    2024年02月19日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包