Flink|《Flink 官方文档 - DataStream API - 概览》学习笔记

这篇具有很好参考价值的文章主要介绍了Flink|《Flink 官方文档 - DataStream API - 概览》学习笔记。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

学习文档:Flink 官方文档 - DataStream API - 概览

学习笔记如下:


DataStream

Flink 的 DataStream API:

  • 数据里的起始是各种 source,例如消息队列、socket 流、文件等;
  • 对数据流进行转换,例如过滤、更新状态、定义窗口、聚合等;
  • 结果通过 sink 返回,例如可以将数据写入文件或标准输出。

DataStream:Flink 程序中的数据集合;可以将其理解为包含重复项的不可变数据集合。这些数据可以是有界的,也可以是无界的,但用于处理它们的 API 是相同的。

相较于常规的 Java 集合,DataStream 有以下差异:

  • 不可变,一旦创建就不能添加或删除元素
  • 不能简单地查看内部元素,只能使用 DataStream API 来处理它们

DataStream 源码:flink-streaming-java: org.apache.flink.streaming.api.datastream.DataStream

Flink 程序

Flink 程序看起来像一个转换 DataStream 的常规程序,但是 Flink 程序都是延迟执行的。当程序的 main() 方法被执行时,数据加载和转换不会直接发生,只会将每个算子都创建并添加到 dataflow 形成的有向图;只有被执行环境的 execute() 方法显式地被处罚后,这些算子才会真正执行。

每个程序由相同的基本部分组成:

Step 1|获取一个执行环境(execution environment)

通常,调用 StreamExecutionEnvironment 的如下静态方法获取执行环境:

  • getExecutionEnvironment():通常调用这个方法即可
  • createLocalEnvironment()
  • createRemoteEnvironment(String host, int port, String... jarFiles)

样例:文章来源地址https://www.toymoban.com/news/detail-819494.html

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Step 2|加载 / 创建初始数据

执行环境提供了一些方法,用于从任何第三方提供的 source 或本地文件中读取数据。这将生成一个 DataStream,可以在上面应用转换(trasformation)来创建新的派生 DataStream。

样例:以直接逐行读取本地文件中的数据

DataStream<String> text = env.readTextFile("file:///path/to/file");

Step 3|指定数据相关的转换

可以通过调用 DataStream 上具有转换功能的方法来应用转换。

样例:使用 map 进行转换(将每个字符串转换为一个整数并创建一个新的 DataStream)

DataStream<String> input = ...;

DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});

Step 4|指定计算结果的存储位置

可以将包含最终结果的 DataStream,通过创建 sink 写出到外部系统。

样例:将数据结果写出到文件

writeAsText(String path);

Step 5|触发程序执行

需要调用 StreamExecutionEnvironmentexecute()executeAsync() 来触发程序执行。根据 StreamExecutionEnvironment 的类型,执行会在本地机器上触发,或提交到某个集群上执行。

  • execute() 方法:等待作业完成,然后返回一个 JobExecutionResult,其中包括执行时间和累加器结果
  • executeAsync() 方法:触发作业异步执行,它会返回一个 JobClient,可以通过它与刚刚提交的作业进行通信。

Data Source

Source 是 Flink 程序读取输入的地方。可以通过 StreamExecutionEnvironment.addSource(sourceFunction) 添加 Source,也可以使用 Flink 自带的 source function。

DataSink

Sink 是 Flink 程序写出结果的地方。可以通过 DataStream.addSink(sinkFunction) 添加 Sink,也可以使用 Flink 自带的 sink function。

但是需要注意,DataStream 的 write*() 方法主要用于调试目的,它们不涉及 checkpoint,因此这些函数通常具有至少一次语义。

Iterations

Iterations 是对数据进行迭代处理的机制。在使用 IterativeStream 时,需要指定哪一部分反馈给迭代,哪一部分使用旁路输出或使用过滤器转发到下游。通常来说,我们首先定义一个 IterativeStream 流,例如:

IterativeStream<Integer> iteration = input.iterate();

然后,指定循环内执行的转换逻辑,例如:

DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);

最后,使用 IterativeStreamcloseWith(feedbackStream) 方法,定义迭代的结束条件。提供的 closeWithDataStream 将反馈给迭代头。一种常见的模式,是使用过滤器将反馈的流部分和向前传播(重新迭代)的流部分分开。例如:

iteration.closeWith(iterationBody.filter(/* one part of the stream */));  // 继续迭代
DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);  // 传递给下游的流

样例:下面的程序从一系列整数中连续减去 1,直到它们达到零

DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
  @Override
  public Long map(Long value) throws Exception {
    return value - 1 ;
  }
});

DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value > 0);
  }
});

iteration.closeWith(stillGreaterThanZero);

DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value <= 0);
  }
});

控制延迟

如果将元素在网络上逐个传输,会导致大量频繁的网络请求。因此,Flink 不会将元素不会在网络上一一传输,而是会进行缓冲,每次通过通过传输直接传输整个完整的缓冲区。缓冲区的大小可以在 Flink 配置文件中传输。

触发缓冲区的网络传输,有两种情况:

  • 缓冲区已满
  • 缓冲区已达到缓冲区的最长等待时间,如果超过缓冲区的最长等待时间,那么即使缓冲区没有满也会被自动发送。超时时间的默认值为 100 毫秒

缓冲区的设置样例如下:

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);  // 给流设置缓冲区超时时间
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);  // 给 Operator 设置缓冲区超时时间

缓冲区的超时时间设置得越长,吞吐量越大,极限可以设置 setBufferTimeout(-1) 来关闭超时,这样缓冲区只有在它们已满时才会被刷新;缓冲区设置得越小,延迟越少,但应避免设置超时为 0 的缓冲区,因为它会导致严重的性能下降。

调试

本地执行环境

在本地调试时,可以使用 LocalStreamEnvironment 的本地执行环境,它将在创建它的同一个 JVM 进程中启动 Flink 程序。如果从 IDE 启动 LocalEnvironment,则可以在代码中设置断点已实现调试。

样例:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<String> lines = env.addSource(/* some source */);
// 构建你的程序

env.execute();
从 Java 集合构造 Data Sources

在本地调试时,可以使用 fromElementsfromCollection 方法从 Java 集合中读取数据构造 DataStream。

样例:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// 从元素列表创建一个 DataStream
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// 从任何 Java 集合创建一个 DataStream
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// 从迭代器创建一个 DataStream
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
将 Data Sink 写出到 Java 迭代器

类似地,也可以将 DataStream 写出到本地的迭代器。

样例:

DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = myResult.collectAsync();

到了这里,关于Flink|《Flink 官方文档 - DataStream API - 概览》学习笔记的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 《Flink学习笔记》——第五章 DataStream API

    一个Flink程序,其实就是对DataStream的各种转换,代码基本可以由以下几部分构成: 获取执行环境 读取数据源 定义对DataStream的转换操作 输出 触发程序执行 获取执行环境和触发程序执行都属于对执行环境的操作,那么其构成可以用下图表示: 其核心部分就是Transform,对数据

    2024年02月10日
    浏览(41)
  • Flink|《Flink 官方文档 - 概念透析 - Flink 架构》学习笔记

    学习文档:概念透析 - Flink 架构 学习笔记如下: 客户端(Client):准备数据流程序并发送给 JobManager(不是 Flink 执行程序的进程) JobManager:协调 Flink 应用程序的分布式执行 ResourceManager:负责 Flink 集群中的资源提供、回收、分配 Dispatcher:提供了用来提交 Flink 应用程序执行

    2024年01月19日
    浏览(46)
  • Flink|《Flink 官方文档 - 内幕 - 文件系统》学习笔记

    学习文档:内幕 - 文件系统 学习笔记如下: Flink 通过 org.apache.flink.core.fs.FileSystem 实现了文件系统的抽象。这种抽象提供了一组通用的操作,以支持使用各类文件系统。 为了支持众多的文件系统, FileSystem 的可用操作集非常有限。例如,不支持对现有文件进行追加或修改。

    2024年02月03日
    浏览(36)
  • Flink|《Flink 官方文档 - 概念透析 - 及时流处理》学习笔记

    学习文档:概念透析 - 及时流处理 学习笔记如下: 及时流处理时有状态流处理的扩展,其中时间在计算中起着一定的作用。 及时流的应用场景: 时间序列分析 基于特定时间段进行聚合 对发生时间很重要的事件进行处理 处理时间(processing time) 处理时间的即数据到达各个

    2024年02月03日
    浏览(49)
  • Flink|《Flink 官方文档 - 部署 - 内存配置 - 配置 Flink 进程的内存》学习笔记

    学习文档:《Flink 官方文档 - 部署 - 内存配置 - 配置 Flink 进程的内存》 学习笔记如下: Flink JVM 进程的进程总内存(Total Process Memory)包含了由 Flink 应用使用的内存(Flink 总内存)以及由运行 Flink 的 JVM 使用的内存。 Flink 总内存(Total Flink Memory)包括 JVM 堆内存(Heap Memory)

    2024年01月21日
    浏览(48)
  • Flink|《Flink 官方文档 - 部署 - 内存配置 - 配置 TaskManager 内存》学习笔记

    学习文档:Flink|《Flink 官方文档 - 部署 - 内存配置 - 配置 TaskManager 内存》学习笔记 学习笔记如下: Flink JVM 进程的进程总内存(Total Process Memory)包含了由 Flink 应用使用的内存(Flink 总内存)以及由运行 Flink 的 JVM 使用的内存。其中,Flink 总内存(Total Flink Memory)包括 JV

    2024年03月15日
    浏览(44)
  • Flink|《Flink 官方文档 - 部署 - 内存配置 - 网络缓冲调优》学习笔记

    学习文档:《Flink 官方文档 - 部署 - 内存配置 - 网络缓冲调优》 学习笔记如下: Flink 中每条消息都会被放到网络缓冲(network buffer) 中,并以此为最小单位发送到下一个 subtask。 Flink 在传输过程的输入端和输出端使用了网络缓冲队列,即每个 subtask 都有一个输入队列来接收

    2024年01月21日
    浏览(54)
  • Flink|《Flink 官方文档 - 部署 - 命令行界面 - 提交 PyFlink 作业》学习笔记

    学习文档:《Flink 官方文档 - 部署 - 命令行界面 - 提交 PyFlink 作业》 学习笔记如下: 当前,用户可以通过 CLI 提交 PyFlink 作业。对于通过 flink run 提交的 Python 作业,Flink 会执行 python 命令。因此,在启动 Python 作业前,需要先确定当前环境中的 python 命令指向 3.7+ 版本的 Pyt

    2024年02月22日
    浏览(63)
  • Flink|《Flink 官方文档 - 部署 - 内存配置 - 调优指南 & 常见问题》学习笔记

    学习文档: 《Flink 官方文档 - 部署 - 内存配置 - 调优指南》 《Flink 官方文档 - 部署 - 内存配置 - 常见问题》 学习笔记如下: 独立部署模式(Standalone Deployment)下的内存配置 通常无需配置进程总内存,因为不管是 Flink 还是部署环境都不会对 JVM 开销进行限制,它只与机器的

    2024年02月19日
    浏览(42)
  • Flink学习——DataStream API

            一个flink程序,其实就是对DataStream的各种转换。具体可以分成以下几个部分: 获取执行环境(Execution Environment) 读取数据源(Source) 定义基于数据的转换操作(Transformations) 定义计算结果的输出位置(Sink) 触发程序执行(Execute)         flink 程序可以在各种上

    2024年02月05日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包