StarRocks FileSystem Extension Code Analysis


StarRocks supports reading and writing external file systems through the FILES() table function.

For example, insert into files("path"="hdfs://xxx.xx.xxx.xx:9000/unload/data1", "format"="parquet", "compression" = "lz4") select * from sales_records unloads the data of table sales_records to HDFS, stored in Parquet format with lz4 compression.

Conversely, insert into foo select * from files("path"="hdfs://xxx.xx.xxx.xx:9000/unload/data1", "format"="parquet", "compression" = "lz4") reads those files from HDFS and loads them into table foo.

If we want to extend the set of file systems StarRocks supports, so that it can read and write files on other storage backends, where should we start?

Taking the insert into files() statement as an example, the sections below analyze how to extend StarRocks with another file system, covering both the frontend (FE) and the backend (BE).

1 FE Parsing Process

1.1 FE End-to-End Framework

StarRocks supports several connection protocols; here we take the MySQL client connection as an example.

com/starrocks/qe/ConnectProcessor.java

/**
 * Process one mysql connection, receive one packet, process, send one packet.
 */
public class ConnectProcessor {
    ...
    // process COM_QUERY statement,
    protected void handleQuery() {
        ...
        originStmt = new String(bytes, 1, ending, StandardCharsets.UTF_8);
        ...
        try {
            ...
            try {
                stmts = com.starrocks.sql.parser.SqlParser.parse(originStmt, ctx.getSessionVariable());
            } catch (ParsingException parsingException) {
                throw new AnalysisException(parsingException.getMessage());
            }
            for (int i = 0; i < stmts.size(); ++i) {
                ...
                parsedStmt = stmts.get(i);
                ...
                executor = new StmtExecutor(ctx, parsedStmt);
                ...

                executor.execute();
                ...
            }
        }
        ...
    }
    ...
}

Starting StarRocks brings up a MySQL server implemented by StarRocks itself. A query arriving on such a connection is routed to a ConnectProcessor and handled by its handleQuery() method. This method first invokes the parser to turn the query string into syntax trees, one per statement. For each syntax tree it then constructs a StmtExecutor and calls execute(), which drives the frontend's semantic analysis, optimization, and subsequent steps.

com/starrocks/qe/StmtExecutor.java

public class StmtExecutor {
    ...
    public void execute() throws Exception {
        ...
        try {
            ...
            try (Timer ignored = Tracers.watchScope("Total")) {
                ...
                if (!isForwardToLeader()) {
                    ...
                    if (...) {
                        ...
                    } else {
                        execPlan = StatementPlanner.plan(parsedStmt, context);
                        if (parsedStmt instanceof QueryStatement && context.shouldDumpQuery()) {
                            context.getDumpInfo().setExplainInfo(execPlan.getExplainString(TExplainLevel.COSTS));
                        }
                    }
                    ...
                }
            }
            ...
        }
        ...
        if (...) {
            ...
        } else if (parsedStmt instanceof DmlStmt) {
            handleDMLStmtWithProfile(execPlan, (DmlStmt) parsedStmt);
        } ...
    }
    ...
}

In StmtExecutor's execute() method, the parsed syntax tree is passed to StatementPlanner's plan() method, the entry point that runs the tree through the analyzer and the optimizer to produce an execution plan.

After the plan is generated, execute() goes on to call handleDMLStmtWithProfile() to handle DML statements such as INSERT. That method obtains the scheduler and has it run the execution plan produced above.

com/starrocks/sql/StatementPlanner.java

public class StatementPlanner {

    public static ExecPlan plan(StatementBase stmt, ConnectContext session) {
        if (session instanceof HttpConnectContext) {
            return plan(stmt, session, TResultSinkType.HTTP_PROTOCAL);
        }
        return plan(stmt, session, TResultSinkType.MYSQL_PROTOCAL);
    }

    public static ExecPlan plan(StatementBase stmt, ConnectContext session,
                                TResultSinkType resultSinkType) {
        ...
        try {
            ...
            try (Timer ignored = Tracers.watchScope("Analyzer")) {
                Analyzer.analyze(stmt, session);
            }

            ...

            if (stmt instanceof QueryStatement) {
                return planQuery(stmt, resultSinkType, session, false);
            } else if (stmt instanceof InsertStmt) {
                return new InsertPlanner().plan((InsertStmt) stmt, session);
            } else if (stmt instanceof UpdateStmt) {
                return new UpdatePlanner().plan((UpdateStmt) stmt, session);
            } else if (stmt instanceof DeleteStmt) {
                return new DeletePlanner().plan((DeleteStmt) stmt, session);
            }
        }
        ...
    }
}

As the code above shows, StatementPlanner's plan() first runs the Analyzer over the syntax tree for semantic analysis; for an InsertStmt it then calls InsertPlanner's plan().

com/starrocks/sql/analyzer/Analyzer.java

public class Analyzer {
    private static final Analyzer INSTANCE = new Analyzer(new AnalyzerVisitor());

    public static Analyzer getInstance() {
        return INSTANCE;
    }

    private final AnalyzerVisitor analyzerVisitor;

    private Analyzer(AnalyzerVisitor analyzerVisitor) {
        this.analyzerVisitor = analyzerVisitor;
    }

    public static void analyze(StatementBase statement, ConnectContext context) {
        getInstance().analyzerVisitor.analyze(statement, context);
    }
}

The analyzer's analyze() method delegates to AnalyzerVisitor, a visitor that can walk every node of the syntax tree.

com/starrocks/sql/InsertPlanner.java

public class InsertPlanner {
    ...
    public ExecPlan plan(InsertStmt insertStmt, ConnectContext session) {
        ...
        // transform the syntax tree into a logical plan
        try (Timer ignore = Tracers.watchScope("Transform")) {
            logicalPlan = new RelationTransformer(columnRefFactory, session).transform(queryRelation);
        }
        ...
        try (Timer ignore = Tracers.watchScope("InsertPlanner")) {
            ...
            // the optimizer produces the optimized physical plan
            OptExpression optimizedPlan;
            try (Timer ignore2 = Tracers.watchScope("Optimizer")) {
                optimizedPlan = optimizer.optimize(
                        session,
                        logicalPlan.getRoot(),
                        requiredPropertySet,
                        new ColumnRefSet(logicalPlan.getOutputColumn()),
                        columnRefFactory);
            }
            ...
            // split the physical plan into fragments to build the execution plan
            ExecPlan execPlan;
            try (Timer ignore3 = Tracers.watchScope("PlanBuilder")) {
                execPlan = PlanFragmentBuilder.createPhysicalPlan(
                        optimizedPlan, session, logicalPlan.getOutputColumn(), columnRefFactory,
                        queryRelation.getColumnOutputNames(), TResultSinkType.MYSQL_PROTOCAL, hasOutputFragment);
            }
            ...
            // if targetTable is a TableFunctionTable, set the plan's sink node to TableFunctionTableSink
            DataSink dataSink;
            if (targetTable instanceof ...) {
                
            } else if (targetTable instanceof TableFunctionTable) {
                dataSink = new TableFunctionTableSink((TableFunctionTable) targetTable);
            }
            ...
            PlanFragment sinkFragment = execPlan.getFragments().get(0);
            ...
            sinkFragment.setSink(dataSink);
        }
        ...
    }
    ...
}

InsertPlanner carries out the whole pipeline: transforming the syntax tree into a logical plan, optimizing the logical plan into a physical plan, and splitting the physical plan into fragments to form the execution plan.

That is the overall FE flow for executing an INSERT statement. Let us now look in detail at how an INSERT INTO FILES() statement is parsed.

1.2 How the FE Parses INSERT INTO FILES

1.2.1 Lexical and Syntactic Analysis

com/starrocks/sql/parser/StarRocks.g4

insertStatement
    : explainDesc? INSERT setVarHint* (INTO | OVERWRITE) (qualifiedName | (FILES propertyList)) partitionNames?
        (WITH LABEL label=identifier)? columnAliases?
        (queryStatement | (VALUES expressionsWithDefault (',' expressionsWithDefault)*))
    ;

In the ANTLR .g4 grammar file, INSERT (INTO) FILES corresponds to the insertStatement rule shown above.

com/starrocks/sql/parser/AstBuilder.java

@Override
public ParseNode visitInsertStatement(StarRocksParser.InsertStatementContext context) {
    ...

    // INSERT INTO FILES(...)
    Map<String, String> tableFunctionProperties = getPropertyList(context.propertyList());
    InsertStmt res = new InsertStmt(tableFunctionProperties, queryStatement, createPos(context));
    res.setOptHints(visitVarHints(context.setVarHint()));
    return res;
}

Looking at the AST builder driven by the ANTLR-generated parser, this rule is converted into an InsertStmt syntax node through a dedicated constructor.

com/starrocks/sql/ast/InsertStmt.java

public class InsertStmt extends DmlStmt {
    ...
    // Ctor for INSERT INTO FILES(...)
    public InsertStmt(Map<String, String> tableFunctionProperties, QueryStatement queryStatement, NodePosition pos) {
        super(pos);
        this.tblName = new TableName("table_function_catalog", "table_function_db", "table_function_table");
        this.targetColumnNames = null;
        this.targetPartitionNames = null;
        this.queryStatement = queryStatement;
        this.tableFunctionAsTargetTable = true;
        this.tableFunctionProperties = tableFunctionProperties;
    }
    ...
}

This is the InsertStmt constructor that gets invoked.

1.2.2 Semantic Analysis

com/starrocks/sql/ast/AstVisitor.java

com/starrocks/sql/analyzer/AnalyzerVisitor.java

public class AnalyzerVisitor extends AstVisitor<Void, ConnectContext> {
    public void analyze(StatementBase statement, ConnectContext session) {
        visit(statement, session);
    }
    ...
    @Override
    public Void visitInsertStatement(InsertStmt statement, ConnectContext session) {
        InsertAnalyzer.analyze(statement, session);
        return null;
    }
    ...
}

After the parser produces the syntax tree, the analyzer performs semantic analysis. The analyzer uses the visitor pattern: AnalyzerVisitor extends AstVisitor and thereby visits every node of the syntax tree.

com/starrocks/sql/analyzer/InsertAnalyzer.java

public class InsertAnalyzer {
    public static void analyze(InsertStmt insertStmt, ConnectContext session) {
        QueryRelation query = insertStmt.getQueryStatement().getQueryRelation();
        new QueryAnalyzer(session).analyze(insertStmt.getQueryStatement());

        List<Table> tables = new ArrayList<>();
        AnalyzerUtils.collectSpecifyExternalTables(insertStmt.getQueryStatement(), tables, Table::isHiveTable);
        tables.stream().map(table -> (HiveTable) table)
                .forEach(table -> table.useMetadataCache(false));

        /*
         *  Target table
         */
        Table table = getTargetTable(insertStmt, session);
        ...
        insertStmt.setTargetTable(table);
        insertStmt.setTargetColumns(targetColumns);
        if (session.getDumpInfo() != null) {
            session.getDumpInfo().addTable(insertStmt.getTableName().getDb(), table);
        }
    }
    ...
    private static Table getTargetTable(InsertStmt insertStmt, ConnectContext session) {
        if (insertStmt.useTableFunctionAsTargetTable()) {
            return insertStmt.makeTableFunctionTable();
        }
        ...
    }
    ...
}

When the InsertStmt node is visited, InsertAnalyzer.analyze() performs the semantic analysis of the INSERT statement. Inside it, getTargetTable() inspects the statement and builds a semantic Table of the corresponding type, which is finally assigned back onto the syntax tree.

getTargetTable() first checks useTableFunctionAsTargetTable(). As the InsertStmt constructor shows, tableFunctionAsTargetTable is true for an INSERT (INTO) FILES statement, so makeTableFunctionTable() is called next to build the Table.

com/starrocks/sql/ast/InsertStmt.java

public class InsertStmt extends DmlStmt {
    ...
    public Table makeTableFunctionTable() {
        ...
        // parse table function properties
        Map<String, String> props = getTableFunctionProperties();
        String single = props.getOrDefault("single", "false");
        if (!single.equalsIgnoreCase("true") && !single.equalsIgnoreCase("false")) {
            throw new SemanticException("got invalid parameter \"single\" = \"%s\", expect a boolean value (true or false).",
                    single);
        }

        boolean writeSingleFile = single.equalsIgnoreCase("true");
        String path = props.get("path");
        String format = props.get("format");
        String partitionBy = props.get("partition_by");
        String compressionType = props.get("compression");
        ...
        if (writeSingleFile) {
            return new TableFunctionTable(path, format, compressionType, columns, null, true, props);
        }

        if (partitionBy == null) {
            // prepend `data_` if path ends with forward slash
            if (path.endsWith("/")) {
                path += "data_";
            }
            return new TableFunctionTable(path, format, compressionType, columns, null, false, props);
        }
        
        ...

        return new TableFunctionTable(path, format, compressionType, columns, partitionColumnIDs, false, props);
    }
}

makeTableFunctionTable() parses the parameters passed to the FILES(...) function and, based on them, invokes a TableFunctionTable constructor, turning the syntax node into a semantic node.

Once the analyzer finishes, InsertStmt has its targetTable assigned; this is the table into which INSERT INTO will write the data.

1.2.3 Generating the Execution Plan

Following the steps of InsertPlanner's plan() described above, the targetTable already exists once the analyzer has run; the subsequent transformation and optimization apply only to the query after INSERT INTO FILES() SELECT .... At the end, the targetTable produced by the analyzer is converted into a TableFunctionTableSink operator and attached to the execution plan.

com/starrocks/planner/PlanFragment.java

public class PlanFragment extends TreeNode<PlanFragment> {
    ...
    public TPlanFragment toThrift() {
        TPlanFragment result = new TPlanFragment();
        ...
        if (sink != null) {
            result.setOutput_sink(sink.toThrift());
        }
        ...
        return result;
    }
    ...
}

The execution plan InsertPlanner finally produces is made up of PlanFragment objects, which are sent to the BE via Thrift. As toThrift() above shows, if a sink node is present, its toThrift() result is assigned to the output_sink field of the Thrift object TPlanFragment.

com/starrocks/planner/TableFunctionTableSink.java

public class TableFunctionTableSink extends DataSink {
    ...
    @Override
    protected TDataSink toThrift() {
        TTableFunctionTableSink tTableFunctionTableSink = new TTableFunctionTableSink();
        tTableFunctionTableSink.setTarget_table(table.toTTableFunctionTable());
        TCloudConfiguration tCloudConfiguration = new TCloudConfiguration();
        cloudConfiguration.toThrift(tCloudConfiguration);
        tTableFunctionTableSink.setCloud_configuration(tCloudConfiguration);
        // set the sink type to TABLE_FUNCTION_TABLE_SINK
        TDataSink tDataSink = new TDataSink(TDataSinkType.TABLE_FUNCTION_TABLE_SINK);
        tDataSink.setTable_function_table_sink(tTableFunctionTableSink);
        return tDataSink;
    }
    ...
}

The sink.toThrift() call above therefore dispatches to TableFunctionTableSink's toThrift(), which sets the TDataSink type to TABLE_FUNCTION_TABLE_SINK.

1.2.4 Scheduler Dispatch

com/starrocks/rpc/PBackendService.java

public interface PBackendService {
    @ProtobufRPC(serviceName = "PBackendService", methodName = "exec_plan_fragment",
            attachmentHandler = ThriftClientAttachmentHandler.class, onceTalkTimeout = 60000)
    Future<PExecPlanFragmentResult> execPlanFragmentAsync(PExecPlanFragmentRequest request);
    ...
}

Through a chain of calls (omitted here; you can trace the call chain upward from this method, or downward from the scheduler's exec() method), the scheduler eventually ships the execution plan to the BE by invoking the execPlanFragmentAsync RPC.

2 How the BE Executes the TDataSink Operator

2.1 RPC Entry Point

src/service/internal_service.h

template <typename T>
class PInternalServiceImplBase : public T {
public:
    ...
    void exec_plan_fragment(google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request,
                            PExecPlanFragmentResult* result, google::protobuf::Closure* done) override;
    ...
};

On the BE side, that RPC corresponds to the exec_plan_fragment() method of PInternalServiceImplBase.

src/service/internal_service.cpp

template <typename T>
Status PInternalServiceImplBase<T>::_exec_plan_fragment_by_pipeline(const TExecPlanFragmentParams& t_common_param,
                                                                    const TExecPlanFragmentParams& t_unique_request) {
    pipeline::FragmentExecutor fragment_executor;
    auto status = fragment_executor.prepare(_exec_env, t_common_param, t_unique_request);
    if (status.ok()) {
        return fragment_executor.execute(_exec_env);
    } else {
        return status.is_duplicate_rpc_invocation() ? Status::OK() : status;
    }
}

exec_plan_fragment() eventually reaches the private method _exec_plan_fragment_by_pipeline(), which constructs a FragmentExecutor, the executor the BE uses to run the plan. The executor first calls prepare() to do the setup work, then execute() to run the whole plan.

2.2 Preparing the BE Execution Plan

src/exec/pipeline/fragment_executor.cpp

Status FragmentExecutor::prepare(ExecEnv* exec_env, const TExecPlanFragmentParams& common_request,
                                 const TExecPlanFragmentParams& unique_request) {
    ...
    {
        SCOPED_RAW_TIMER(&profiler.prepare_runtime_state_time);
        RETURN_IF_ERROR(_prepare_workgroup(request));
        RETURN_IF_ERROR(_prepare_runtime_state(exec_env, request));
        // convert the thrift objects into the BE plan tree
        RETURN_IF_ERROR(_prepare_exec_plan(exec_env, request));
        RETURN_IF_ERROR(_prepare_global_dict(request));
    }
    {
        SCOPED_RAW_TIMER(&profiler.prepare_pipeline_driver_time);
        // prepare the pipeline drivers and resolve the sink node
        RETURN_IF_ERROR(_prepare_pipeline_driver(exec_env, request));
        RETURN_IF_ERROR(_prepare_stream_load_pipe(exec_env, request));
    }
    ...
}
Status FragmentExecutor::_prepare_pipeline_driver(ExecEnv* exec_env, const UnifiedExecPlanFragmentParams& request) {
    ...
    std::unique_ptr<DataSink> datasink;
    if (request.isset_output_sink()) {
        const auto& tsink = request.output_sink();
        ...
        RETURN_IF_ERROR(DataSink::create_data_sink(runtime_state, tsink, fragment.output_exprs, params,
                                                   request.sender_id(), plan->row_desc(), &datasink));
        // convert the FE sink node into the BE's TableFunctionTableSinkOperatorFactory
        RETURN_IF_ERROR(_decompose_data_sink_to_operator(runtime_state, &context, request, datasink, tsink,
                                                         fragment.output_exprs));
    }
    ...
    // here every factory in the pipeline is invoked to create the actual BE operators; this is where TableFunctionTableSinkOperator is created
    if (!unready_pipeline_groups.empty()) {
        RETURN_IF_ERROR(create_lazy_instantiate_drivers_pipeline(
                runtime_state, &context, _query_ctx, _fragment_ctx.get(), std::move(unready_pipeline_groups), drivers));
    }
    ...
}
Status FragmentExecutor::_decompose_data_sink_to_operator(RuntimeState* runtime_state, PipelineBuilderContext* context,
                                                          const UnifiedExecPlanFragmentParams& request,
                                                          std::unique_ptr<starrocks::DataSink>& datasink,
                                                          const TDataSink& thrift_sink,
                                                          const std::vector<TExpr>& output_exprs) {
    ...
    if (typeid(*datasink) == ...) {
        ...
    } else if (typeid(*datasink) == typeid(starrocks::TableFunctionTableSink)) {
        ...
        auto op = std::make_shared<TableFunctionTableSinkOperatorFactory>(
                context->next_operator_id(), target_table.path, target_table.file_format, target_table.compression_type,
                output_expr_ctxs, partition_expr_ctxs, column_names, partition_column_names,
                target_table.write_single_file, thrift_sink.table_function_table_sink.cloud_configuration,
                fragment_ctx);
        ...
    }
}

The code above shows how the sink node is converted on the BE side: the conversion ultimately yields the BE's TableFunctionTableSinkOperator.

src/exec/pipeline/sink/table_function_table_sink_operator.cpp

Status TableFunctionTableSinkOperator::push_chunk(RuntimeState* state, const ChunkPtr& chunk) {
    if (_partition_exprs.empty()) {
        if (_partition_writers.empty()) {
            auto writer = std::make_unique<RollingAsyncParquetWriter>(_make_table_info(_path), _output_exprs,
                                                                      _common_metrics.get(), add_commit_info, state,
                                                                      _driver_sequence);
            RETURN_IF_ERROR(writer->init());
            _partition_writers.insert({"default writer", std::move(writer)});
        }
        return _partition_writers["default writer"]->append_chunk(chunk.get(), state);
    }
    ...
    return _partition_writers[partition_location]->append_chunk(chunk.get(), state);
}

When the pipeline runs, push_chunk() is called on TableFunctionTableSinkOperator to push data into the operator (a chunk is one block of data). As shown, it relies on a RollingAsyncParquetWriter: init() initializes the writer, and append_chunk() then writes each chunk out.

2.3 ParquetWriter Initialization

src/exec/parquet_writer.cpp

Status RollingAsyncParquetWriter::init() {
    ASSIGN_OR_RETURN(
            _fs, FileSystem::CreateUniqueFromString(_table_info.partition_location, FSOptions(&_table_info.cloud_conf)))
    _schema = _table_info.schema;
    _partition_location = _table_info.partition_location;

    ::parquet::WriterProperties::Builder builder;
    _table_info.enable_dictionary ? builder.enable_dictionary() : builder.disable_dictionary();
    ASSIGN_OR_RETURN(auto compression_codec,
                     parquet::ParquetBuildHelper::convert_compression_type(_table_info.compress_type));
    builder.compression(compression_codec);
    builder.version(::parquet::ParquetVersion::PARQUET_2_0);
    _properties = builder.build();

    return Status::OK();
}

init() first calls FileSystem::CreateUniqueFromString() to instantiate the fs file system object, then sets up the Parquet WriterProperties used for writing.

src/fs/fs.cpp

StatusOr<std::unique_ptr<FileSystem>> FileSystem::CreateUniqueFromString(std::string_view uri, FSOptions options) {
    if (fs::is_posix_uri(uri)) {
        return new_fs_posix();
    }
    if (fs::is_s3_uri(uri)) {
        return new_fs_s3(options);
    }
    if (fs::is_azure_uri(uri) || fs::is_gcs_uri(uri)) {
        // TODO(SmithCruise):
        // Now Azure storage and Google Cloud Storage both are using LibHdfs, we can use cpp sdk instead in the future.
        return new_fs_hdfs(options);
    }
#ifdef USE_STAROS
    if (is_starlet_uri(uri)) {
        return new_fs_starlet();
    }
#endif
    // Since almost all famous storage are compatible with Hadoop FileSystem, it's always a choice to fallback using
    // Hadoop FileSystem to access storage.
    return new_fs_hdfs(options);
}

File system initialization inspects the path argument passed to FILES() (named uri in this function). The code checks whether the uri is a POSIX local path, an S3 uri, or an Azure/GCS uri; if none match, it falls back to the HDFS file system.

To extend StarRocks with another file system, you need to define a uri scheme for it, check for that scheme here, and construct your custom FileSystem when it matches.
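As a minimal sketch of that change, the dispatch could gain one more branch. Note that the scheme xxxfs://, the helper is_xxx_uri(), and the factory new_fs_xxx() below are hypothetical names for illustration, not existing StarRocks APIs:

// src/fs/fs.cpp (sketch): recognize a hypothetical custom scheme "xxxfs://"
static bool is_xxx_uri(std::string_view uri) {
    return uri.compare(0, 8, "xxxfs://") == 0;
}

StatusOr<std::unique_ptr<FileSystem>> FileSystem::CreateUniqueFromString(std::string_view uri, FSOptions options) {
    if (fs::is_posix_uri(uri)) {
        return new_fs_posix();
    }
    ...
    if (is_xxx_uri(uri)) {
        // new_fs_xxx() would construct the custom FileSystem sketched later in this section
        return new_fs_xxx(options);
    }
    ...
    return new_fs_hdfs(options);
}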

src/fs/fs.h

class FileSystem {
public:
    enum Type { POSIX, S3, HDFS, BROKER, MEMORY, STARLET };

    // Governs if/how the file is created.
    //
    // enum value                   | file exists       | file does not exist
    // -----------------------------+-------------------+--------------------
    // CREATE_OR_OPEN_WITH_TRUNCATE | opens + truncates | creates
    // CREATE_OR_OPEN               | opens             | creates
    // MUST_CREATE                  | fails             | creates
    // MUST_EXIST                   | opens             | fails
    enum OpenMode { CREATE_OR_OPEN_WITH_TRUNCATE, CREATE_OR_OPEN, MUST_CREATE, MUST_EXIST };

    ...

    // Create a brand new sequentially-readable file with the specified name.
    //  If the file does not exist, returns a non-OK status.
    //
    // The returned file will only be accessed by one thread at a time.
    StatusOr<std::unique_ptr<SequentialFile>> new_sequential_file(const std::string& fname) {
        return new_sequential_file(SequentialFileOptions(), fname);
    }

    virtual StatusOr<std::unique_ptr<SequentialFile>> new_sequential_file(const SequentialFileOptions& opts,
                                                                          const std::string& fname) = 0;

    // Create a brand new random access read-only file with the
    // specified name.
    //
    // The returned file will only be accessed by one thread at a time.
    StatusOr<std::unique_ptr<RandomAccessFile>> new_random_access_file(const std::string& fname) {
        return new_random_access_file(RandomAccessFileOptions(), fname);
    }

    virtual StatusOr<std::unique_ptr<RandomAccessFile>> new_random_access_file(const RandomAccessFileOptions& opts,
                                                                               const std::string& fname) = 0;

    // Create an object that writes to a new file with the specified
    // name.  Deletes any existing file with the same name and creates a
    // new file.
    //
    // The returned file will only be accessed by one thread at a time.
    virtual StatusOr<std::unique_ptr<WritableFile>> new_writable_file(const std::string& fname) = 0;

    // Like the previous new_writable_file, but allows options to be
    // specified.
    virtual StatusOr<std::unique_ptr<WritableFile>> new_writable_file(const WritableFileOptions& opts,
                                                                      const std::string& fname) = 0;

    // Returns OK if the path exists.
    //         NotFound if the named file does not exist,
    //                  the calling process does not have permission to determine
    //                  whether this file exists, or if the path is invalid.
    //         IOError if an IO Error was encountered
    virtual Status path_exists(const std::string& fname) = 0;

    // Store in *result the names of the children of the specified directory.
    // The names are relative to "dir".
    // Original contents of *results are dropped.
    // Returns OK if "dir" exists and "*result" contains its children.
    //         NotFound if "dir" does not exist, the calling process does not have
    //                  permission to access "dir", or if "dir" is invalid.
    //         IOError if an IO Error was encountered
    virtual Status get_children(const std::string& dir, std::vector<std::string>* result) = 0;

    // Iterate the specified directory and call given callback function with child's
    // name. This function continues execution until all children have been iterated
    // or callback function return false.
    // The names are relative to "dir".
    //
    // The function call extra cost is acceptable. Compared with returning all children
    // into a given vector, the performance of this method is 5% worse. However this
    // approach is more flexible and efficient in fulfilling other requirements.
    //
    // Returns OK if "dir" exists.
    //         NotFound if "dir" does not exist, the calling process does not have
    //                  permission to access "dir", or if "dir" is invalid.
    //         IOError if an IO Error was encountered
    virtual Status iterate_dir(const std::string& dir, const std::function<bool(std::string_view)>& cb) = 0;

    // `iterate_dir2` is similar to `iterate_dir` but in addition to returning the directory entry name, it
    // also returns some file statistics.
    virtual Status iterate_dir2(const std::string& dir, const std::function<bool(DirEntry)>& cb) = 0;

    // Delete the named file.
    // FIXME: If the named file does not exist, OK or NOT_FOUND is returned, depend on the implementation.
    virtual Status delete_file(const std::string& fname) = 0;

    // Create the specified directory.
    // NOTE: It will return error if the path already exist(not necessarily as a directory)
    virtual Status create_dir(const std::string& dirname) = 0;

    // Creates directory if missing.
    // Return OK if it exists, or successful in Creating.
    virtual Status create_dir_if_missing(const std::string& dirname, bool* created = nullptr) = 0;

    // Create directory for every element of 'dirname' that does not already exist.
    // If 'dirname' already exists, the function does nothing (this condition is not treated as an error).
    virtual Status create_dir_recursive(const std::string& dirname) = 0;

    // Delete the specified directory.
    // NOTE: The dir must be empty.
    virtual Status delete_dir(const std::string& dirname) = 0;

    // Deletes the contents of 'dirname' (if it is a directory) and the contents of all its subdirectories,
    // recursively, then deletes 'dirname' itself. Symlinks are not followed (symlink is removed, not its target).
    virtual Status delete_dir_recursive(const std::string& dirname) = 0;

    // Synchronize the entry for a specific directory.
    virtual Status sync_dir(const std::string& dirname) = 0;

    // Checks if the file is a directory. Returns an error if it doesn't
    // exist, otherwise return true or false.
    virtual StatusOr<bool> is_directory(const std::string& path) = 0;

    // Canonicalize 'path' by applying the following conversions:
    // - Converts a relative path into an absolute one using the cwd.
    // - Converts '.' and '..' references.
    // - Resolves all symbolic links.
    //
    // All directory entries in 'path' must exist on the filesystem.
    virtual Status canonicalize(const std::string& path, std::string* result) = 0;

    virtual StatusOr<uint64_t> get_file_size(const std::string& fname) = 0;

    // Get the last modification time by given 'fname'.
    virtual StatusOr<uint64_t> get_file_modified_time(const std::string& fname) = 0;

    // Rename file src to target.
    virtual Status rename_file(const std::string& src, const std::string& target) = 0;

    // create a hard-link
    virtual Status link_file(const std::string& /*old_path*/, const std::string& /*new_path*/) = 0;

    // Determines the information about the filesystem on which the pathname 'path' is located.
    virtual StatusOr<SpaceInfo> space(const std::string& path) { return Status::NotSupported("FileSystem::space()"); }

    // Given the path to a remote file, delete the file's cache on the local file system, if any.
    // On success, Status::OK is returned. If there is no cache, Status::NotFound is returned.
    virtual Status drop_local_cache(const std::string& path) { return Status::NotFound(path); }

    // Batch delete the given files.
    // return ok if all success (not found error ignored), error if any failed and the message indicates the fail message
    // possibly stop at the first error if is simulating batch deletes.
    virtual Status delete_files(const std::vector<std::string>& paths) {
        for (auto&& path : paths) {
            auto st = delete_file(path);
            if (!st.ok() && !st.is_not_found()) {
                return st;
            }
        }
        return Status::OK();
    }
};

To extend StarRocks with your own xxxFileSystem, the interface to implement consists of the methods of this base class. The main groups are:

  • creating a sequentially-readable file
  • creating a random-access file
  • creating a writable file
  • a family of path operations
  • retrieving file metadata
  • deleting files

Once you have your own xxxFileSystem, new_writable_file() can create a writable file on it, and arrow's ::parquet::ParquetFileWriter can then write Parquet data through that file, as sketched below.
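A skeleton of such an implementation might look as follows. XxxFileSystem, XxxWritableFile, and the storage SDK calls behind them are placeholders; every pure-virtual method of FileSystem shown above still has to be implemented, and most are elided here:

// sketch: a custom file system, modeled on the existing POSIX/S3 implementations
class XxxFileSystem : public FileSystem {
public:
    explicit XxxFileSystem(const FSOptions& options) : _options(options) {}

    StatusOr<std::unique_ptr<WritableFile>> new_writable_file(const std::string& fname) override {
        return new_writable_file(WritableFileOptions(), fname);
    }

    StatusOr<std::unique_ptr<WritableFile>> new_writable_file(const WritableFileOptions& opts,
                                                              const std::string& fname) override {
        // open or create a file handle through the target storage's SDK, honoring opts.mode
        // (see the OpenMode table above); XxxWritableFile is a hypothetical WritableFile
        // subclass wrapping the SDK's write handle (interface in fs/fs.h)
        return std::make_unique<XxxWritableFile>(fname, opts);
    }

    Status path_exists(const std::string& fname) override {
        // query the storage's metadata API; map a missing object to Status::NotFound
        ...
    }

    // ... the remaining pure-virtual methods (new_sequential_file, new_random_access_file,
    // iterate_dir, delete_file, the create_dir family, get_file_size, rename_file, ...) omitted

private:
    FSOptions _options;
};

std::unique_ptr<FileSystem> new_fs_xxx(const FSOptions& options) {
    return std::make_unique<XxxFileSystem>(options);
}

With this registered in CreateUniqueFromString() as sketched earlier, a FILES() path using the custom scheme ends up creating an XxxFileSystem, and RollingAsyncParquetWriter writes through it without further changes.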

2.4 Writing Data with ParquetWriter

src/exec/parquet_writer.cpp

Status RollingAsyncParquetWriter::append_chunk(Chunk* chunk, RuntimeState* state) {
    RETURN_IF_ERROR(get_io_status());

    if (_writer == nullptr) {
        RETURN_IF_ERROR(_new_file_writer());
    }
    // exceed file size
    if (_max_file_size != -1 && _writer->file_size() > _max_file_size) {
        RETURN_IF_ERROR(close_current_writer(state));
        RETURN_IF_ERROR(_new_file_writer());
    }
    return _writer->write(chunk);
}
Status RollingAsyncParquetWriter::_new_file_writer() {
    std::string new_file_location = _new_file_location();
    WritableFileOptions options{.sync_on_close = false, .mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE};
    ASSIGN_OR_RETURN(auto writable_file, _fs->new_writable_file(options, new_file_location))
    _writer = std::make_shared<starrocks::parquet::AsyncFileWriter>(
            std::move(writable_file), new_file_location, _partition_location, _properties, _schema, _output_expr_ctxs,
            ExecEnv::GetInstance()->pipeline_sink_io_pool(), _parent_profile, _max_file_size);
    auto st = _writer->init();
    return st;
}

append_chunk() creates an AsyncFileWriter on demand via _new_file_writer(), rolling over to a new file once _max_file_size is exceeded, and then calls the writer's write() method to write the data.

src/formats/parquet/file_writer.cpp

Status FileWriterBase::init() {
    _writer = ::parquet::ParquetFileWriter::Open(_outstream, _schema, _properties);
    if (_writer == nullptr) {
        return Status::InternalError("Failed to create file writer");
    }
    return Status::OK();
}

void FileWriterBase::_generate_chunk_writer() {
    DCHECK(_writer != nullptr);
    if (_chunk_writer == nullptr) {
        auto rg_writer = _writer->AppendBufferedRowGroup();
        _chunk_writer = std::make_unique<ChunkWriter>(rg_writer, _type_descs, _schema, _eval_func);
    }
}

Status FileWriterBase::write(Chunk* chunk) {
    if (!chunk->has_rows()) {
        return Status::OK();
    }

    _generate_chunk_writer();
    RETURN_IF_ERROR(_chunk_writer->write(chunk));

    if (_chunk_writer->estimated_buffered_bytes() > _max_row_group_size && !is_last_row_group()) {
        RETURN_IF_ERROR(_flush_row_group());
    }

    return Status::OK();
}

FileWriterBase is the base class of AsyncFileWriter. As the code above shows, AsyncFileWriter's write() path is ultimately built on arrow's ::parquet::ParquetFileWriter: init() opens the Parquet writer over the output stream, and write() appends chunks to a buffered row group, flushing the row group once it grows large enough.
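The _outstream handed to ::parquet::ParquetFileWriter::Open() is an arrow::io::OutputStream, so StarRocks bridges its own WritableFile to Arrow's stream interface; a custom file system therefore only has to supply the WritableFile. The adapter below is a simplified illustration of that bridging idea, not the actual StarRocks class; the WritableFile method names (append(), close()) are assumptions modeled on the POSIX implementation:

// sketch: adapt a starrocks WritableFile to arrow::io::OutputStream so that
// ::parquet::ParquetFileWriter can write through any FileSystem implementation
class WritableFileOutputStream : public arrow::io::OutputStream {
public:
    explicit WritableFileOutputStream(std::unique_ptr<WritableFile> file) : _file(std::move(file)) {}

    arrow::Status Write(const void* data, int64_t nbytes) override {
        // assumed WritableFile::append(Slice) signature, as in the POSIX implementation
        Status st = _file->append(Slice(static_cast<const char*>(data), nbytes));
        if (!st.ok()) {
            return arrow::Status::IOError(st.to_string());
        }
        _pos += nbytes;
        return arrow::Status::OK();
    }

    arrow::Status Close() override {
        _closed = true;
        Status st = _file->close();
        return st.ok() ? arrow::Status::OK() : arrow::Status::IOError(st.to_string());
    }

    arrow::Result<int64_t> Tell() const override { return _pos; }
    bool closed() const override { return _closed; }

private:
    std::unique_ptr<WritableFile> _file;
    int64_t _pos = 0;
    bool _closed = false;
};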
