导航
观看本文章之前,请先看之前文章
(十)Flink Datastream API 编程指南 算子-1 (转换算子、物理分区、任务链、资源组 、算子和作业)等基本介绍
常用算子基本使用
效果
本次主要是阅读几个常用算子的源码。
阅读算子列表
- map
- flatmap
- keyby
- filter
- reduce
- window
- windowAll
- 其它
Map()
首先说一下这个算子是one to one的,通俗的讲就是进一条数据 经过逻辑处理后 必出一条数据。
用户代码
package com.stream.samples;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author DeveoperZJQ
* @since 2022/11/12
*/
public class MapOperator {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 消费socket数据源
DataStreamSource<String> dataStream = env.socketTextStream("192.168.112.147", 7777);
// map => String::length 是传入的虚函数map的逻辑
SingleOutputStreamOperator<Integer> map = dataStream.map(String::length);
// print输出
map.print();
env.execute(MapOperator.class.getSimpleName());
}
}
上面的逻辑非常简单,目的只是为了追踪map的入口,你可以使用debug模式,并且在map上打上断点,可以一层一层的往下看。
两种变形
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
// 根据传入函数获取返回值类型
TypeInformation<R> outType = TypeExtractor.getMapReturnTypes((MapFunction)this.clean(mapper), this.getType(), Utils.getCallLocationName(), true);
// 调用下面的map方法
return this.map(mapper, outType);
}
// 带返回的输出类型参数,可以看到传入的operator默认名字就叫Map
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {
return this.transform("Map", outputType, (OneInputStreamOperator)(new StreamMap((MapFunction)this.clean(mapper))));
}
上面两个重载方法刚好对应上面截图中的两个方法。
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
// 抽象工厂
return this.doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}
上面代码有一个抽象工厂的简单 算子工厂类,传入的是OneInputStreamOperator类型的算子,然后把用户代码的operator转换成了StreamOperatorFactory operatorFactory, 工厂类是真的多啊,如果设计模式关于工厂方法和抽象工厂没学牢固的同学,可以来这里细品。
// protected 只能包能调用
protected <R> SingleOutputStreamOperator<R> doTransform(String operatorName, TypeInformation<R> outTypeInfo, StreamOperatorFactory<R> operatorFactory) {
// 我理解这里调用这个就是为了抛出异常的返回类型,如果可用,typeUsed则标识为true
this.transformation.getOutputType();
// 构建OneInputTransformation
OneInputTransformation<T, R> resultTransform = new OneInputTransformation(this.transformation, operatorName, operatorFactory, outTypeInfo, this.environment.getParallelism());
SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(this.environment, resultTransform);
// 这里讲用户逻辑封装到resultTransform,全部装填到List<Transformation<?>> 为构建流图StreamGraph做准备
this.getExecutionEnvironment().addOperator(resultTransform);
return returnStream;
}
// 添加算子到Transformations<>集合中
@Internal
public void addOperator(Transformation<?> transformation) {
Preconditions.checkNotNull(transformation, "transformation must not be null.");
this.transformations.add(transformation);
}
env.execute(MapOperator.class.getSimpleName());这里开始执行,物理执行图部署成功,这个就开始了消费数据。
当然通过上面说的,你可以尝试着自定义map算子或者其它算子,只要使用transform(String operatorName, TypeInformation outTypeInfo, OneInputStreamOperator<T, R> operator)得当,就会实现你的功能,但是一般不建议入门选手这样做。
这里面,咱们还看到很多使用clean() 方法的地方,后面章节会拿出一章说明这个。
flatMap()
可以看出,flatMap的源码和map非常类似。有一个地方值得咱们看一下,就是传入的函数和map是不同 ,flatmap可以一进多出,一进不出,但是map做不到,单纯从算子性能上说flatMap的开销是要比map大的。
来看一下下面两者的区别:new StreamMap((MapFunction) vs new StreamFlatMap((FlatMapFunction)
StreamMap
public StreamMap(MapFunction<IN, OUT> mapper) {
super(mapper);
this.chainingStrategy = ChainingStrategy.ALWAYS;
}
单从这个各自的构造器上看不出什么,传入的函数不一样,可以点击进去
@Public
@FunctionalInterface
public interface MapFunction<T, O> extends Function, Serializable {
O map(T value) throws Exception;
}
map方法是有返回值的。
再来看下具体处理方法:
public void processElement(StreamRecord<IN> element) throws Exception {
this.output.collect(element.replace(((MapFunction)this.userFunction).map(element.getValue())));
}
从上面可以看出map直接把数据通过collect回收了,等于没有对用户暴露,那么flatMatp呢?看下面的源码。
StreamFlatMap
public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
super(flatMapper);
this.chainingStrategy = ChainingStrategy.ALWAYS;
}
点击进去,是下面的接口类,里面有一个flatMap方法,继承的类也是一样的。
@Public
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {
void flatMap(T value, Collector<O> out) throws Exception;
}
flatMap没有返回值,它是利用Collector out返回数据的。
再来看下具体处理方法:
public void open() throws Exception {
super.open();
this.collector = new TimestampedCollector(this.output);
}
public void processElement(StreamRecord<IN> element) throws Exception {
this.collector.setTimestamp(element);
((FlatMapFunction)this.userFunction).flatMap(element.getValue(), this.collector);
}
把this.collector作为参数封装到flatMap里了,用户可以通过collector进行操作。
那么现在是不是明白了,为啥map是一对一,flatMap是一对多和一对0了吧。文章来源:https://www.toymoban.com/news/detail-401873.html
结语
本文章是系列文章中的一篇,如果有错误的地方,欢迎批评指正!欢迎同行交流!文章来源地址https://www.toymoban.com/news/detail-401873.html
到了这里,关于Flink从入门到放弃—Stream API—常用算子(map和flatMap)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!