1. Notes
1) The code targets Flink 1.16.1; older versions may need different dependencies and APIs. Hutool's JSON utilities are used for parsing; both can be swapped for alternatives of your choice.
2) Both approaches below assume that all source topics live in the same Kafka cluster and are consumed with the same consumer group. Joining multiple streams with Flink's connect operator has not been explored here.
2. Dependencies
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.16.1</flink.version>
    <hutool.version>5.8.15</hutool.version>
</properties>

<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>${hutool.version}</version>
</dependency>
<!-- Flink -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <exclusions>
        <exclusion>
            <artifactId>commons-lang3</artifactId>
            <groupId>org.apache.commons</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <exclusions>
        <exclusion>
            <artifactId>commons-lang3</artifactId>
            <groupId>org.apache.commons</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web</artifactId>
    <version>${flink.version}</version>
</dependency>
3. Approach 1: suitable when sink topics sit on different clusters or otherwise need different Kafka producer configurations
The code references a Hadoop environment; if you do not have one, the HDFS paths can be replaced with local paths.
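For reference, here is a minimal sketch of the path-related setup using only the local file system instead of HDFS. The D:/ directories and the jobId/chk-xx segment are placeholders taken from the comments in the code below; use any local directory and an existing checkpoint directory on your machine.

// Local-only variant of the environment setup (placeholder paths; adjust to your machine).
Configuration configuration = new Configuration();
configuration.setString("rest.port", "9091");
// Restore from a checkpoint on the local file system; jobId/chk-xx must point at an existing checkpoint directory.
String savePointPath = "file:///D:/test/flink-tuning/checkpoint/jobId/chk-xx";
SavepointRestoreSettings restoreSettings = SavepointRestoreSettings.forPath(savePointPath, true);
SavepointRestoreSettings.toConfiguration(restoreSettings, configuration);
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
environment.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
environment.setStateBackend(new HashMapStateBackend());
// Store checkpoints locally as well, so neither HADOOP_USER_NAME nor an HDFS client is required.
environment.getCheckpointConfig().setCheckpointStorage("file:///D:/test/flink-tuning/checkpoint");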
3.1 Configuration file
# Extra input topic (appended to the list of source topics)
newInputTopic=hive_data_input_topic
# Extra output topic (used in the table-to-topic mapping)
newOutputTopic=topic-test
3.2 Java code
public static void main(String[] args) throws Exception {
    // Set the user for HDFS access
    System.setProperty("HADOOP_USER_NAME", "hadoop");
    // Read command-line arguments; args[0] is the config file path, e.g. input/customer.properties
    ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[0]);
    String inputTopic = parameterTool.get("newInputTopic");
    String outputTopic = parameterTool.get("newOutputTopic");
    // Build the list of input topics
    ArrayList<String> inputTopicList = new ArrayList<>();
    inputTopicList.add("canal_mysql_input_topic");
    if (!StringUtils.isNullOrWhitespaceOnly(inputTopic)) {
        inputTopicList.add(inputTopic);
    }
    // Build the table-name -> output-topic mapping
    Map<String, String> hashMap = new HashMap<>();
    hashMap.put("ap_article", "canal_input_topic");
    hashMap.put("ap_user", "cast_topic_input");
    if (!StringUtils.isNullOrWhitespaceOnly(outputTopic)) {
        hashMap.put("hive_table_orders", outputTopic);
    }
    // Build the configuration
    Configuration configuration = new Configuration();
    // Port of the local Flink dashboard web UI, i.e. http://localhost:9091
    configuration.setString("rest.port", "9091");
    // Restore from a specific checkpoint; this is an HDFS path and can be replaced with a local one,
    // e.g. "file:///D:\\test\\flink-tuning\\checkpoint\\jobId\\chk-xx"
    String savePointPath = "hdfs://masterNode:8020/flink-tuning/checkpoint/b66ee8431170f07764db0e777c58848a/chk-36";
    // Set the restore path; the second argument (allowNonRestoredState) lets the job start even if some state
    // in the snapshot no longer maps to an operator in the new program. Operators must be given explicit uids,
    // otherwise their state cannot be matched on restore.
    SavepointRestoreSettings restoreSettings = SavepointRestoreSettings.forPath(savePointPath, true);
    SavepointRestoreSettings.toConfiguration(restoreSettings, configuration);
    // Create the execution environment
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
    // Enable checkpointing with the given interval
    environment.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
    // Set the state backend
    environment.setStateBackend(new HashMapStateBackend());
    CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
    // Checkpoint storage path; a local path also works, e.g. file:///D:\\test\\flink-tuning\\checkpoint
    checkpointConfig.setCheckpointStorage("hdfs://masterNode:8020/flink-tuning/checkpoint");
    // Maximum number of checkpoints that may be in progress at the same time
    checkpointConfig.setMaxConcurrentCheckpoints(1);
    // Tolerable checkpoint failures; beyond this the job fails (a failed checkpoint is not retried,
    // the job simply waits for the next one)
    checkpointConfig.setTolerableCheckpointFailureNumber(5);
    // A checkpoint that does not complete within this timeout is considered failed
    checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));
    // Keep externalized checkpoints when the job is cancelled manually
    checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    // Read from Kafka
    KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
            .setBootstrapServers("192.168.200.130:9092")
            .setTopics(inputTopicList)
            .setGroupId("group-test-savepoint")
            // Start from the consumer group's committed offsets; if none are found, fall back to the latest offset
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
            .setProperty("partition.discovery.interval.ms", "10000") // check for new partitions every 10 s so newly added partitions do not go unconsumed
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();
    SingleOutputStreamOperator<String> streamSource = environment.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka source")
            .uid("kafka_source") // it is best to give every operator an explicit uid
            .setParallelism(5);  // parallelism = number of topic partitions
    // The loop creates one operator chain per map entry, i.e. several filter --> sink chains; see the figure below.
    // The map could also carry each topic's cluster and authentication info; that is omitted here
    // (see the sketch after this code block).
    for (String key : hashMap.keySet()) {
        // The filter operator keeps only records whose table field matches this entry's table name
        SingleOutputStreamOperator<String> outputStreamOperator = streamSource.filter(vo -> {
            JSONObject jsonObject = JSONUtil.parseObj(vo);
            String tableName = (String) jsonObject.get("table");
            return tableName.equals(key);
        }).uid("filter-" + key).setParallelism(5);
        // Build the Kafka sink
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                // Kafka cluster; the address can differ per topic if topics live on different clusters
                .setBootstrapServers("192.168.200.130:9092")
                // Kafka record serializer
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        // Output topic taken from the mapping
                        .setTopic(hashMap.get(key))
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                // Delivery guarantee: at least once
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
        // Sink operator
        outputStreamOperator.sinkTo(kafkaSink).uid("sink-" + key).setParallelism(1);
    }
    // Execute
    environment.execute();
}
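The comment above the loop mentions that the map could also carry each topic's cluster and authentication info. A minimal sketch of one way to wire that up follows; the SinkTarget holder class, the second broker address, and the SASL property are hypothetical placeholders, not part of the original job.

// Hypothetical holder for per-topic sink settings (illustrative only).
public static class SinkTarget {
    final String topic;
    final String bootstrapServers;
    final Properties producerConfig; // e.g. SASL/SSL options for that cluster

    SinkTarget(String topic, String bootstrapServers, Properties producerConfig) {
        this.topic = topic;
        this.bootstrapServers = bootstrapServers;
        this.producerConfig = producerConfig;
    }
}

// table name -> sink target, replacing the plain String -> String map
Map<String, SinkTarget> targets = new HashMap<>();
Properties saslProps = new Properties();
saslProps.setProperty("security.protocol", "SASL_PLAINTEXT"); // placeholder auth setting
targets.put("ap_article", new SinkTarget("canal_input_topic", "192.168.200.130:9092", new Properties()));
targets.put("ap_user", new SinkTarget("cast_topic_input", "another-cluster:9092", saslProps));

for (Map.Entry<String, SinkTarget> entry : targets.entrySet()) {
    String tableKey = entry.getKey();
    SinkTarget target = entry.getValue();
    KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
            .setBootstrapServers(target.bootstrapServers)  // cluster may differ per topic
            .setKafkaProducerConfig(target.producerConfig) // authentication and other producer options
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic(target.topic)
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build())
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();
    streamSource.filter(vo -> tableKey.equals(JSONUtil.parseObj(vo).getStr("table")))
            .uid("filter-" + tableKey).setParallelism(5)
            .sinkTo(kafkaSink).uid("sink-" + tableKey).setParallelism(1);
}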
3.3 Execution graph (note: to better show the operators created inside the loop, the sink parallelism was set to 1, which introduces a rebalance)
4. Approach 2: suitable when the input and output topics all belong to the same cluster
4.1 Configuration file: same as above
4.2 Java code
public static void main(String[] args) throws Exception {
    // The environment, checkpoint and topic-mapping setup is the same as above and omitted here...
    // Read from Kafka
    KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
            .setBootstrapServers("192.168.200.130:9092")
            .setTopics(inputTopicList)
            .setGroupId("group-test-savepoint")
            // Start from the consumer group's committed offsets; if none are found, fall back to the latest offset
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
            .setProperty("partition.discovery.interval.ms", "10000") // check for new partitions every 10 s so newly added partitions do not go unconsumed
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();
    SingleOutputStreamOperator<String> streamSource = environment.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka source")
            .uid("kafka_source") // it is best to give every operator an explicit uid
            .setParallelism(5);  // parallelism = number of topic partitions
    // Write to Kafka; with no loop here, only a single operator chain is created
    KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
            .setBootstrapServers("192.168.200.130:9092") // the output topics all live on one fixed cluster
            .setRecordSerializer((KafkaRecordSerializationSchema<String>) (data, context, timestamp) -> {
                JSONObject jsonObject = JSONUtil.parseObj(data);
                // Extract the table name
                String table = (String) jsonObject.get("table");
                // Look up the target topic
                String topic = hashMap.get(table);
                return new ProducerRecord<>(topic, data.getBytes(StandardCharsets.UTF_8));
            })
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();
    // Sink operator (single sink, so a fixed uid instead of the per-key uid used in approach 1)
    streamSource.sinkTo(kafkaSink).uid("kafka_sink").setParallelism(1);
    // Execute
    environment.execute();
}
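As an aside, the same table-to-topic routing can also be expressed with the Kafka connector's serialization-schema builder and its topic selector instead of the hand-written serializer lambda above. A minimal sketch, reusing the hashMap from approach 1; the fallback topic name "unmatched_topic" is made up, and in practice you would decide how to handle tables that have no mapping:

// Route each record via the builder's topic selector instead of a custom serializer.
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        .setBootstrapServers("192.168.200.130:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopicSelector(element -> {
                    String table = JSONUtil.parseObj(element).getStr("table");
                    String topic = hashMap.get(table);
                    return topic != null ? topic : "unmatched_topic"; // hypothetical fallback topic
                })
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
streamSource.sinkTo(kafkaSink).uid("kafka_sink").setParallelism(1);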
5. Business use cases: