FlinkSQL学习笔记(一)快速入门

这篇具有很好参考价值的文章主要介绍了FlinkSQL学习笔记(一)快速入门。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

写在前面

本篇作为FlinkSQL的起始篇,主要介绍了FlinkSQL在使用的概述,通过本篇,可以快速上手。需要注意的一点是:FlinkSQL中的表是动态表,这是其特性之一。

1、FlinkSQL概述

FlinkSQL是架构于 flink core 之上用 sql 语义方便快捷地进行结构化数据处理的上层库;(非常类似 sparksql 和 sparkcore 的关系)

1.1、核心工作原理

整体上讲,FlinkSQL的核心工作原理如下:

  1. 将数据流(数据集),绑定元数据(schema)后,注册成catalog中的表(table、view)
  2. 然后由用户通过table API或者table SQL来表达计算逻辑;
  3. 由table-planer利用apache Calcite进行SQL语法解析,绑定元数据得到逻辑执行计划
  4. 再经过optimizer进行优化后,得到物理执行计划
  5. 物理执行计划经过代码生成器生成代码,得到transformation tree
  6. transformation tree转成JobGraph后提交到Flink集群执行
    flink sql入门,学习,笔记

下面给出一个样例对上述过程进行说明:

逻辑执行计划:通过Calcite,将SQL转化为逻辑执行计划。从逻辑执行计划中,可以看出,SQL的语句的执行顺序为from-->join-->where-->select
flink sql入门,学习,笔记
查询优化:FlinkSQL中存在两个优化器,RBO(基于规则的优化器)和CBO(基于成本的优化器)。

  • RBO(基于规则的优化器):遍历一系列规则(RelOptRule),只要满足条件就对原来的计划节点(表达式)进行转换或调整位置,生成最终的执行计划。常见的规则包括:
    • 分区裁剪(Partition Prune)、列裁剪
    • 谓词下推(Predicate Pushdown)、投影下推(Projection Pushdown)、聚合下推、limit 下推、sort 下推
    • 常量折叠(Constant Folding):比如将1+2折叠为3
    • 子查询内联转 join 等。

flink sql入门,学习,笔记

  • CBO(基于代价的优化器):会保留原有表达式,基于统计信息和代价模型,尝试探索生成等价关系表达式,最终取代价最小的执行计划。CBO 的实现有两种模型:Volcano 模型和Cascades 模型。这两种模型思想很是相似,不同点在于 Cascades 模型一边遍历 SQL 逻辑树,一边优化,从而进一步裁剪掉一些执行计划。

    根据代价 cost 选择批处理 join 有方式(sortmergejoin,hashjoin,boradcasthashjoin)。
    比如前文中的例子,再 filter 下推之后,在 t2.id<1000 的情况下,由 1 百万数据量变为了 1 千条,计算 cost 之后,使用 broadcasthashjoin 最合适。

flink sql入门,学习,笔记
算子树(transformation tree):将物理执行计划中的节点转化为对应的算子,即Flink中的一种Function,通过相应的条件动态生成代码。
flink sql入门,学习,笔记

1.2、动态表特性

与 spark、hive 等组件中的“表”的最大不同之处:FlinkSQL中的表是动态表
Flink的核心决定了上述特性:

  • flink 对数据的核心抽象是“无界(或有界)的数据流”
  • 对数据处理过程的核心抽象是“流式持续处理”

因而,flinksql 对“源表(动态表)”的计算及输出结果(结果表),也是流式、动态、持续的;

  • 数据源的数据是持续输入
  • 查询过程是持续计算
  • 查询结果是持续输出

以下图为例:

  • “源表 clicks”是流式动态的;
  • “聚合查询的输出结果表”,也是流式动态的

这其中的动态,不仅体现在“数据追加”,对于输出结果表来说,“动态”还包含对“前序输出结果”的“撤
回(删除)”、“更新”等模式。它的核心设计是在底层的数据流中为每条数据添加**“ChangeMode(修正模式)标记”**,而添加了这种ChangeMode 标记的底层数据流,取名为 changelogStream
flink sql入门,学习,笔记

2、FlinkSQL编程概述

由于FlinkSQL建立在Flink core的基础之上,这里进行先对一个简单的FlinkSQL编程过程进行说明。
FlinkSQL编程包括TableAPI和SQLAPI,运用中更多地使用SQLAPI,这里对于TableAPI不做详细介绍,后续用到的时候再进行详细介绍。此外,在编程方式上,两种SQL可以进行混合使用。

2.1、FlinkSQL程序结构

导入依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>

FlinkSQL编程步骤

  1. 创建 flinksql 编程入口
EnvironmentSettings envSettings = EnvironmentSettings
									.fromConfiguration(new Configuration());
TableEnvironment tableEnv = TableEnvironment.create(envSettings);
  1. 将数据源定义(映射)成表(视图)
/*
* 把kafka中的一个topic,映射成一张FlinkSQL表
* kafka:{"id":1,"name":"zs","age":20,"gender":"male"}
* */
tableEnv.executeSql(
                 " CREATE TABLE KafkaTable (										"
                +	" 	id		int,                                                "
                +	" 	name	string,                                             "
                +	" 	age		int,                                                "
                +	" 	gender	string                                              "
                +	" ) WITH (                                                      "
                +	"   'connector' = 'kafka',                                      "
                +	"   'topic' = 'table_test',                                     "
                +	"   'properties.bootstrap.servers' = '192.168.247.129:9092',    "
                +	"   'properties.group.id' = 'testGroup',                        "
                +	"   'scan.startup.mode' = 'earliest-offset',                    "
                +	"   'format' = 'json'                                           "
                +	" )																"
);
  1. 执行 sql 语义的查询(sql 语法或者 tableapi)
 tableEnv.executeSql("select gender,avg(age) as avg_age from KafkaTable group by gender").print();
  1. 将查询结果输出到目标表
    注:这里的输出通过print进行输出,已经合并到步骤3中
    输入数据:
{"id":1,"name":"zs","age":20,"gender":"male"}
{"id":2,"name":"ls","age":30,"gender":"female"}
{"id":3,"name":"ww","age":40,"gender":"female"}
{"id":4,"name":"zl","age":50,"gender":"male"}

输出结果:
flink sql入门,学习,笔记文章来源地址https://www.toymoban.com/news/detail-828577.html

2.2、FlinkSQL,TableAPI方式

  1. 建表
 Table table = tableEnv.from(TableDescriptor
         .forConnector("kafka")
         .schema(Schema.newBuilder()
                 .column("id", DataTypes.INT())
                 .column("name", DataTypes.STRING())
                 .column("age", DataTypes.INT())
                 .column("gender", DataTypes.STRING())
                 .build())
         .format("json")
         .option("topic", "testTopic")
         .option("properties.bootstrap.servers", "192.168.247.129:9092")
         .option("properties.group.id", "testGroup")
         .option("scan.startup.mode", "earliest-offset")
         .option("json.fail-on-missing-field", "false")
         .option("json.ignore-parse-errors", "true")
         .build());
  1. TableAPI
Table tableApi = table.groupBy($("gender"))
        .select($("gender"), $("age").avg().as("avg_age", "avg_age_2"));
  1. 执行
tableApi.execute().print();

到了这里,关于FlinkSQL学习笔记(一)快速入门的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Golang快速入门到实践学习笔记

    Go程序设计的一些规则 Go之所以会那么简洁,是因为它有一些默认的行为: 大写字母开头的变量是可导出的,也就是其它包可以读取 的,是公用变量;小写字母开头的就是不可导出的,是私有变量。 大写字母开头的函数也是一样,相当于class 中的带public的公有函数;

    2024年02月20日
    浏览(55)
  • SpringCloud学习笔记(一)_快速入门

    Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智能路由,微代理,控制总线)。分布式系统的协调导致了样板模式, 使用Spring Cloud开发人员可以快速地支持实现这些模式的服务和应用程序。他们将在任何分布式环

    2024年02月11日
    浏览(50)
  • OpenCV4 快速入门 (学习笔记 全)

    作者博客https://blog.csdn.net/shuiyixin?type=blog https://blog.csdn.net/shuiyixin/article/details/106046827 1.1.1 Mat类 https://blog.csdn.net/shuiyixin/article/details/106014341 1.1.2 Rect_结构 https://blog.csdn.net/shuiyixin/article/details/106085233 1.1.3 Scalar_结构 Scalar其实是一个从Vec派生得到的四元向量的模板类 一般情况,我

    2024年02月07日
    浏览(49)
  • 【ECharts+Vue】学习笔记(快速入门版)

    ECharts 是一个使用 JavaScript 实现的开源可视化库,涵盖各行业图表,满足各种需求。提供了丰富的可视化图标,帮助你轻松实现大屏展示。 官网地址:Apache ECharts 直接下载 下载官网: https://echarts.apache.org/zh/download.html 如果不挑版本的话,菜鸟教程提供了4.7.0版本地址ECharts 安

    2023年04月21日
    浏览(30)
  • PyTorch深度学习快速入门教程【小土堆】 学习笔记

    PyTorch深度学习快速入门教程(绝对通俗易懂!)【小土堆】 anaconda 卸载环境 :conda uninstall -n yyy --all anaconda 安装路径:D:anaconda3 创建环境: conda create -n pytorch python=3.9 切换环境 : conda activate pytorch 查看目前已经安装的工具包:pip list Q 安装pytorch? 进入pytorch首页 下拉,http

    2024年02月07日
    浏览(56)
  • 【Redis学习笔记01】快速入门(含安装教程)

    先来看门见山的给出 Redis 的概念: Redis:是一种基于内存的高性能K-V键值型NoSQL数据库 Redis官网:https://redis.io/ 1.1 初识NoSQL 想必大家都对关系型数据库更为熟悉!如MySQL、Oracle、SQL Server都是比较常见的关系型数据库,所谓关系型数据库主要以二维表作为数据结构进行存储,但

    2024年01月22日
    浏览(47)
  • 黑马程序员Docker快速入门到项目部署(学习笔记)

    目录 一、Docker简介 二、安装Docker 2.1、卸载旧版 2.2、配置Docker的yum库 2.3、安装Docker 2.4、启动和校验 2.5、配置镜像加速 2.5.1、注册阿里云账号 2.5.2、开通镜像服务 2.5.3、配置镜像加速 三、快速入门 3.1、部署MYSQL 3.2、命令解读 四、Docker基础 4.1、常见命令 4.1.1、命令介绍 4.1

    2024年01月25日
    浏览(51)
  • Go语言学习笔记:GORM 介绍及快速入门,简单查询

    GORM 是一个用 GoLang 语言编写的 ORM(对象关系映射)库。它被设计为开发者友好的方式来进行数据库操作。GORM 提供了一种高级的 API 来处理数据库的 CRUD(创建、读取、更新、删除)操作,它支持主流的关系型数据库,如 MySQL、PostgreSQL、SQLite 和 Microsoft SQL Server。 GORM 指南 使

    2024年01月21日
    浏览(76)
  • Flink 优化(六) --------- FlinkSQL 调优

    FlinkSQL 官网配置参数: https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/table/config.html Flink SQL 新手有可能犯的错误,其中之一就是忘记设置空闲状态保留时间导致状态爆炸。列举两个场景: ➢ FlinkSQL 的 regular join(inner、left、right),左右表的数据都会一直保存在状态里,不

    2024年02月14日
    浏览(42)
  • 读SQL学习指南(第3版)笔记04_查询入门

    3.2.2.1. 子查询由一对小括号包围,可以出现在select语句的各个部分中 3.2.2.2. 子查询的作用在于生成其他所有查询子句中可见的派生数据表,以及与from子句中的其他数据表交互 3.2.3.1. mysql 3.2.3.2. 临时保留在内存中,会话结束后就消失了 3.2.3.3. Oracle Database是一个例外,它会

    2024年02月11日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包