Flink学习笔记
前言:今天是学习 flink 的第五天啦! 主要学习了物理分区较难理解的部分,在这个部分的三个分区的学习中, rescale partition 和 forward partition 其原理可以归类 pointwise 模式,其他的 partition 其原理可以归类 all_to_all 模式,而比较有趣的是 custom partitioning,这个可以进行根据值的输入进行自定义分区。
Tips:尼采曾经说过:“每一个不起眼的日子,都是对生命的辜负!” 虽然转码学习之路比起科班同学会更加艰辛,不过我相信只要愿意坚持,多理解多敲代码,多向各位大佬请教,即使一点一滴也是会有收获的,明天也要继续加油!
二、Flink 流批一体 API 开发
3. 物理分区
3.5 Rescale Partition
RescalePartitioner,RESCALE分区。基于上下游Operator的并行度,将记录以循环的方式输出到下游Operator的每个实例。
举例1:如图顺序
上游并行度是2,下游是4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。
举例2:如图倒序
若上游并行度是4,下游并行度是2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。
实例:上游并行度是2,下游是4,接收数据
package cn.itcast.day05.partition;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
/**
* @author lql
* @time 2024-02-15 23:47:11
* @description TODO:编写Flink程序,接收socket的单词数据,并将每个字符串广播到每个分区。
*/
// 组内轮询
public class RescalePartitioningDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
//Source是一个非并行的Source
//并行度是1
DataStreamSource<String> lines = env.socketTextStream("node1", 9999);
//并行度2
SingleOutputStreamOperator<String> mapped = lines.map(new RichMapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
return value + " : " + indexOfThisSubtask;
}
}).setParallelism(2);
//广播,上游的算子将一个数据广播到下游所以的subtask
DataStream<String> shuffled = mapped.rescale();
shuffled.addSink(new RichSinkFunction<String>() {
@Override
public void invoke(String value, Context context) throws Exception {
int index = getRuntimeContext().getIndexOfThisSubtask();
System.out.println(value + " -> " + index);
}
}).setParallelism(4);
env.execute();
}
}
结果:
hadoop : 0 -> 0
hadoop : 1 -> 2
hadoop : 0 -> 1
hadoop : 1 -> 3
hadoop : 0 -> 0
hadoop : 1 -> 2
hadoop : 0 -> 1
hadoop : 1 -> 3
总结:
- rescale partition 类似于部分数据对应部分 sink,且数据不相冲突。
3.6 Forward Partition
发送到下游对应的第一个task,保证上下游算子并行度一致,即上游算子与下游算子是1:1的关系
实例:编写Flink程序,接收socket的单词数据,并将每个字符串广播到每个分区。
package cn.itcast.day05.partition;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
/**
* @author lql
* @time 2024-02-16 22:38:17
* @description TODO:编写Flink程序,接收socket的单词数据,并将每个字符串广播到每个分区
*/
public class ForwardPartitioningDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
//Source是一个非并行的Source
//并行度是1
DataStreamSource<String> lines = env.socketTextStream("node1", 9999);
//并行度2
SingleOutputStreamOperator<String> mapped = lines.map(new RichMapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
return value + " : " + indexOfThisSubtask;
}
}).setParallelism(2);
//广播,上游的算子将一个数据广播到下游所以的subtask
DataStream<String> shuffled = mapped.forward();
shuffled.addSink(new RichSinkFunction<String>() {
@Override
public void invoke(String value, Context context) throws Exception {
int index = getRuntimeContext().getIndexOfThisSubtask();
System.out.println(value + " -> " + index);
}
}).setParallelism(2);
env.execute();
}
}
结果:
flink : 0 -> 0
flink : 1 -> 1
flink : 0 -> 0
flink : 1 -> 1
总结:
- 1- Forward Partition 原理和 Rescale Partition 原理相似,遵循 POINTWISE 模式连接下游算子,而其他 Partition 都采用ALL_TO_ALL模式来连接
- 2- 在上下游的算子没有指定分区器的情况下,如果上下游的算子并行度一致,则使用 ForwardPartitioner,否则使用RebalancePartitioner
- 3- 对于 ForwardPartitioner,必须保证上下游算子并行度一致,否则会抛出异常
拓展:POINTWISE 模式和 ALL_TO_ALL模式
3.7 Custom Partitioning
使用用户定义的 Partitioner 为每个元素选择目标任务
实例:编写Flink程序,接收socket的单词数据,并将每个字符串写入到指定的分区中。
package cn.itcast.day05.partition;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
/**
* @author lql
* @time 2024-02-16 22:51:31
* @description TODO:编写Flink程序,接收socket的单词数据,并将每个字符串写入到指定的分区中。
*/
public class CustomPartitioningDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
//Source是一个非并行的Source
//并行度是1
DataStreamSource<String> lines = env.socketTextStream("node1", 9999);
SingleOutputStreamOperator<Tuple2<String, Integer>> mapped = lines.map(new RichMapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
return Tuple2.of(value, indexOfThisSubtask);
}
});
//按照指定的规则进行分区
DataStream<Tuple2<String, Integer>> partitioned = mapped.partitionCustom(new Partitioner<String>() {
@Override
public int partition(String key, int numPartitions) {
int res = 0;
if ("spark".equals(key)) {
res = 1;
} else if ("flink".equals(key)) {
res = 2;
} else if ("hadoop".equals(key)) {
res = 3;
}
return res;
}
}, tp -> tp.f0);
partitioned.addSink(new RichSinkFunction<Tuple2<String, Integer>>() {
@Override
public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
int index = getRuntimeContext().getIndexOfThisSubtask();
System.out.println(value.f0 + " , 上游 " + value.f1 + " -> 下游 " + index);
}
});
env.execute();
}
}
结果:文章来源:https://www.toymoban.com/news/detail-826425.html
hadoop , 上游 1 -> 下游 3
spark , 上游 2 -> 下游 1
flink , 上游 3 -> 下游 2
总结:文章来源地址https://www.toymoban.com/news/detail-826425.html
- partitionCustom 可以指定规则进行分区
- tp -> tp.f0 这里指的是 Tuple2 中接受的数据以第一个作为判断标准
到了这里,关于flink重温笔记(五):Flink 流批一体 API 开发——物理分区(下)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!