写在前面
本篇作为FlinkSQL的起始篇,主要介绍了FlinkSQL在使用的概述,通过本篇,可以快速上手。需要注意的一点是:FlinkSQL中的表是动态表,这是其特性之一。
1、FlinkSQL概述
FlinkSQL是架构于 flink core 之上用 sql 语义方便快捷地进行结构化数据处理的上层库;(非常类似 sparksql 和 sparkcore 的关系)
1.1、核心工作原理
整体上讲,FlinkSQL的核心工作原理如下:
- 将数据流(数据集),绑定元数据(schema)后,注册成catalog中的表(table、view)
- 然后由用户通过table API或者table SQL来表达计算逻辑;
- 由table-planer利用apache Calcite进行SQL语法解析,绑定元数据得到逻辑执行计划
- 再经过optimizer进行优化后,得到物理执行计划
- 物理执行计划经过代码生成器生成代码,得到transformation tree
- transformation tree转成JobGraph后提交到Flink集群执行
下面给出一个样例对上述过程进行说明:
逻辑执行计划:通过Calcite,将SQL转化为逻辑执行计划。从逻辑执行计划中,可以看出,SQL的语句的执行顺序为from-->join-->where-->select
。
查询优化:FlinkSQL中存在两个优化器,RBO(基于规则的优化器)和CBO(基于成本的优化器)。
- RBO(基于规则的优化器):遍历一系列规则(RelOptRule),只要满足条件就对原来的计划节点(表达式)进行转换或调整位置,生成最终的执行计划。常见的规则包括:
- 分区裁剪(Partition Prune)、列裁剪
- 谓词下推(Predicate Pushdown)、投影下推(Projection Pushdown)、聚合下推、limit 下推、sort 下推
- 常量折叠(Constant Folding):比如将1+2折叠为3
- 子查询内联转 join 等。
- CBO(基于代价的优化器):会保留原有表达式,基于统计信息和代价模型,尝试探索生成等价关系表达式,最终取代价最小的执行计划。CBO 的实现有两种模型:Volcano 模型和Cascades 模型。这两种模型思想很是相似,不同点在于 Cascades 模型一边遍历 SQL 逻辑树,一边优化,从而进一步裁剪掉一些执行计划。
根据代价 cost 选择批处理 join 有方式(sortmergejoin,hashjoin,boradcasthashjoin)。
比如前文中的例子,再 filter 下推之后,在 t2.id<1000 的情况下,由 1 百万数据量变为了 1 千条,计算 cost 之后,使用 broadcasthashjoin 最合适。
算子树(transformation tree):将物理执行计划中的节点转化为对应的算子,即Flink中的一种Function,通过相应的条件动态生成代码。
1.2、动态表特性
与 spark、hive 等组件中的“表”的最大不同之处:FlinkSQL中的表是动态表!
Flink的核心决定了上述特性:
- flink 对数据的核心抽象是“无界(或有界)的数据流”
- 对数据处理过程的核心抽象是“流式持续处理”
因而,flinksql 对“源表(动态表)”的计算及输出结果(结果表),也是流式、动态、持续的;
- 数据源的数据是持续输入
- 查询过程是持续计算
- 查询结果是持续输出
以下图为例:
- “源表 clicks”是流式动态的;
- “聚合查询的输出结果表”,也是流式动态的
这其中的动态
,不仅体现在“数据追加”,对于输出结果表来说,“动态”还包含对“前序输出结果”的“撤
回(删除)”、“更新”等模式。它的核心设计是在底层的数据流中为每条数据添加**“ChangeMode(修正模式)标记”**,而添加了这种ChangeMode 标记的底层数据流,取名为 changelogStream
2、FlinkSQL编程概述
由于FlinkSQL建立在Flink core的基础之上,这里进行先对一个简单的FlinkSQL编程过程进行说明。
FlinkSQL编程包括TableAPI和SQLAPI,运用中更多地使用SQLAPI,这里对于TableAPI不做详细介绍,后续用到的时候再进行详细介绍。此外,在编程方式上,两种SQL可以进行混合使用。
2.1、FlinkSQL程序结构
导入依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
FlinkSQL编程步骤:文章来源:https://www.toymoban.com/news/detail-828577.html
- 创建 flinksql 编程入口
EnvironmentSettings envSettings = EnvironmentSettings
.fromConfiguration(new Configuration());
TableEnvironment tableEnv = TableEnvironment.create(envSettings);
- 将数据源定义(映射)成表(视图)
/*
* 把kafka中的一个topic,映射成一张FlinkSQL表
* kafka:{"id":1,"name":"zs","age":20,"gender":"male"}
* */
tableEnv.executeSql(
" CREATE TABLE KafkaTable ( "
+ " id int, "
+ " name string, "
+ " age int, "
+ " gender string "
+ " ) WITH ( "
+ " 'connector' = 'kafka', "
+ " 'topic' = 'table_test', "
+ " 'properties.bootstrap.servers' = '192.168.247.129:9092', "
+ " 'properties.group.id' = 'testGroup', "
+ " 'scan.startup.mode' = 'earliest-offset', "
+ " 'format' = 'json' "
+ " ) "
);
- 执行 sql 语义的查询(sql 语法或者 tableapi)
tableEnv.executeSql("select gender,avg(age) as avg_age from KafkaTable group by gender").print();
- 将查询结果输出到目标表
注:这里的输出通过print进行输出,已经合并到步骤3中
输入数据:
{"id":1,"name":"zs","age":20,"gender":"male"}
{"id":2,"name":"ls","age":30,"gender":"female"}
{"id":3,"name":"ww","age":40,"gender":"female"}
{"id":4,"name":"zl","age":50,"gender":"male"}
输出结果:
文章来源地址https://www.toymoban.com/news/detail-828577.html
2.2、FlinkSQL,TableAPI方式
- 建表
Table table = tableEnv.from(TableDescriptor
.forConnector("kafka")
.schema(Schema.newBuilder()
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.column("gender", DataTypes.STRING())
.build())
.format("json")
.option("topic", "testTopic")
.option("properties.bootstrap.servers", "192.168.247.129:9092")
.option("properties.group.id", "testGroup")
.option("scan.startup.mode", "earliest-offset")
.option("json.fail-on-missing-field", "false")
.option("json.ignore-parse-errors", "true")
.build());
- TableAPI
Table tableApi = table.groupBy($("gender"))
.select($("gender"), $("age").avg().as("avg_age", "avg_age_2"));
- 执行
tableApi.execute().print();
到了这里,关于FlinkSQL学习笔记(一)快速入门的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!