Flink Study Notes
Preface: Today is day three of learning Flink. I studied the 11 important operators of the advanced API, looked up a lot of material to understand how they work, and spent several hours coding to cement that understanding.
Tips: Progress is a little slow, but I hope to keep at it: keep forming guesses about how the APIs work and keep verifying them with code. The road into big data will only get better!
II. Flink Unified Stream and Batch API Development
2. Transformation
2.1 Map
Transforms each element of a DataStream into another element, similar to word -> (word, 1) in the earlier wordcount example.
Example: use the map operation to read the string records in apache.log and convert each one into an ApacheLogEvent object.
# Log data
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
package cn.itcast.day02.transformation;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.text.SimpleDateFormat;
/**
* @author lql
* @time 2024-02-13 19:44:52
* @description TODO: use the map operation to read the string records in apache.log and convert them into ApacheLogEvent objects
*/
public class MapDemo {
public static void main(String[] args) throws Exception {
/**
* Get the ExecutionEnvironment
* Build the data source with readTextFile
* Create an ApacheLogEvent class
* Perform the conversion with map
* Print for testing
*/
//TODO Get the ExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//TODO Build the data source with readTextFile
DataStream<String> lines = env.readTextFile("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\apache2.log");
//TODO Create an ApacheLogEvent class
//TODO Perform the conversion with map
/**
* String: input type
* ApacheEvent: output type
*/
SingleOutputStreamOperator<ApacheEvent> apacheEventBean = lines.map(new MapFunction<String, ApacheEvent>() {
@Override
public ApacheEvent map(String line) throws Exception {
String[] elements = line.split(" ");
String ip = elements[0];
int userId = Integer.parseInt(elements[1]);
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
long timestamp = simpleDateFormat.parse(elements[2]).getTime();
String method = elements[3];
String path = elements[4];
return new ApacheEvent(ip, userId, timestamp, method, path);
}
});
//TODO Print for testing
apacheEventBean.print();
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class ApacheEvent{
String ip; // client IP
int userId; // user id
long timestamp; // access timestamp (milliseconds)
String method; // HTTP method
String path; // request path
}
}
# Printed output
2> MapDemo.ApacheEvent(ip=10.0.0.1, userId=10003, timestamp=1431829613000, method=POST, path=/presentations/logstash-monitorama-2013/css/print/paper.css)
Summary:
- 1- env.readTextFile is assigned to a DataStream here rather than a DataStreamSource; with the other type the code reported an error, so be careful when letting the IDE's var shortcut pick the variable type!
- 2- In the overridden map method, split the line into an array and read the fields by index.
- 3- Integer.parseInt converts a string into an int.
- 4- Create a SimpleDateFormat to parse the date field.
- 5- simpleDateFormat.parse(str).getTime() parses a date string in the given pattern and returns its timestamp in milliseconds.
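As a side note, the same conversion can also be written with a Java lambda instead of the anonymous MapFunction. Below is a minimal sketch under the same assumptions as the demo (field layout of apache.log, the ApacheEvent POJO); the returns(...) call is only a precaution, since Flink sometimes cannot extract the output type of a lambda.
// Hypothetical lambda variant of the map step above (not part of the original demo)
SingleOutputStreamOperator<ApacheEvent> beans = lines
        .map(line -> {
            String[] e = line.split(" ");
            // recreated per record only to keep the sketch short
            SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
            return new ApacheEvent(e[0], Integer.parseInt(e[1]), sdf.parse(e[2]).getTime(), e[3], e[4]);
        })
        .returns(ApacheEvent.class); // helps Flink's type extraction for lambdas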
2.2 FlatMap
Transforms each element of a DataStream into 0..n elements, similar to splitting a line into words by spaces in the wordcount example.
Example: read the data in flatmap.log
and turn records like:
张三,苹果手机,联想电脑,华为平板
李四,华为手机,苹果电脑,小米平板
into:
张三有苹果手机
张三有联想电脑
张三有华为平板
李四有…
package cn.itcast.day02.transformation;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @author lql
* @time 2024-02-14 21:04:09
* @description TODO: read flatmap.log and turn each input record into three output records
*/
public class FlatMapDemo {
public static void main(String[] args) throws Exception {
/**
* Steps:
* Build the stream execution environment
* Read the local file as the data source
* Use flatMap to turn one input record into three output records
* Split the fields on commas
* Build the three output records
* Print the output
*/
// TODO 1: Build the Flink stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// TODO 2: Read the local data source
DataStream<String> lines = env.readTextFile("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\flatmap.log");
// TODO 3: Use flatMap to turn one input record into three output records
SingleOutputStreamOperator<String> result = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> collector) throws Exception {
String[] elements = line.split(",");
collector.collect(elements[0] + "有" + elements[1]);
collector.collect(elements[0] + "有" + elements[2]);
collector.collect(elements[0] + "有" + elements[3]);
}
});
result.print();
env.execute();
}
}
Result:
8> 李四有华为手机
8> 李四有苹果电脑
8> 李四有小米平板
5> 张三有苹果手机
5> 张三有联想电脑
5> 张三有华为平板
Summary: collector.collect() can be called any number of times per input record, one call per output element.
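For reference, a lambda version of the same flatMap is sketched below (not from the original notes). Because the Collector's generic type is erased at compile time, a flatMap lambda usually needs an explicit returns(...) hint:
// Assumes the same comma-separated input as above; needs import org.apache.flink.api.common.typeinfo.Types
SingleOutputStreamOperator<String> result = lines
        .flatMap((String line, Collector<String> out) -> {
            String[] e = line.split(",");
            out.collect(e[0] + "有" + e[1]);
            out.collect(e[0] + "有" + e[2]);
            out.collect(e[0] + "有" + e[3]);
        })
        .returns(Types.STRING); // output type hint, lost to lambda type erasure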
2.3 Filter
Keeps only the elements that satisfy a condition.
Example: read the access-log records in apache.log and keep only those whose client IP is 83.149.9.216.
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
package cn.itcast.day02.transformation;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author lql
* @time 2024-02-14 21:20:30
* @description TODO: read the access-log records in apache.log and keep only those whose client IP is 83.149.9.216
*/
public class FilterDemo {
public static void main(String[] args) throws Exception {
/**
* Get the ExecutionEnvironment
* Build the data source with readTextFile
* Perform the filtering with filter
* Print for testing
*/
//TODO Get the ExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//TODO Build the data source with readTextFile
DataStream<String> lines = env.readTextFile("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\apache.log");
//TODO Use filter to keep only records containing 83.149.9.216
SingleOutputStreamOperator<String> result = lines.filter(new FilterFunction<String>() {
@Override
public boolean filter(String line) throws Exception {
return line.contains("83.149.9.216");
}
});
result.print();
env.execute();
}
}
Result:
2> 83.149.9.216 - - 17/05/2015:10:05:03 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-search.png
2> 83.149.9.216 - - 17/05/2015:10:05:43 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png
2> 83.149.9.216 - - 17/05/2015:10:05:47 +0000 GET /presentations/logstash-monitorama-2013/plugin/highlight/highlight.js
2> 83.149.9.216 - - 17/05/2015:10:05:12 +0000 GET /presentations/logstash-monitorama-2013/plugin/zoom-js/zoom.js
2> 83.149.9.216 - - 17/05/2015:10:05:07 +0000 GET /presentations/logstash-monitorama-2013/plugin/notes/notes.js
2> 83.149.9.216 - - 17/05/2015:10:05:34 +0000 GET /presentations/logstash-monitorama-2013/images/sad-medic.png
Summary: String.contains() is enough to express this filter condition.
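The same filter can also be written as a one-line lambda (a sketch, not from the original notes); no type hint is needed because the output type equals the input type:
SingleOutputStreamOperator<String> result = lines.filter(line -> line.contains("83.149.9.216"));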
2.4 KeyBy
Stream processing has no groupBy; grouping is done with keyBy.
Example: read a local data source and count per word.
package cn.itcast.day02.transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author lql
* @time 2024-02-14 21:29:52
* @description TODO: read a local tuple data source and sum the counts per word
*/
public class KeyByDemo {
public static void main(String[] args) throws Exception {
// TODO 1: Initialize the Flink environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// TODO 2: Read the local data source
DataStreamSource<Tuple2<String, Integer>> source = env.fromElements(
Tuple2.of("篮球", 1),
Tuple2.of("篮球", 2),
Tuple2.of("篮球", 3),
Tuple2.of("足球", 3),
Tuple2.of("足球", 2),
Tuple2.of("足球", 3)
);
// In streaming, records are processed one at a time: within each group the values are accumulated incrementally, so the last value printed for a group is its final sum
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source.keyBy(t -> t.f0).sum(1);
// Without grouping, sum would be 1+2+3+3+2+3 = 14; after keyBy it is 篮球 = 6 and 足球 = 8
sum.print();
env.execute();
}
}
Result:
4> (足球,3)
4> (足球,5)
4> (足球,8)
5> (篮球,1)
5> (篮球,3)
5> (篮球,6)
Summary:
- 1- keyBy groups a stream by key
- 2- keyBy(...) can take a key selector such as t -> t.f0; it also accepts a field index such as 0, but the index form is deprecated in newer Flink versions
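For completeness, here is the explicit KeySelector form, equivalent to the lambda t -> t.f0 (a sketch; it needs import org.apache.flink.api.java.functions.KeySelector and org.apache.flink.streaming.api.datastream.KeyedStream):
KeyedStream<Tuple2<String, Integer>, String> keyed = source.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
    @Override
    public String getKey(Tuple2<String, Integer> t) {
        return t.f0; // group by the word
    }
});
keyed.sum(1).print();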
2.5 Reduce
Aggregates a dataset or a group element by element, folding it into a single result.
Example: read apache.log, count the number of page views (PV) per IP address, and aggregate into a final result with reduce.
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
package cn.itcast.day02.transformation;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author lql
* @time 2024-02-14 21:43:10
* @description TODO: read apache.log, count the PV per IP address, and aggregate into a final result with reduce
*/
public class ReduceDemo {
public static void main(String[] args) throws Exception {
/**
* Get the ExecutionEnvironment
* Build the data source with readTextFile
* Perform the aggregation with reduce
* Print for testing
*/
//TODO Get the ExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//TODO Build the data source with readTextFile
DataStream<String> lines = env.readTextFile("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\apache.log");
//TODO Perform the aggregation with reduce
SingleOutputStreamOperator<Tuple2<String, Integer>> ipAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String line) throws Exception {
String[] dataArray = line.split(" ");
return Tuple2.of(dataArray[0], 1);
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> result = ipAndOne.keyBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> tuple1, Tuple2<String, Integer> tuple2) throws Exception {
return Tuple2.of(tuple1.f0, tuple1.f1 + tuple2.f1);
}
});
result.print();
env.execute();
}
}
Result:
3> (74.218.234.48,3)
3> (74.218.234.48,4)
3> (74.218.234.48,5)
3> (74.218.234.48,6)
Summary:
- 1- reduce is a generalization of sum-style aggregations
- 2- mind the return value in the overridden method: return Tuple2.of(tuple1.f0, tuple1.f1 + tuple2.f1)
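The same aggregation as a lambda (a sketch, not from the original notes). ReduceFunction's output type equals its input type, so no extra type hint is needed, and the key-selector form avoids the deprecated keyBy(0):
SingleOutputStreamOperator<Tuple2<String, Integer>> result = ipAndOne
        .keyBy(t -> t.f0)
        .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1)); // running PV count per IP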
2.6 minBy and maxBy
Return the element holding the maximum or minimum value of a given field.
2.6.1 Scenario 1:
Example: Tuple2 input
package cn.itcast.day02.transformation;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author lql
* @time 2024-02-14 21:57:18
* @description TODO: after grouping, find the min/max within each group
*/
public class MinMaxByDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> lines = env.socketTextStream("node1", 9999);
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String line) throws Exception {
String[] fields = line.split(",");
String word = fields[0];
int count = Integer.parseInt(fields[1]);
return Tuple2.of(word, count);
}
});
KeyedStream<Tuple2<String, Integer>, String> keyd = wordAndCount.keyBy(t -> t.f0);
keyd.minBy(1).print("最小数据>>>");
keyd.maxBy(1).print("最大数据>>>");
env.execute();
}
}
Result:
最大数据>>>:1> (spark,2)
最小数据>>>:1> (spark,2)
最小数据>>>:1> (spark,2)
最大数据>>>:1> (spark,5)
最大数据>>>:8> (hadoop,7)
最大数据>>>:8> (hadoop,7)
最小数据>>>:8> (hadoop,3)
最小数据>>>:8> (hadoop,3)
2.6.2 Scenario 2
Example: Tuple3 input
package cn.itcast.day02.transformation;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author lql
* @time 2024-02-14 21:57:18
* @description TODO: after grouping, find the min/max within each group
*/
public class MinMaxByDemo2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Create the DataStream from the source; sample input lines:
//辽宁,沈阳,1000
//北京,朝阳,8000
//辽宁,朝阳,1000
//辽宁,朝阳,1000
//辽宁,沈阳,2000
//北京,朝阳,1000
//辽宁,大连,3000
//辽宁,铁岭,500
DataStream<String> lines = env.socketTextStream("node1", 9999);
SingleOutputStreamOperator<Tuple3<String, String, Double>> pcm = lines.map(new MapFunction<String, Tuple3<String, String, Double>>() {
@Override
public Tuple3<String, String, Double> map(String value) throws Exception {
String[] fields = value.split(",");
String province = fields[0];
String city = fields[1];
double money = Double.parseDouble(fields[2]);
return Tuple3.of(province, city, money);
}
});
KeyedStream<Tuple3<String, String, Double>, String> keyed = pcm.keyBy(t -> t.f0);
// The second argument of minBy decides which record is returned on ties: true keeps the first record that holds the minimal value, false keeps the most recent one
SingleOutputStreamOperator<Tuple3<String, String, Double>> res = keyed.minBy(2, false);
res.print();
env.execute();
}
}
Result:
5> (辽宁,沈阳,1000.0)
4> (北京,朝阳,8000.0)
5> (辽宁,朝阳,1000.0)
5> (辽宁,朝阳,1000.0)
5> (辽宁,朝阳,1000.0)
4> (北京,朝阳,1000.0)
5> (辽宁,朝阳,1000.0)
5> (辽宁,铁岭,500.0)
2.7 Difference between min/max and minBy/maxBy
package cn.itcast.day02.transformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author lql
* @time 2024-02-14 22:52:36
* @description TODO
*/
public class MinVSMinByDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple3<Integer, Integer, Integer>> source = env.fromElements(
Tuple3.of(1, 3, 2),
Tuple3.of(1, 1, 2),
Tuple3.of(1, 2, 3),
Tuple3.of(1, 111, 1),
Tuple3.of(1, 1, 1),
Tuple3.of(1, 2, 0),
Tuple3.of(1, 33, 2)
);
source.keyBy(t -> t.f0).min(2).print("min>>>");
source.keyBy(t->t.f0).minBy(2).printToErr("minBy>>>");
env.execute();
}
}
Result:
minBy>>>:6> (1,3,2)
minBy>>>:6> (1,3,2)
minBy>>>:6> (1,3,2)
minBy>>>:6> (1,111,1)
minBy>>>:6> (1,111,1)
minBy>>>:6> (1,2,0)
minBy>>>:6> (1,2,0)
min>>>:6> (1,3,2)
min>>>:6> (1,3,2)
min>>>:6> (1,3,2)
min>>>:6> (1,3,1)
min>>>:6> (1,3,1)
min>>>:6> (1,3,0)
min>>>:6> (1,3,0)
Summary:
- 1- minBy and maxBy return the entire record that holds the extreme value (all of its fields come from that record)
- 2- min and max only track the extreme value of the specified field; the remaining fields keep the values of the first record in the group
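To make the difference concrete, here is a short trace of the demo above (derived from its printed output; comments only, no new API):
// Keyed on f0 = 1, comparing the two aggregations on field index 2:
source.keyBy(t -> t.f0).min(2);   // emits (1,3,2) (1,3,2) (1,3,2) (1,3,1) (1,3,1) (1,3,0) (1,3,0)
source.keyBy(t -> t.f0).minBy(2); // emits (1,3,2) (1,3,2) (1,3,2) (1,111,1) (1,111,1) (1,2,0) (1,2,0)
// min(2) only updates the minimum of field 2 and keeps the other fields from the first record;
// minBy(2) always forwards the complete record that currently holds the minimum.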
2.8 Union
Merges multiple streams into one DataStream; all streams being unioned must have the same element type.
package cn.itcast.day02.transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author lql
* @time 2024-02-14 23:06:20
* @description TODO:
* * Use union
* * to take the union of the following data sets
* * Data set 1
* * "hadoop", "hive","flume"
* * Data set 2
* * "hadoop","hive","spark"
* *
* * Note:
* * 1: the merged data is not deduplicated automatically
* * 2: the element types must be identical
* */
public class UnionDemo {
public static void main(String[] args) throws Exception {
/**
* Steps:
* 1) Initialize the Flink stream execution environment
* 2) Load/create the data sources
* 3) Process the data
* 4) Print the output
* 5) Submit the job for execution
*/
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> ds1 = env.fromElements("hadoop", "hive", "flume");
DataStreamSource<String> ds2 = env.fromElements("hadoop","hive","spark");
DataStream<String> result = ds1.union(ds2);
result.printToErr();
env.execute();
}
}
Result:
2> hive
6> flume
3> spark
1> hadoop
4> hadoop
5> hive
Summary:
- 1- Union merges streams whose element types must be identical
- 2- Union does not deduplicate
- 3- The order of the merged output is not deterministic
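Also worth knowing: union is variadic, so several streams can be merged in one call. A small sketch (ds3 is a hypothetical third stream of the same type, not part of the original demo):
DataStreamSource<String> ds3 = env.fromElements("kafka", "hbase");
DataStream<String> all = ds1.union(ds2, ds3); // any number of same-typed streams in one call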
2.9 Connect
DataStream, DataStream → ConnectedStreams: the two streams remain logically independent inside the connected stream; by contrast, union really merges them into a single stream.
package cn.itcast.day02.transformation;
/**
* @author lql
* @time 2024-02-14 23:10:14
* @description TODO
*/
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.concurrent.TimeUnit;
/**
* Reads two data streams (of different element types) and merges them with connect for output.
* Similar to union, but connect joins exactly two streams, their element types may differ, and each stream can be processed with its own logic.
*/
public class ConnectDemo {
public static void main(String[] args) throws Exception {
/**
* Steps:
* 1) Initialize the Flink stream execution environment
* 2) Build two data streams of different element types
* 3) Apply the business logic to the connected streams
* 4) Print the output
* 5) Start the job
*/
//TODO 1) Initialize the Flink stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//TODO 2) Build two data streams of different element types
DataStream<Long> longDataStreamSource = env.addSource(new MyNoParallelSource());
DataStream<Long> longDataStreamSource2 = env.addSource(new MyNoParallelSource());
//TODO 3) Apply the business logic to the connected streams
SingleOutputStreamOperator<String> strDataStreamSource = longDataStreamSource2.map(new MapFunction<Long, String>() {
@Override
public String map(Long aLong) throws Exception {
return "str_" + aLong;
}
});
ConnectedStreams<Long, String> connectedStreams = longDataStreamSource.connect(strDataStreamSource);
//Apply different logic to each side of the connected streams
SingleOutputStreamOperator<Object> result = connectedStreams.map(new CoMapFunction<Long, String, Object>() {
@Override
public Object map1(Long value) throws Exception {
return value;
}
@Override
public Object map2(String value) throws Exception {
return value;
}
});
//TODO 4) Print the output
result.print();
//TODO 5) Start the job
env.execute();
}
public static class MyNoParallelSource implements SourceFunction<Long> {
//Flag that controls whether data keeps being generated
private boolean isRunning = true;
private Long count = 0L;
/**
* This is the main method: it starts the data source
* and performs the data generation
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<Long> ctx) throws Exception {
//Keep generating data
while (isRunning){
count+=1;
//Emit the record
ctx.collect(count);
//Generate one record per second
TimeUnit.SECONDS.sleep(1);
}
}
/**
* Cancel the data generation
*/
@Override
public void cancel() {
isRunning = false;
}
}
}
Result:
3> 1
5> str_1
4> 2
6> str_2
5> 3
7> str_3
Summary:
- The two streams passed to connect may have different element types
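A common follow-up pattern (a sketch, not part of the original demo) is to map both sides to one concrete output type instead of Object, so downstream operators keep precise type information:
SingleOutputStreamOperator<String> merged = connectedStreams.map(new CoMapFunction<Long, String, String>() {
    @Override
    public String map1(Long value) {   // applied to elements of the first (Long) stream
        return "long_" + value;
    }
    @Override
    public String map2(String value) { // applied to elements of the second (String) stream
        return value;
    }
});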
2.10 split, select, and Side Outputs
Split divides one DataStream into two or more DataStreams.
Select retrieves the sub-stream that a split produced.
Tips:
- Put simply, split tags each record and select picks records by tag to form separate streams. The effect resembles keyBy, but it is more flexible: you choose the tags and therefore how the stream is divided.
- Side Outputs: split is deprecated. Use a process function instead and, depending on the processing result, send records to different OutputTags.
package cn.itcast.day02.transformation;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
* @author lql
* @time 2024-02-14 23:25:38
* @description TODO
*/
public class StreamSplitDemo {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// With AUTOMATIC, Flink decides between STREAMING and BATCH execution based on whether all sources are bounded
//TODO 1.source
DataStreamSource<Integer> ds = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
//TODO 2.transformation
//Requirement: split the stream into odd and even numbers and select each side
OutputTag<Integer> oddTag = new OutputTag<>("奇数", TypeInformation.of(Integer.class));
OutputTag<Integer> evenTag = new OutputTag<>("偶数", TypeInformation.of(Integer.class));
SingleOutputStreamOperator<Integer> result = ds.process(new ProcessFunction<Integer, Integer>() {
@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
//Records emitted via out stay in the main stream; ctx.output(...) sends a record to a specific OutputTag
if (value % 2 == 0) {
ctx.output(evenTag, value);
} else {
ctx.output(oddTag, value);
}
}
});
DataStream<Integer> oddResult = result.getSideOutput(oddTag);
DataStream<Integer> evenResult = result.getSideOutput(evenTag);
//TODO 3.sink
System.out.println(oddTag);//OutputTag(Integer, 奇数)
System.out.println(evenTag);//OutputTag(Integer, 偶数)
oddResult.print("奇数:");
evenResult.print("偶数:");
//TODO 4.execute
env.execute();
}
}
Result:
OutputTag(Integer, 奇数)
OutputTag(Integer, 偶数)
奇数::3> 1
偶数::8> 6
偶数::6> 4
偶数::4> 2
奇数::5> 3
奇数::1> 7
奇数::7> 5
偶数::2> 8
偶数::4> 10
奇数::3> 9
Summary:
- 1- An OutputTag defines the name and element type of a side output
- 2- process(...) can be used to split a stream
- 3- A side output is retrieved with getSideOutput(tag)
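One more detail worth noting (a sketch, not from the original notes): if you do not pass TypeInformation explicitly as the demo does, the OutputTag has to be created as an anonymous subclass so that its element type survives erasure:
// The trailing {} creates an anonymous subclass, which lets Flink extract the Integer type
OutputTag<Integer> oddTag = new OutputTag<Integer>("奇数") {};
OutputTag<Integer> evenTag = new OutputTag<Integer>("偶数") {};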
2.11 Iterate
Creates a feedback loop in the stream by redirecting the output of one operator back to an earlier operator.
Data flow of an iteration: DataStream → IterativeStream → DataStream
package cn.itcast.day02.transformation;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author lql
* @time 2024-02-14 23:34:23
* @description TODO: Iterate, iterative streaming computation
*/
public class IterateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//10
DataStreamSource<String> strs = env.socketTextStream("node1", 9999);
DataStream<Long> numbers = strs.map(Long::parseLong);
//Call iterate: DataStream -> IterativeStream
//Iterate over the numbers (keep typing integers into the socket)
IterativeStream<Long> iteration = numbers.iterate();
//IterativeStream -> DataStream
//The iteration body: the update logic applied to every input record
DataStream<Long> iterationBody = iteration.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
System.out.println("iterate input =>" + value);
return value -= 2;
}
});
//Every record that still satisfies value > 0 is fed back into the loop, i.e. the body's output becomes its input again and the update logic is applied once more
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return value > 0;
}
});
//Register the feedback stream (the loop-continuation condition)
iteration.closeWith(feedback);
//Records that no longer satisfy the loop condition leave the iteration and are emitted
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return value <= 0;
}
});
//Final results
output.printToErr("output value:");
env.execute();
}
}
Result:
iterate input =>7
iterate input =>5
iterate input =>3
iterate input =>1
output value::2> -1
iterate input =>6
iterate input =>4
output value::3> 0
iterate input =>2
Summary:
- 1- Iterative updates of a model or of parameters are the typical use case
- 2- Operator iteration takes some effort to understand; work through the example
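One practical note (an assumption, not from the original notes): iterate() also accepts a timeout in milliseconds; if no feedback record arrives within that time, the iteration head terminates instead of waiting forever:
// Hypothetical variant of the iterate() call above; 5000 ms is an arbitrary choice
IterativeStream<Long> iteration = numbers.iterate(5000);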
That wraps up Flink Review Notes (3): Flink Unified Stream and Batch API Development, the key Transformation operators.