(二开)Flink 修改源码拓展 SQL 语法

这篇具有很好参考价值的文章主要介绍了(二开)Flink 修改源码拓展 SQL 语法。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、Flink 扩展 calcite 中的语法解析
1)定义需要的 SqlNode 节点类-以 SqlShowCatalogs 为例
a)类位置

flink/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCatalogs.java

(二开)Flink 修改源码拓展 SQL 语法,Flink精通~源码设计解析,flink,sql

核心方法

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
        writer.keyword("SHOW CATALOGS");
    }
b)类血缘

(二开)Flink 修改源码拓展 SQL 语法,Flink精通~源码设计解析,flink,sql

2)修改 includes 目录下的 .ftl 文件,在 parserImpls.ftl 文件中添加语法逻辑
a)文件位置

(二开)Flink 修改源码拓展 SQL 语法,Flink精通~源码设计解析,flink,sql

b)语法示例
/**
* Parse a "Show Catalogs" metadata query command.
*/
SqlShowCatalogs SqlShowCatalogs() :
{
}
{
    <SHOW> <CATALOGS>
    {
        return new SqlShowCatalogs(getPos());
    }
}
3)将 Calcite 源码中的 config.fmpp 文件复制到项目的 src/main/codegen 目录下,修改内容,来声明扩展的部分
a)文件位置

(二开)Flink 修改源码拓展 SQL 语法,Flink精通~源码设计解析,flink,sql

b)config.fmpp 内容
data: {
	# 解析器文件路径
  parser: tdd(../data/Parser.tdd)
}

# 扩展文件的目录
freemarkerLinks: {
  includes: includes/
}
c)Parser.tdd 部分内容
# 生成的解析器包路径
package: "org.apache.flink.sql.parser.impl",
# 解析器名称
class: "FlinkSqlParserImpl",
# 引入的依赖类
"org.apache.flink.sql.parser.dql.SqlShowCatalogs"
# 新的关键字
keywords: [
    "CATALOGS"
  ]
# 新增的语法解析方法
statementParserMethods: [
    "SqlShowCatalogs()"
  ]
# 包含的扩展语法文件
implementationFiles: [
    "parserImpls.ftl"
  ]
4)编译模板文件和语法文件

(二开)Flink 修改源码拓展 SQL 语法,Flink精通~源码设计解析,flink,sql

5)配置扩展的解析器类
withParserFactory(FlinkSqlParserImpl.FACTORY)
2、自定义扩展 Flink 的 Parser 语法
1)定义 SqlNode 类
package org.apache.flink.sql.parser.dql;

import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;

import java.util.Collections;
import java.util.List;

/** XSHOW CATALOGS sql call. */
public class SqlXShowCatalogs extends SqlCall {
    public static final SqlSpecialOperator OPERATOR =
            new SqlSpecialOperator("XSHOW CATALOGS", SqlKind.OTHER);

    public SqlXShowCatalogs(SqlParserPos pos) {
        super(pos);
    }

    @Override
    public SqlOperator getOperator() {
        return OPERATOR;
    }

    @Override
    public List<SqlNode> getOperandList() {
        return Collections.emptyList();
    }

    @Override
    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
        writer.keyword("XSHOW CATALOGS");
    }
}

2)修改 includes 目录下的 parserImpls.ftl 文件
/**
* Parse a "XShow Catalogs" metadata query command.
*/
SqlXShowCatalogs SqlXShowCatalogs() :
{
}
{
    <XSHOW> <CATALOGS>
    {
       return new SqlXShowCatalogs(getPos());
    }
}
3)修改 Parser.tdd 文件,新增-声明拓展的部分
imports:

"org.apache.flink.sql.parser.dql.SqlXShowCatalogs"

keywords:

"XSHOW"

statementParserMethods:

"SqlXShowCatalogs()"
4)重新编译
 mvn generate-resources
5)执行测试用例

可以看到,自定义 SQL 的报错,由解析失败,变为了校验失败。

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CustomFlinkSql {
    public static void main(String[] args) throws Exception {

        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .build());
				
				// 拓展自定义语法 xshow catalogs 前
        // SQL parse failed. Non-query expression encountered in illegal context
        tEnv.executeSql("xshow catalogs").print();

        // 拓展自定义语法 xshow catalogs 后
        // SQL validation failed. org.apache.flink.sql.parser.dql.SqlXShowCatalogs cannot be cast to org.apache.calcite.sql.SqlBasicCall
    }
}

6)查看生成的扩展解析器类

可以看到,在 FlinkSqlParserImpl 中,自定义的解析语法已经生成了。

(二开)Flink 修改源码拓展 SQL 语法,Flink精通~源码设计解析,flink,sql

3、validate 概述

在向 Flink 中添加完自定义的解析规则后,报错信息如下:

SQL validation failed. org.apache.flink.sql.parser.dql.SqlXShowCatalogs cannot be cast to org.apache.calcite.sql.SqlBasicCall
修改 validate 部分的代码
1)FlinkPlannerImpl#validate

作用:校验 SqlNode ,如果是 show catalogs 语法时直接返回。

(二开)Flink 修改源码拓展 SQL 语法,Flink精通~源码设计解析,flink,sql

sqlNode.isInstanceOf[SqlXShowCatalogs]
2)SqlToOperationConverter#convert

作用:将校验过的 SqlNode 转换为 Operator。

(二开)Flink 修改源码拓展 SQL 语法,Flink精通~源码设计解析,flink,sql

else if (validated instanceof SqlXShowCatalogs) {
            return Optional.of(converter.convertXShowCatalogs((SqlXShowCatalogs) validated));
}
3)SqlToOperationConverter#convertXShowCatalogs
/** Convert SHOW CATALOGS statement. */
private Operation convertXShowCatalogs(SqlXShowCatalogs sqlXShowCatalogs) {
     return new XShowCatalogsOperation();
}
4)XShowCatalogsOperation
package org.apache.flink.table.operations;

public class XShowCatalogsOperation implements ShowOperation {
    @Override
    public String asSummaryString() {
        return "SHOW CATALOGS";
    }
}
4、执行测试用例
package org.apache.flink.table.examples.java.custom;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CustomFlinkSql {
    public static void main(String[] args) throws Exception {

        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .build());

				// FlinkSQL原本支持的语法
        tEnv.executeSql("show catalogs").print();
        
        // 自定义语法
        tEnv.executeSql("xshow catalogs").print();
    }
}

(二开)Flink 修改源码拓展 SQL 语法,Flink精通~源码设计解析,flink,sql文章来源地址https://www.toymoban.com/news/detail-728174.html

5、总结-FlinkSQL 的执行流程
1、对 SQL 进行校验

final SqlNode validated = flinkPlanner.validate(sqlNode);

2、预校验重写 Insert 语句

3、调用 SqlNode.validate() 进行校验

	1)如果是:ExtendedSqlNode【SqlCreateHiveTable、SqlCreateTable、SqlTableLike】
	2)如果是:SqlKind.DDL、SqlKind.INSERT 等,无需校验,直接返回 SqlNode
	3)如果是:SqlRichExplain
	4)其它:validator.validate(sqlNode)
		
			1.校验作用域和表达式:validateScopedExpression(topNode, scope)
					a)将 SqlNode 进行规范化重写
          b)如果SQL是【TOP_LEVEL = concat(QUERY, DML, DDL)】,则在父作用域中注册查询
          c)校验 validateQuery 
          	i)validateFeature
          	ii)validateNamespace
          	iii)validateModality
          	iv)validateAccess
          	v)validateSnapshot
          d)如果SQL不是【TOP_LEVEL = concat(QUERY, DML, DDL)】进行类型推导
       
       2.获取校验之后的节点类型

2、将 SQLNode 转换为 Operation

converter.convertSqlQuery(validated)

	1)生成逻辑执行计划 RelNode
	RelRoot relational = planner.rel(validated);
		
		1.对查询进行转换
		sqlToRelConverter.convertQuery(validatedSqlNode)
		
	2)创建 PlannerQueryOperation
	new PlannerQueryOperation(relational.project());
	
3、将 Operation 转换为 List<Transformation<?>>
List<Transformation<?>> transformations = planner.translate(Collections.singletonList(modifyOperation));

	1)对 RelNode 逻辑执行计划进行优化,获取 optimizedRelNodes
	val optimizedRelNodes = optimize(relNodes)
	
	2)将 optimizedRelNodes 转换为 execGraph
	val execGraph = translateToExecNodeGraph(optimizedRelNodes)
	
	3)将 execGraph 转换为 transformations
	
		1.使用代码生成技术生成Function,后续可以反射调用
		val convertFunc = CodeGenUtils.genToInternalConverter(ctx, inputType)

到了这里,关于(二开)Flink 修改源码拓展 SQL 语法的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【大数据】Flink SQL 语法篇(一):CREATE

    CREATE 语句用于向当前或指定的 Catalog 中注册库、表、视图或函数。注册后的库、表、视图和函数可以在 SQL 查询中使用。 目前 Flink SQL 支持下列 CREATE 语句: CREATE TABLE CREATE DATABASE CREATE VIEW CREATE FUNCTION 下面的 SQL 语句就是建表语句的定义,根据指定的表名创建一个表,如果同

    2024年02月21日
    浏览(37)
  • 【大数据】Flink SQL 语法篇(六):Temporal Join

    《 Flink SQL 语法篇 》系列,共包含以下 10 篇文章: Flink SQL 语法篇(一):CREATE Flink SQL 语法篇(二):WITH、SELECT WHERE、SELECT DISTINCT Flink SQL 语法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE) Flink SQL 语法篇(四):Group 聚合、Over 聚合 Flink SQL 语法篇(五):Regular Join、

    2024年03月15日
    浏览(62)
  • 全网最详细的TVBOX带会员版二开图文教程:一、tvbox如意前端后台搭建教程;二、tvbox后台配置教程;三、tvbox源码Android Studio配置修改教程;四、tvbox源码as打包教程

    一、TVBOX管理后台源码网站搭建; 搭建测试环境:PHP7.0、Nginx、按照好宝塔、配置解析好域名 1、请将下载好的tvbox源码压缩包进行解压,解压后得到的问题件如图所示 2、请将压缩包内的如图所指文件(1)上传到你的网站跟目录(记得是网站跟目录)并解压  3、请把以下文件

    2024年02月12日
    浏览(52)
  • Iceberg从入门到精通系列之七:Flink SQL创建Catalog

    type:必须是iceberg catalog-type:内置了hive和hadoop两种catalog,也可以使用catalog-impl来自定义catalog。 catalog-impl:自定义catalog实现的全限定类名。如果未设置catalog-type,则必须设置。 property-version:描述属性版本的版本号。此属性可用于向后兼容,以防属性格式更改。当前属性版本

    2024年02月11日
    浏览(58)
  • Iceberg从入门到精通系列之八:flink sql 创建Iceberg表

    建表命令支持最常用的flink建表语法,包括: PARTITION BY(column1,column2,…):配置分区,apache flink不支持隐藏分区。 COMMENT ‘table document’:指定表的备注 WITH(‘key’=‘value’,…):设置表属性

    2024年02月11日
    浏览(66)
  • 【大数据】Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function

    《 Flink SQL 语法篇 》系列,共包含以下 10 篇文章: Flink SQL 语法篇(一):CREATE Flink SQL 语法篇(二):WITH、SELECT WHERE、SELECT DISTINCT Flink SQL 语法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE) Flink SQL 语法篇(四):Group 聚合、Over 聚合 Flink SQL 语法篇(五):Regular Join、

    2024年04月25日
    浏览(39)
  • 【源码解析】flink sql执行源码概述:flink sql执行过程中有哪些阶段,这些阶段的源码大概位置在哪里

    本文大致分析了flink sql执行过程中的各个阶段的源码逻辑,这样可以在flink sql执行过程中, 能够定位到任务执行的某个阶段的代码大概分布在哪里,为更针对性的分析此阶段的细节逻辑打下基础,比如create 的逻辑是怎么执行的,select的逻辑是怎么生成的,优化逻辑都做了哪

    2024年02月04日
    浏览(42)
  • Iceberg从入门到精通系列之十:flink sql往Iceberg表插入数据,Batch模式和Streaming模式查询数据

    仅支持Flink的Batch模式 当将数据写入v2表格时,Iceberg支持基于主键的UPSERT。有两种方法可以启用upsert。 建表时指定 UPSERT模式下,如果对表进行分区,则分区字段必须是主键。 Batch模式: Streaming模式: 从当前快照读取所有记录,然后从该快照读取增量数据 读取指定快照id(不包

    2024年02月12日
    浏览(50)
  • Hudi Flink SQL源码调试学习(1)

    前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站:https://www.captainai.net/dongkelun 本着学习hudi-flink源码的目的,利用之前总结的文章Hudi Flink SQL代码示例及本地调试中的代码进行调试,记录调试学习过程中主要的步骤及对

    2024年02月12日
    浏览(35)
  • Hudi Flink SQL源码调试学习(一)

    前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站:https://www.captainai.net/dongkelun 本着学习hudi-flink源码的目的,利用之前总结的文章Hudi Flink SQL代码示例及本地调试中的代码进行调试,记录调试学习过程中主要的步骤及对

    2024年02月11日
    浏览(31)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包