1、概述
1)作用
flatMap是将数据先map再打平,输入一个元素,可以输出0到多个元素
2)使用
1.匿名内部类
2.lambda表达式
3.实现FlatMapFunction接口
4.继承RichFlatMapFunction
2、代码实现
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * Demo job that applies the same "split a line on spaces" flatMap to a socket
 * text stream in four different ways: an anonymous inner class, a lambda, a
 * class implementing {@link FlatMapFunction}, and a class extending
 * {@link RichFlatMapFunction}. Each resulting stream is printed.
 */
public class MyFlatmapDemo {

    /**
     * Builds and runs the demo job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        // Local execution environment with the web UI; "rest.port" is set to 8081.
        Configuration webUiConfig = new Configuration();
        webUiConfig.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(webUiConfig);

        // Read text lines from a socket at localhost:8888.
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // 1. Anonymous inner class.
        SingleOutputStreamOperator<String> fromAnonymousClass = lines.flatMap(
                new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String value, Collector<String> out) throws Exception {
                        String[] words = value.split(" ");
                        for (int i = 0; i < words.length; i++) {
                            out.collect(words[i]);
                        }
                    }
                });

        // 2. Lambda expression; the output type is supplied explicitly via returns(...).
        FlatMapFunction<String, String> lambdaSplitter = (value, out) -> {
            String[] words = value.split(" ");
            for (int i = 0; i < words.length; i++) {
                out.collect(words[i]);
            }
        };
        SingleOutputStreamOperator<String> fromLambda =
                lines.flatMap(lambdaSplitter).returns(String.class);

        // 3. Class implementing FlatMapFunction.
        SingleOutputStreamOperator<String> fromInterfaceImpl =
                lines.flatMap(new MyFlatmapFunc());

        // 4. Class extending RichFlatMapFunction.
        SingleOutputStreamOperator<String> fromRichFunction =
                lines.flatMap(new MyRichFlatMapFunc());

        // Print every variant's output, then submit the job.
        fromAnonymousClass.print();
        fromLambda.print();
        fromInterfaceImpl.print();
        fromRichFunction.print();

        env.execute();
    }
}
/**
 * {@link FlatMapFunction} that splits each incoming string on single spaces
 * and emits every resulting piece as its own element.
 */
class MyFlatmapFunc implements FlatMapFunction<String, String> {

    /**
     * Splits {@code value} on {@code " "} and forwards each piece to {@code out}.
     *
     * @param value the input line
     * @param out   collector receiving one element per piece
     * @throws Exception declared by the interface; not thrown explicitly here
     */
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        final String[] pieces = value.split(" ");
        int index = 0;
        while (index < pieces.length) {
            out.collect(pieces[index]);
            index++;
        }
    }
}
/**
 * {@link RichFlatMapFunction} variant of the space splitter. In addition to
 * {@code flatMap}, it overrides the {@code open} and {@code close} hooks;
 * {@code open} prints a message to standard output.
 */
class MyRichFlatMapFunc extends RichFlatMapFunction<String, String> {

    /**
     * Delegates to the superclass and then prints an informational message.
     *
     * @param parameters configuration passed through to {@code super.open}
     * @throws Exception if the superclass hook fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        final String notice = "可以在rich方法中创建状态和定时器";
        System.out.println(notice);
    }

    /**
     * Delegates to the superclass; no additional cleanup is performed here.
     *
     * @throws Exception if the superclass hook fails
     */
    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * Splits {@code value} on {@code " "} and forwards each piece to {@code out}.
     *
     * @param value the input line
     * @param out   collector receiving one element per piece
     * @throws Exception declared by the base class; not thrown explicitly here
     */
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        String[] parts = value.split(" ");
        for (int pos = 0; pos < parts.length; pos++) {
            out.collect(parts[pos]);
        }
    }
}
3、执行结果
1)输入测试数据
nc -lk 8888
hello world
2)控制台输出执行结果
3> hello
3> world
5> hello
5> world
8> hello
8> world
7> hello
7> world
到了这里,关于十二、Flink自定义 FlatMap 方法的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!