FlinkSQL-- sql-client及源码解析 -- flink-1.13.6

这篇具有很好参考价值的文章主要介绍了FlinkSQL-- sql-client及源码解析 -- flink-1.13.6。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、前言

本文基于flink-1.13.6

SQL Client: Init scripts and Statement Sets

这个版本极大地改进了 SQL 客户端的功能。现在 SQL Client 和 SQL 脚本都支持 通过Java 应用程序执行的几乎所有操作(从 TableEnvironment 以编程方式启动查询)。这意味着 SQL 用户在 SQL 部署中需要的代码少了很多。其中最核心的功能就是支持了 -i 命令用来初始化脚本,-f 命令用来执行 SQL 语句之前的 YAML 文件这个版本不再支持了,相反更多的是通过 SQL 脚本的方式来配置会话和提交任务.

类似于下面这种方式:

sql-client.sh -i init.sql -f test.sql

1.1、 -i 初始化 SQL Client

SET execution.runtime-mode=batch;
SET sql-client.execution.result-mode=TABLEAU;
SET pipeline.name=batch_demo

init.sql 初始化脚本文件支持的功能还非常多,我这里就简单的设置了几个,更多的属性可以参考官网.

使用 -i <init.sql> 选项初始化 SQL Client 会话时,初始化 SQL 文件中允许以下语句:

DDL(CREATE/DROP/ALTER),
USE CATALOG/DATABASE,
LOAD/UNLOAD MODULE,
SET command,
RESET command.

1.2、-f SQL脚本

create table rate_history ( 
 currency STRING, 
 conversion_rate DECIMAL(32, 2), 
 update_time TIMESTAMP(3),
 WATERMARK FOR update_time AS update_time  
) WITH ( 
 'connector.type' = 'filesystem', 
 'connector.path' = '/tmp/ratesHistory.csv', 
 'format.type' = 'csv' 
);

CREATE TABLE printb
(
  num bigint
)
WITH ('connector' = 'print');

-- 两条sql语句
insert into printb select count(1) from rate_history;
insert into printb select count(1) from rate_history;

执行:

./bin/sql-client.sh -i test/init.sql -f test/batch.sql 

FlinkSQL-- sql-client及源码解析 -- flink-1.13.6
查看flink web 页面发现两个job
FlinkSQL-- sql-client及源码解析 -- flink-1.13.6

SQL Client 将每个 INSERT INTO 语句作为单个 Flink 作业执行。但是,由于管道的某些部分可以重复使用,因此有时不是最佳选择。

SQL Client 支持 STATEMENT SET 语法来执行一组 SQL 语句。这是 Table API 中StatementSet 的等效功能。STATEMENT SET 语法包含一个或多个 INSERT INTO 语句。全面优化了STATEMENT SET 块中的所有语句,并将其作为单个 Flink 作业执行。联合优化和执行允许重用常见的中间结果,因此可以显着提高执行多个查询的效率。

STATEMENT SET 的语法格式如下:

BEGIN STATEMENT SET;
  -- one or more INSERT INTO statements
  { INSERT INTO|OVERWRITE <select_statement>; }+
END;

-- 修改上面的sql脚本
-- 两条sql语句
BEGIN STATEMENT SET;
insert into printb select count(1) from rate_history;
insert into printb select count(1) from rate_history;
END;

FlinkSQL-- sql-client及源码解析 -- flink-1.13.6

.接下来就来看一下底层源码是怎么实现的.

二、源码分析

2.1、从sql-client.sh 找到执行的入口类是 org.apache.flink.table.client.SqlClient

然后来看下 SqlClient 对象属性源码如下:

public class SqlClient { 
    // 标记是否是 embedded 模式
    private final boolean isEmbedded;
    // 提交命令选项
    private final CliOptions options;
    // 用来返回结果的
    private final Supplier<Terminal> terminalFactory;
    // 目前只支持 embedded
    public static final String MODE_EMBEDDED = "embedded";
    public static final String MODE_GATEWAY = "gateway";
	// ...
}

2.2、接着来看 SqlClient 的 main 方法,也就是程序的入口

main 方法里面调用的是 startClient 方法,所以直接来看 startClient 方法的源码:

@VisibleForTesting
protected static void startClient(String[] args, Supplier<Terminal> terminalFactory) {
    final String mode;
    final String[] modeArgs;
    // 设置启动模式默认是 embedded
    if (args.length < 1 || args[0].startsWith("-")) {
        // mode is not specified, use the default `embedded` mode
        mode = MODE_EMBEDDED;
        modeArgs = args;
    } else {
        // mode is specified, extract the mode value and reaming args
        mode = args[0];
        // remove mode
        modeArgs = Arrays.copyOfRange(args, 1, args.length);
    }
    
    switch (mode) {
        case MODE_EMBEDDED:
            // 解析提交命令里的参数
            final CliOptions options = CliOptionsParser.parseEmbeddedModeClient(modeArgs);
            // 打印参数说明
            if (options.isPrintHelp()) {
                CliOptionsParser.printHelpEmbeddedModeClient();
            } else {
                try {
                    // 构建 SqlClient 对象
                    final SqlClient client = new SqlClient(true, options, terminalFactory);
                    client.start();
                } catch (SqlClientException e) {
                    //...
                }
            }
            break;
        case MODE_GATEWAY:
        	// gateway 模式暂时不支持
            throw new SqlClientException("Gateway mode is not supported yet.");
        default:
            CliOptionsParser.printHelpClient();
    }
}

2.2.1、解析参数

调用 parseEmbeddedModeClient 方法解析提交命令里面的各种参数.包括我们上面用到的 -i 和 -f 都是在这一步解析并赋值的.

    public static CliOptions parseEmbeddedModeClient(String[] args) {
        try {
            DefaultParser parser = new DefaultParser();
            CommandLine line = parser.parse(EMBEDDED_MODE_CLIENT_OPTIONS, args, true);
            return new CliOptions(
                    line.hasOption(CliOptionsParser.OPTION_HELP.getOpt()),
                    checkSessionId(line),
                    // 解析 -i  初始化文件
                    checkUrl(line, CliOptionsParser.OPTION_INIT_FILE),
                    // 解析 -f sql脚本
                    checkUrl(line, CliOptionsParser.OPTION_FILE),
                    checkUrls(line, CliOptionsParser.OPTION_JAR),
                    checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
                    line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt()),
                    line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt()),
                    getPythonConfiguration(line));
        } catch (ParseException e) {
            throw new SqlClientException(e.getMessage());
        }
    }
    public static final Option OPTION_INIT_FILE =
            Option.builder("i")
                    .required(false)
                    .longOpt("init")
                    .numberOfArgs(1)
                    .argName("initialization file")
                    .desc(
                            "Script file that used to init the session context. "
                                    + "If get error in execution, the sql client will exit. Notice it's not allowed to add query or insert into the init file.")
                    .build();

    public static final Option OPTION_FILE =
            Option.builder("f")
                    .required(false)
                    .longOpt("file")
                    .numberOfArgs(1)
                    .argName("script file")
                    .desc(
                            "Script file that should be executed. In this mode, "
                                    + "the client will not open an interactive terminal.")
                    .build();

2.2.2、构建 SqlClient

final SqlClient client = new SqlClient(true, options, terminalFactory);

2.2.3、启动 SqlClient

client.start();

private void start() {
        if (isEmbedded) {
            // create local executor with default environment
            DefaultContext defaultContext = LocalContextUtils.buildDefaultContext(options);
            // 创建一个 LocalExecutor 对象,用于本地执行程序
            final Executor executor = new LocalExecutor(defaultContext);
            executor.start();

            // Open an new session
            String sessionId = executor.openSession(options.getSessionId());
            try {
                // add shutdown hook
                Runtime.getRuntime()
                        .addShutdownHook(new EmbeddedShutdownThread(sessionId, executor));

                // do the actual work  真正执行 SQL 的地方
                openCli(sessionId, executor);
            } finally {
                executor.closeSession(sessionId);
            }
        }
        else {
            throw new SqlClientException("Gateway mode is not supported yet.");
        }
    }
2.2.4、真正执行 SQL 的地方是 openCli 方法
  /**
     * Opens the CLI client for executing SQL statements.
     *
     * @param sessionId session identifier for the current client.
     * @param executor executor
     */
    private void openCli(String sessionId, Executor executor) {
        Path historyFilePath;
        if (options.getHistoryFilePath() != null) {
            historyFilePath = Paths.get(options.getHistoryFilePath());
        } else {
            historyFilePath =
                    Paths.get(
                            System.getProperty("user.home"),
                            SystemUtils.IS_OS_WINDOWS ? "flink-sql-history" : ".flink-sql-history");
        }

        boolean hasSqlFile = options.getSqlFile() != null;
        boolean hasUpdateStatement = options.getUpdateStatement() != null;
        if (hasSqlFile && hasUpdateStatement) {
            throw new IllegalArgumentException(//...
        }

        try (CliClient cli = new CliClient(terminalFactory, sessionId, executor, historyFilePath)) {
        	// 执行初始化 SQL -i 参数
            if (options.getInitFile() != null) {
                boolean success = cli.executeInitialization(readFromURL(options.getInitFile()));
                if (!success) { // ...}
            }

            if (!hasSqlFile && !hasUpdateStatement) {
                cli.executeInInteractiveMode();
            } else {
            	// 执行真正的 SQL 文件 -f
                cli.executeInNonInteractiveMode(readExecutionContent());
            }
        }
    }

这个里面会先获取 historyFilePath 的路径,然后判断是否存在 -i -f 这两个文件,如果有的话会先调用 executeInitialization 执行初始化的脚本.实际调用的是 executeInitialization#executeFile 方法来执行脚本,executeFile 的源码如下:

private boolean executeFile(String content, ExecutionMode mode) {
    terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_EXECUTE_FILE).toAnsi());
    for (String statement : CliStatementSplitter.splitContent(content)) {
        terminal.writer()
                .println(
                        new AttributedString(String.format("%s%s", prompt, statement))
                                .toString());
        terminal.flush();
        // 执行 
        if (!executeStatement(statement, mode)) {
            // cancel execution when meet error or ctrl + C;
            return false;
        }
    }
    return true;
}

其实不管是 -i 还是 -f 最终都会调用 executeFile 这个方法去解析脚本里的内容并且执行,这里方法里面先调用 splitContent 方法去做解析.

public static List<String> splitContent(String content) {
    List<String> statements = new ArrayList<>();
    List<String> buffer = new ArrayList<>();
    for (String line : content.split("\n")) {
        if (isEndOfStatement(line)) {
            buffer.add(line);
            statements.add(String.join("\n", buffer));
            buffer.clear();
        } else {
            buffer.add(line);
        }
    }
    if (!buffer.isEmpty()) {
        statements.add(String.join("\n", buffer));
    }
    return statements;
}
private static boolean isEndOfStatement(String line) {
    return line.replaceAll(MASK, "").trim().endsWith(";");
}

其实就是一行一行的读取初始化脚本和 SQL 脚本里面的内容,然后放到一个 List 里面.然后循环这个 List 调用 executeStatement 方法去执行 SQL 脚本.

// 执行 SQL 脚本.
private boolean executeStatement(String statement, ExecutionMode executionMode) {
    try {
        final Optional<Operation> operation = parseCommand(statement);
        operation.ifPresent(op -> callOperation(op, executionMode));
    } catch (SqlExecutionException e) {
        printExecutionException(e);
        return false;
    }
    return true;
}

执行之前会先对 SQL 做一个清洗,具体逻辑在 parseCommand 方法中.

// 其实就是把 SQL 后面的 ; 去掉,并在遇到 bad case 的时候返回空.然后调用 parseStatement 方法将 SQL 语句解析成 Operation,后面的过程就跟 Flink SQL 翻译成代码的过程差不多.就不在往后面跟了.
private Optional<Operation> parseCommand(String stmt) {
    // normalize
    stmt = stmt.trim();
    // remove ';' at the end
    if (stmt.endsWith(";")) {
        stmt = stmt.substring(0, stmt.length() - 1).trim();
    }
    // meet bad case, e.g ";\n"
    if (stmt.trim().isEmpty()) {
        return Optional.empty();
    }
    
    Operation operation = executor.parseStatement(sessionId, stmt);
    return Optional.of(operation);
}

-f 参数调用的是 executeInNonInteractiveMode 方法,实际也会调用 executeFile 方法,跟 -i 的执行逻辑是一样的.这里就不再分析了.

另外当前的 SQL Client 仅支持嵌入式模式(也就是 embedded 模式)。将来,社区计划通过提供基于 REST 的SQL 客户端网关来扩展其功能,有关更多信息,请参见 FLIP-24 和 FLIP-91。文章来源地址https://www.toymoban.com/news/detail-426518.html

到了这里,关于FlinkSQL-- sql-client及源码解析 -- flink-1.13.6的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink:FlinkSql解析嵌套Json

    日常开发中都是用的简便json格式,但是偶尔也会遇到嵌套json的时候,因此在用flinksql的时候就有点麻烦,下面用简单例子简单定义处理下 1,数据是网上摘抄,但包含里常用的大部分格式 {     \\\"afterColumns\\\": {         \\\"created\\\": \\\"1589186680\\\",         \\\"extra\\\": {             \\\"

    2023年04月09日
    浏览(22)
  • 【源码解析】flink sql执行源码概述:flink sql执行过程中有哪些阶段,这些阶段的源码大概位置在哪里

    本文大致分析了flink sql执行过程中的各个阶段的源码逻辑,这样可以在flink sql执行过程中, 能够定位到任务执行的某个阶段的代码大概分布在哪里,为更针对性的分析此阶段的细节逻辑打下基础,比如create 的逻辑是怎么执行的,select的逻辑是怎么生成的,优化逻辑都做了哪

    2024年02月04日
    浏览(28)
  • 20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月11日
    浏览(31)
  • 【flink番外篇】21、Flink 通过SQL client 和 table api注册catalog示例

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月21日
    浏览(45)
  • 基于Flink CDC实时同步PostgreSQL与Tidb【Flink SQL Client模式下亲测可行,详细教程】

    操作系统:ubuntu-22.04,运行于wsl 2【 注意,请务必使用wsl 2 ;wsl 1会出现各种各样的问题】 软件版本:PostgreSQL 14.9,TiDB v7.3.0,flink 1.7.1,flink cdc 2.4.0 已有postgre的跳过此步 (1)pg安装 https://zhuanlan.zhihu.com/p/143156636 (2)pg配置 可能出现的问题 sudo -u postgres psql 报错: psql: err

    2024年02月11日
    浏览(26)
  • Flink流批一体计算(13):PyFlink Tabel API之SQL DDL

    1. TableEnvironment 创建 TableEnvironment TableEnvironment 是 Table API 和 SQL 集成的核心概念。 TableEnvironment 可以用来: ·创建 Table ·将 Table 注册成临时表 ·执行 SQL 查询 ·注册用户自定义的 (标量,表值,或者聚合) 函数 ·配置作业 ·管理 Python 依赖 ·提交作业执行 创建 source 表 创建 sink

    2024年02月12日
    浏览(30)
  • Flink 学习十 FlinkSQL

    flink sql 基于flink core ,使用sql 语义方便快捷的进行结构化数据处理的上层库; 类似理解sparksql 和sparkcore , hive和mapreduce 1.1 工作流程 整体架构和工作流程 数据流,绑定元数据 schema ,注册成catalog 中的表 table / view 用户使用table Api / table sql 来表达计算逻辑 table-planner利用 apache calci

    2024年02月10日
    浏览(33)
  • Flink 优化(六) --------- FlinkSQL 调优

    FlinkSQL 官网配置参数: https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/table/config.html Flink SQL 新手有可能犯的错误,其中之一就是忘记设置空闲状态保留时间导致状态爆炸。列举两个场景: ➢ FlinkSQL 的 regular join(inner、left、right),左右表的数据都会一直保存在状态里,不

    2024年02月14日
    浏览(27)
  • 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月04日
    浏览(29)
  • Flink实战-(6)FlinkSQL实现CDC

    FlinkSQL说明 Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。 自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初

    2023年04月26日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包