学习文档:Flink 官方文档 - DataStream API - 概览
学习笔记如下:
DataStream
Flink 的 DataStream API:
- 数据里的起始是各种 source,例如消息队列、socket 流、文件等;
- 对数据流进行转换,例如过滤、更新状态、定义窗口、聚合等;
- 结果通过 sink 返回,例如可以将数据写入文件或标准输出。
DataStream:Flink 程序中的数据集合;可以将其理解为包含重复项的不可变数据集合。这些数据可以是有界的,也可以是无界的,但用于处理它们的 API 是相同的。
相较于常规的 Java 集合,DataStream 有以下差异:
- 不可变,一旦创建就不能添加或删除元素
- 不能简单地查看内部元素,只能使用 DataStream API 来处理它们
DataStream 源码:flink-streaming-java: org.apache.flink.streaming.api.datastream.DataStream
Flink 程序
Flink 程序看起来像一个转换 DataStream
的常规程序,但是 Flink 程序都是延迟执行的。当程序的 main()
方法被执行时,数据加载和转换不会直接发生,只会将每个算子都创建并添加到 dataflow 形成的有向图;只有被执行环境的 execute()
方法显式地被处罚后,这些算子才会真正执行。
每个程序由相同的基本部分组成:
Step 1|获取一个执行环境(execution environment)
通常,调用 StreamExecutionEnvironment
的如下静态方法获取执行环境:
-
getExecutionEnvironment()
:通常调用这个方法即可 createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)
样例:文章来源地址https://www.toymoban.com/news/detail-819494.html
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Step 2|加载 / 创建初始数据
执行环境提供了一些方法,用于从任何第三方提供的 source 或本地文件中读取数据。这将生成一个 DataStream,可以在上面应用转换(trasformation)来创建新的派生 DataStream。
样例:以直接逐行读取本地文件中的数据
DataStream<String> text = env.readTextFile("file:///path/to/file");
Step 3|指定数据相关的转换
可以通过调用 DataStream 上具有转换功能的方法来应用转换。
样例:使用 map 进行转换(将每个字符串转换为一个整数并创建一个新的 DataStream)
DataStream<String> input = ...; DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() { @Override public Integer map(String value) { return Integer.parseInt(value); } });
Step 4|指定计算结果的存储位置
可以将包含最终结果的 DataStream,通过创建 sink 写出到外部系统。
样例:将数据结果写出到文件
writeAsText(String path);
Step 5|触发程序执行
需要调用 StreamExecutionEnvironment
的 execute()
或 executeAsync()
来触发程序执行。根据 StreamExecutionEnvironment
的类型,执行会在本地机器上触发,或提交到某个集群上执行。
-
execute()
方法:等待作业完成,然后返回一个JobExecutionResult
,其中包括执行时间和累加器结果 -
executeAsync()
方法:触发作业异步执行,它会返回一个JobClient
,可以通过它与刚刚提交的作业进行通信。
Data Source
Source 是 Flink 程序读取输入的地方。可以通过 StreamExecutionEnvironment.addSource(sourceFunction)
添加 Source,也可以使用 Flink 自带的 source function。
DataSink
Sink 是 Flink 程序写出结果的地方。可以通过 DataStream.addSink(sinkFunction)
添加 Sink,也可以使用 Flink 自带的 sink function。
但是需要注意,DataStream 的 write*()
方法主要用于调试目的,它们不涉及 checkpoint,因此这些函数通常具有至少一次语义。
Iterations
Iterations 是对数据进行迭代处理的机制。在使用 IterativeStream
时,需要指定哪一部分反馈给迭代,哪一部分使用旁路输出或使用过滤器转发到下游。通常来说,我们首先定义一个 IterativeStream
流,例如:
IterativeStream<Integer> iteration = input.iterate();
然后,指定循环内执行的转换逻辑,例如:
DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);
最后,使用 IterativeStream
的 closeWith(feedbackStream)
方法,定义迭代的结束条件。提供的 closeWith
的 DataStream
将反馈给迭代头。一种常见的模式,是使用过滤器将反馈的流部分和向前传播(重新迭代)的流部分分开。例如:
iteration.closeWith(iterationBody.filter(/* one part of the stream */)); // 继续迭代
DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */); // 传递给下游的流
样例:下面的程序从一系列整数中连续减去 1,直到它们达到零
DataStream<Long> someIntegers = env.generateSequence(0, 1000); IterativeStream<Long> iteration = someIntegers.iterate(); DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() { @Override public Long map(Long value) throws Exception { return value - 1 ; } }); DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() { @Override public boolean filter(Long value) throws Exception { return (value > 0); } }); iteration.closeWith(stillGreaterThanZero); DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() { @Override public boolean filter(Long value) throws Exception { return (value <= 0); } });
控制延迟
如果将元素在网络上逐个传输,会导致大量频繁的网络请求。因此,Flink 不会将元素不会在网络上一一传输,而是会进行缓冲,每次通过通过传输直接传输整个完整的缓冲区。缓冲区的大小可以在 Flink 配置文件中传输。
触发缓冲区的网络传输,有两种情况:
- 缓冲区已满
- 缓冲区已达到缓冲区的最长等待时间,如果超过缓冲区的最长等待时间,那么即使缓冲区没有满也会被自动发送。超时时间的默认值为 100 毫秒
缓冲区的设置样例如下:
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis); // 给流设置缓冲区超时时间
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis); // 给 Operator 设置缓冲区超时时间
缓冲区的超时时间设置得越长,吞吐量越大,极限可以设置 setBufferTimeout(-1)
来关闭超时,这样缓冲区只有在它们已满时才会被刷新;缓冲区设置得越小,延迟越少,但应避免设置超时为 0 的缓冲区,因为它会导致严重的性能下降。
调试
本地执行环境
在本地调试时,可以使用 LocalStreamEnvironment
的本地执行环境,它将在创建它的同一个 JVM 进程中启动 Flink 程序。如果从 IDE 启动 LocalEnvironment
,则可以在代码中设置断点已实现调试。
样例:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); DataStream<String> lines = env.addSource(/* some source */); // 构建你的程序 env.execute();
从 Java 集合构造 Data Sources
在本地调试时,可以使用 fromElements
或 fromCollection
方法从 Java 集合中读取数据构造 DataStream。
样例:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); // 从元素列表创建一个 DataStream DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5); // 从任何 Java 集合创建一个 DataStream List<Tuple2<String, Integer>> data = ... DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data); // 从迭代器创建一个 DataStream Iterator<Long> longIt = ... DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
将 Data Sink 写出到 Java 迭代器
类似地,也可以将 DataStream 写出到本地的迭代器。文章来源:https://www.toymoban.com/news/detail-819494.html
样例:
DataStream<Tuple2<String, Integer>> myResult = ... Iterator<Tuple2<String, Integer>> myOutput = myResult.collectAsync();
到了这里,关于Flink|《Flink 官方文档 - DataStream API - 概览》学习笔记的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!