Background
Unlike Spark, Flink integrates with Hudi through the SPI mechanism, so it does not need the session-level configuration that Spark requires. With Spark you have to set:
spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog
(org.apache.spark.sql.sources.DataSourceRegister is not counted here.)
With Flink, it is enough to put the corresponding Hudi jar on the classpath; the factories are registered via SPI:
META-INF/services/org.apache.flink.table.factories.Factory
org.apache.hudi.table.HoodieTableFactory
org.apache.hudi.table.catalog.HoodieCatalogFactory
Here HoodieTableFactory is where Hudi data is read and written, and HoodieCatalogFactory provides the Catalog used to operate on Hudi tables.
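For comparison, here is a minimal sketch of using Hudi from Flink's Table API; the table name, path, and schema below are made up for illustration. No session-level extension settings are needed: once the hudi-flink bundle jar is on the classpath, Flink's Factory SPI resolves 'connector' = 'hudi' to HoodieTableFactory.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HudiSpiDemo {
  public static void main(String[] args) throws Exception {
    // No spark.sql.extensions-style settings: the Hudi factories are picked up via SPI
    // as soon as the hudi-flink bundle jar is on the classpath.
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    tEnv.executeSql(
        "CREATE TABLE hudi_demo ("
            + "  uuid STRING,"
            + "  name STRING,"
            + "  ts TIMESTAMP(3),"
            + "  PRIMARY KEY (uuid) NOT ENFORCED"
            + ") WITH ("
            + "  'connector' = 'hudi',"               // resolved to HoodieTableFactory via SPI
            + "  'path' = 'file:///tmp/hudi_demo',"   // hypothetical table path
            + "  'table.type' = 'MERGE_ON_READ'"
            + ")");
    tEnv.executeSql(
        "INSERT INTO hudi_demo VALUES ('id1', 'Alice', TIMESTAMP '2024-01-01 00:00:01')")
        .await();
  }
}

Compare this with the Spark settings above: nothing has to be declared in the session configuration; discovery is purely classpath-driven.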
First, some general notes
Let's start by explaining directly how Hudi writes data:
HoodieTableFactory
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
  // Build the Hudi configuration from the table's WITH options; 'path' is the one mandatory option.
  Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
  checkArgument(!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.PATH)),
      "Option [path] should not be empty.");
  // Validate schema-related options and derive the remaining ones before creating the sink.
  ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
  sanityCheck(conf, schema);
  setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
  return new HoodieTableSink(conf, schema);
}
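As a side note, here is a minimal, hypothetical sketch of where that Configuration comes from (the option values are made up): the table's WITH options arrive as a plain map, FlinkOptions.fromMap turns them into a Configuration, and path is the one option the factory insists on.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

public class HoodieOptionsDemo {
  public static void main(String[] args) {
    // The WITH options of a CREATE TABLE statement reach the factory as a Map<String, String>.
    Map<String, String> options = new HashMap<>();
    options.put("connector", "hudi");
    options.put("path", "file:///tmp/hudi_demo");   // hypothetical path; the only mandatory option above
    options.put("table.type", "MERGE_ON_READ");

    // Same call as in createDynamicTableSink: map -> Hudi Configuration.
    Configuration conf = FlinkOptions.fromMap(options);
    System.out.println(conf.getString(FlinkOptions.PATH));
  }
}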
The HoodieTableSink created here is the class that actually writes data into Hudi:
public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
  ...

  @Override
  public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    return (DataStreamSinkProviderAdapter) dataStream -> {
      // setup configuration
      long ckpTimeout = dataStream.getExecutionEnvironment()
          .getCheckpointConfig().getCheckpointTimeout();
      conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
      // set up default parallelism
      OptionsInference.setupSinkTasks(conf, dataStream.getExecutionConfig().getParallelism());
      RowType rowType = (RowType) schema.toSinkRowDataType().notNull().getLogicalType();
      // bulk_insert mode
      final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
      if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
        return Pipelines.bulkInsert(conf, rowType, dataStream);
      }
      // Append mode
      if (OptionsResolver.isAppendMode(conf)) {
        DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, context.isBounded());
        if (OptionsResolver.needsAsyncClustering(conf)) {
          return Pipelines.cluster(conf, rowType, pipeline);
        } else {
          return Pipelines.dummySink(pipeline);
        }
      }
      DataStream<Object> pipeline;
      // bootstrap
      final DataStream<HoodieRecord> hoodieRecordDataStream =
          Pipelines.bootstrap(conf, rowType, dataStream, context.isBounded(), overwrite);
      // write pipeline
      pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
      // compaction
      if (OptionsResolver.needsAsyncCompaction(conf)) {
        // use synchronous compaction for bounded source.
        if (context.isBounded()) {
          conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
        }
        return Pipelines.compact(conf, pipeline);
      } else {
        return Pipelines.clean(conf, pipeline);
      }
    };
  }

  ...
}
- long ckpTimeout = ... reads Flink's checkpoint timeout and stores it as the value of write.commit.ack.timeout. Unlike Spark's batch processing, Flink has checkpoints (used to save and restore Flink's internal state); see the checkpoint sketch after this list.
- OptionsInference.setupSinkTasks sets the values of write.tasks, write.bucket_assign.tasks, compaction.tasks and clustering.tasks.
- Depending on the configured write.operation (upsert by default), a different code path is taken:
- Pipelines.bootstrap: the bootstrap step loads the existing index and passes it on to the downstream operators.
  - If index.global.enabled is true (the default), a RowDataToHoodieFunction operator is built first, converting each record into a HoodieAvroRecord. Here payload.class defaults to EventTimeAvroPayload, and write.precombine defaults to false for better write performance, which matches Spark's default behavior.
  - If index.bootstrap.enabled is true (false by default), a BootstrapOperator is also built; it loads the index from the existing hoodie table and propagates the index records downstream (note that each task triggers this only once).
- pipeline = Pipelines.hoodieStreamWrite writes the hudi files.
  - As the comment in the code says, records are first shuffled by record key, then by fileId, and only then written:
    | input1 | ===\     /=== | bucket assigner | ===\     /=== | task1 |
                 shuffle(by PK)                    shuffle(by bucket ID)
    | input2 | ===/     \=== | bucket assigner | ===/     \=== | task2 |
  - If index.type is not BUCKET (the default is FLINK_STATE, which uses the flink state backend as storage), we take the non-bucket index path, which is what we mainly cover here:
    WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
    return dataStream
        // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
        .keyBy(HoodieRecord::getRecordKey)
        .transform(
            "bucket_assigner",
            TypeInformation.of(HoodieRecord.class),
            new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
        .uid(opUID("bucket_assigner", conf))
        .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
        // shuffle by fileId(bucket id)
        .keyBy(record -> record.getCurrentLocation().getFileId())
        .transform(opName("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
        .uid(opUID("stream_write", conf))
        .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
  - After keying by record key, the main work happens in BucketAssignFunction. Its main job is to update the index it keeps in memory based on the index records handed down by the previous operator: if an entry for the key already exists it is updated, otherwise a new one is added (and the record is tagged with the corresponding I or U flag); a simplified sketch of this idea follows the list.
  - After keying by fileId, the main work happens in StreamWriteFunction. There is a lot to unpack there (StreamWriteOperatorCoordinator, hudi's exactly-once implementation, and so on), which we will analyze in detail in a later post.
- If compaction.async.enabled is true (the default), asynchronous compaction is scheduled; otherwise a clean operation runs.
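To make the write.commit.ack.timeout handling concrete, here is a minimal, hypothetical sketch of the checkpoint settings a job would typically configure (the interval and timeout values are made up); getSinkRuntimeProvider simply copies the checkpoint timeout found here into write.commit.ack.timeout.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigDemo {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Hudi's Flink writer commits on checkpoints, so checkpointing must be enabled.
    env.enableCheckpointing(60_000L);                          // hypothetical: checkpoint every 60s
    env.getCheckpointConfig().setCheckpointTimeout(600_000L);  // hypothetical: 10 min timeout
    // Inside HoodieTableSink#getSinkRuntimeProvider, this same value is read via
    // dataStream.getExecutionEnvironment().getCheckpointConfig().getCheckpointTimeout()
    // and written into FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT (write.commit.ack.timeout).
  }
}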
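The real BucketAssignFunction is more involved, but the simplified, hypothetical sketch below (the class and state names are mine, not Hudi's) illustrates the idea described above: the stream is keyed by record key, per-key state holds the known file location, and each record is tagged as an update (U) if a location already exists or as an insert (I) if it does not. In the actual function the state is first seeded with the index records sent by the bootstrap operator, and new file groups are chosen by a bucket assigner.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Simplified stand-in for Hudi's record plus its assigned location.
class DemoRecord {
  String recordKey;
  String fileId;   // assigned file group
  char op;         // 'I' for insert, 'U' for update
}

// Conceptual sketch of the bucket-assign step: keyed by record key, it keeps the last known
// fileId per key in Flink state and tags each record as insert or update accordingly.
class SimpleBucketAssignFunction extends KeyedProcessFunction<String, DemoRecord, DemoRecord> {
  private transient ValueState<String> knownFileId;

  @Override
  public void open(Configuration parameters) {
    knownFileId = getRuntimeContext().getState(
        new ValueStateDescriptor<>("known-file-id", String.class));
  }

  @Override
  public void processElement(DemoRecord record, Context ctx, Collector<DemoRecord> out) throws Exception {
    String fileId = knownFileId.value();
    if (fileId != null) {
      record.op = 'U';                     // key seen before: update the existing file group
      record.fileId = fileId;
    } else {
      record.op = 'I';                     // new key: assign a file group (greatly simplified here)
      record.fileId = "new-file-group";
      knownFileId.update(record.fileId);
    }
    out.collect(record);
  }
}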
That wraps up this first look at how Apache Hudi integrates with Flink.