【Flink系列七】TableAPI和FlinkSQL初体验

这篇具有很好参考价值的文章主要介绍了【Flink系列七】TableAPI和FlinkSQL初体验。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL

  • Table API 是用于 Scala 和 Java 语言的查询API,它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。
  •  Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。无论输入是连续的(流式)还是有界的(批处理),在两个接口中指定的查询都具有相同的语义,并指定相同的结果。

【Flink系列七】TableAPI和FlinkSQL初体验,从零开始搞大数据,flink,大数据

基本程序结构

import org.apache.flink.table.api.*;
import org.apache.flink.connector.datagen.table.DataGenOptions;

// 可以从流中创建表
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//读取文件
DataStream<String> inputStream = env.readTextFile("xxx文件地址")

//包装转换
DataStream<SensorReading> dataStream = xx转换;

//创建表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

//tableEnv
Table dataTable = tableEnv.fromDataSource(dataStream);

Table resultTable = dataTable.select("id, temp")
.where("id=1");

// 输出数据
tableEnv.toAppendStream(resultTable, Sensor.class);



// register the Table dataTable as table "dataTable"
tableEnv.createTemporaryView("dataTable", dataTable);

// 执行sql
Table resultTable1 = tableEnv.sqlQuery("select id,temp from dataTable where is = 1");


// 输出数据
tableEnv.toAppendStream(resultTable1, Sensor.class);

TableEnvironment

TableAPI的核心概念是TableEnvironment,它主要负责

  • 在内部的 catalog 中注册 Table
  • 注册外部的 catalog
  • 加载可插拔模块
  • 执行 SQL 查询
  • 注册自定义函数 (scalar、table 或 aggregation)
  • DataStream 和 Table 之间的转换(面向 StreamTableEnvironment )

Table 总是与特定的 TableEnvironment 绑定。 不能在同一条查询中使用不同 TableEnvironment 中的表,例如,对它们进行 join 或 union 操作。 TableEnvironment 可以通过静态方法 TableEnvironment.create() 创建。

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    //.inBatchMode()
    .build();

TableEnvironment tEnv = TableEnvironment.create(settings);

或者也可以按程序示例中的方式,从现有的 StreamExecutionEnvironment 创建一个 StreamTableEnvironment 与 DataStream API 互操作。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

在 Catalog 中创建表

---先解释一下什么是Catalog:数据处理最关键的方面之一是管理元数据。 元数据可以是临时的,例如临时表、或者通过 TableEnvironment 注册的 UDF。 元数据也可以是持久化的,例如 Hive Metastore 中的元数据。Catalog 提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。cataLog详细解释

TableEnvironment 维护着一个由标识符(identifier)创建的表 catalog 的映射。标识符由三个部分组成:catalog 名称、数据库名称以及对象名称。如果 catalog 或者数据库没有指明,就会使用当前默认值

TableEnvironment tEnv = ...;
tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");

创建表

在 SQL 的术语中,Table API 的对象对应于视图(虚拟表)。它封装了一个逻辑查询计划。它可以通过以下方法在 catalog 中创建:

// register the Table dataTable as table "dataTable"
tableEnv.createTemporaryView("dataTable", dataTable);

// 执行sql
Table resultTable = tableEnv.sqlQuery("select id,temp from dataTable where is = 1");


// 输出数据
tableEnv.toAppendStream(resultTable, Sensor.class);

注意: 从传统数据库系统的角度来看,Table 对象与 VIEW 视图非常像。也就是,定义了 Table 的查询是没有被优化的, 而且会被内嵌到另一个引用了这个注册了的 Table的查询中。如果多个查询都引用了同一个注册了的Table,那么它会被内嵌每个查询中并被执行多次, 也就是说注册了的Table的结果不会被共享。

Connector Tables

另外一个方式去创建 TABLE 是通过 connector 声明。Connector 描述了存储表数据的外部系统。存储系统例如 Apache Kafka 或者常规的文件系统都可以通过这种方式来声明。

// Using table descriptors
final TableDescriptor sourceDescriptor = TableDescriptor.forConnector("datagen")
    .schema(Schema.newBuilder()
    .column("f0", DataTypes.STRING())
    .build())
    .option(DataGenOptions.ROWS_PER_SECOND, 100)
    .build();

tableEnv.createTable("SourceTableA", sourceDescriptor);
tableEnv.createTemporaryTable("SourceTableB", sourceDescriptor);

// Using SQL DDL
tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)")

扩展表标识符 

表总是通过三元标识符注册,包括 catalog 名、数据库名和表名。

用户可以指定一个 catalog 和数据库作为 “当前catalog” 和"当前数据库"。有了这些,那么刚刚提到的三元标识符的前两个部分就可以被省略了。如果前两部分的标识符没有指定, 那么会使用当前的 catalog 和当前数据库。用户也可以通过 Table API 或 SQL 切换当前的 catalog 和当前的数据库。

标识符遵循 SQL 标准,因此使用时需要用反引号(`)进行转义。

// 默认
TableEnvironment tEnv = ...;
tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");


// 注册一个view 命名为 'exampleView' 在 catalog 中命名为 'custom_catalog'
// 默认在 'custom_database'的数据库中 
tableEnv.createTemporaryView("exampleView", table);

//注册一个view 命名为 'exampleView' 在 catalog 中命名为 'custom_catalog'
// 指定数据库的名称为 'other_database' 
tableEnv.createTemporaryView("other_database.exampleView", table);

// 注册一个view 命名为  'example.View', 在 catalog 中命名为'custom_catalog'
// 默认在 'custom_database'的数据库中 
tableEnv.createTemporaryView("`example.View`", table);

// 注册view名为 'exampleView', 指定 catalog命名为'other_catalog'
// 指定数据库的名称为  'other_database' 
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table);

查询表

tableApi

Table API 是基于 Table 类的,该类表示一个表(流或批处理),并提供使用关系操作的方法。这些方法返回一个新的 Table 对象,该对象表示对输入 Table 进行关系操作的结果。 一些关系操作由多个方法调用组成,例如 table.groupBy(...).select(),其中 groupBy(...) 指定 table 的分组,而 select(...) 在 table 分组上的投影。

所有流处理和批处理表支持的 Table API 算子--> 文档 Table API 

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// 注册表

// 获取order表
Table orders = tableEnv.from("Orders");
// 从所有的数据中过滤 cCountry=France,然后聚合
Table revenue = orders
  .filter($("cCountry").isEqual("FRANCE"))
  .groupBy($("cID"), $("cName"))
  .select($("cID"), $("cName"), $("revenue").sum().as("revSum"));

// emit or convert Table
// execute query
SQL

Flink SQL 是基于实现了SQL标准的 Apache Calcite 的。SQL 查询由常规字符串指定。

Flink对流处理和批处理表的SQL支持-->文档 SQL 

一个示例:

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// 注册 Orders table

// 执行sql处理
Table revenue = tableEnv.sqlQuery(
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

// 展示了如何指定一个更新查询,将查询的结果插入到已注册的表中。
// 执行sql获取到的数据 and 提交到 "RevenueFrance"表中
tableEnv.executeSql(
    "INSERT INTO RevenueFrance " +
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

输出表

Table 通过写入 TableSink 输出。TableSink 是一个通用接口,用于支持多种文件格式(如 CSV、Apache Parquet、Apache Avro)、存储系统(如 JDBC、Apache HBase、Apache Cassandra、Elasticsearch)或消息队列系统(如 Apache Kafka、RabbitMQ)。

批处理 Table 只能写入 BatchTableSink,而流处理 Table 需要指定写入 AppendStreamTableSinkRetractStreamTableSink 或者 UpsertStreamTableSink

演示如何输出 Table

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// create an output Table
final Schema schema = Schema.newBuilder()
    .column("a", DataTypes.INT())
    .column("b", DataTypes.STRING())
    .column("c", DataTypes.BIGINT())
    .build();

tableEnv.createTemporaryTable("CsvSinkTable", TableDescriptor.forConnector("filesystem")
    .schema(schema)
    .option("path", "/path/to/file")
    .format(FormatDescriptor.forFormat("csv")
        .option("field-delimiter", "|")
        .build())
    .build());

// compute a result Table using Table API operators and/or SQL queries
Table result = ...

//方法 Table.executeInsert(String tableName) 将 Table 发送至已注册的 TableSink。该方法通过名称在 catalog 中查找 TableSink 并确认Table schema 和 TableSink schema 一致。
result.executeInsert("CsvSinkTable");

翻译与执行查询 

不论输入数据源是流式的还是批式的,Table API 和 SQL 查询都会被转换成 DataStream 程序。 查询在内部表示为逻辑查询计划,并被翻译成两个阶段:

  1. 优化逻辑执行计划
  2. 翻译成 DataStream 程序

Table API 或者 SQL 查询在下列情况下会被翻译:

  1. 当 T ableEnvironment.executeSql() 被调用时。该方法是用来执行一个 SQL 语句,一旦该方法被调用, SQL 语句立即被翻译。
  2. 当 Table.executeInsert() 被调用时。该方法是用来将一个表的内容插入到目标表中,一旦该方法被调用, TABLE API 程序立即被翻译。
  3. 当 Table.execute() 被调用时。该方法是用来将一个表的内容收集到本地,一旦该方法被调用, TABLE API 程序立即被翻译。
  4. 当 StatementSet.execute() 被调用时。Table (通过 StatementSet.addInsert() 输出给某个 Sink)和 INSERT 语句 (通过调用 StatementSet.addInsertSql())会先被缓存到 StatementSet 中,StatementSet.execute() 方法被调用时,所有的 sink 会被优化成一张有向无环图。
  5. 当 Table 被转换成 DataStream 时(参阅与 DataStream 集成)。转换完成后,它就成为一个普通的 DataStream 程序,并会在调用 StreamExecutionEn vironment.execute() 时被执行。

查询优化

Apache Flink 使用并扩展了 Apache Calcite 来执行复杂的查询优化。 这包括一系列基于规则和成本的优化,例如:

  1.  基于 Apache Calcite 的子查询解相关
  2. 投影剪裁
  3. 分区剪裁
  4. 过滤器下推
  5. 子计划消除重复数据以避免重复计算
  6. 特殊子查询重写,包括两部分:
    1. 将 IN 和 EXISTS 转换为 left semi-joins
    2. 将 NOT IN 和 NOT EXISTS 转换为 left anti-join
  7. 可选 join 重新排序
    1. 通过 table. optimizer.join-reorder-enabled 启用

注意: 当前仅在子查询重写的结合条件下支持 IN / EXISTS / NOT IN / NOT EXISTS。

优化器不仅基于计划,而且还基于可从数据源获得的丰富统计信息以及每个算子(例如 io,cpu,网络和内存)的细粒度成本来做出明智的决策。

解释表

Table API 提供了一种机制来解释计算 Table 的逻辑和优化查询计划。 这是通过 Table.explain() 方法或者 StatementSet.explain() 方法来完成的。Table.explain() 返回一个 Table 的计划。StatementSet.explain() 返回多 sink 计划的结果。它返回一个描述三种计划的字符串:

  1. 关系查询的抽象语法树(the Abstract Syntax Tree),即未优化的逻辑查询计划,
  2. 优化的逻辑查询计划,以及
  3. 物理执行计划。

可以用 TableEnvironment.explainSql() 方法和 TableEnvironment.executeSql() 方法支持执行一个 EXPLAIN 语句获取逻辑和优化查询计划,请参阅 EXPLAIN 页面.

以下代码展示了一个示例以及对给定 Table 使用 Table.explain() 方法的相应输出:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));

// explain Table API
Table table1 = tEnv.fromDataStream(stream1, $("count"), $("word"));
Table table2 = tEnv.fromDataStream(stream2, $("count"), $("word"));
Table table = table1
  .where($("word").like("F%"))
  .unionAll(table2);

System.out.println(table.explain());

上述例子的结果是:

== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
:  +- LogicalTableScan(table=[[Unregistered_DataStream_1]])
+- LogicalTableScan(table=[[Unregistered_DataStream_2]])

== Optimized Physical Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[count, word])
+- DataStreamScan(table=[[Unregistered_DataStream_2]], fields=[count, word])

== Optimized Execution Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[count, word])
+- DataStreamScan(table=[[Unregistered_DataStream_2]], fields=[count, word])

参考资料:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/common/#expanding-table-identifiers文章来源地址https://www.toymoban.com/news/detail-768228.html

到了这里,关于【Flink系列七】TableAPI和FlinkSQL初体验的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink TableAPI Window and Watermarket

    本次主要是弄清楚.批流统一 的处理方式,因为它是使用SQL来操作批流计算的.所以它怎么设置算子并行度?如何设置窗口?如何处理流式数据?等等 有很多疑问. 我还是觉得直接使用流计算的API更好.流批一体API最终也是转换成流式计算,最主要的是使用sql来设置算子或者窗口,并不直

    2024年02月12日
    浏览(33)
  • 基于 Dinky + FlinkSQL + Flink CDC 同步 MySQL 数据到 Elasticsearch、Kafka

    Dinky 是一个开箱即用的一站式实时计算平台以 Apache Flink 为基础,连接 OLAP 和数据湖等众多框架致力于流批一体和湖仓一体的建设与实践。本文以此为FlinkSQL可视化工具。 Flink SQL 使得使用标准 SQL 开发流式应用变得简单,免去代码开发。 Flink CDC 本文使用 MySQL CDC 连接器 允许从

    2024年02月16日
    浏览(39)
  • 从零开始学Spring Boot系列-前言

    在数字化和信息化的时代,Java作为一种成熟、稳定且广泛应用的编程语言,已经成为构建企业级应用的首选。而在Java生态系统中,Spring框架无疑是其中最为耀眼的一颗明星。它提供了全面的编程和配置模型,用于构建企业级应用。随着Spring Boot的出现,这一框架变得更加易于

    2024年02月22日
    浏览(51)
  • 从零开始学Spring Boot系列-SpringApplication

    SpringApplication类提供了一种从main()方法启动Spring应用的便捷方式。在很多情况下, 你只需委托给 SpringApplication.run这个静态方法 : 当应用启动时, 你应该会看到类似下面的东西: 默认情况下会显示INFO级别的日志信息, 包括一些相关的启动详情, 比如启动应用的用户等。 通过

    2024年04月08日
    浏览(57)
  • 从零开始学Spring Boot系列-集成Kafka

    Apache Kafka是一个开源的分布式流处理平台,由LinkedIn公司开发和维护,后来捐赠给了Apache软件基金会。Kafka主要用于构建实时数据管道和流应用。它类似于一个分布式、高吞吐量的发布-订阅消息系统,可以处理消费者网站的所有动作流数据。这种动作流数据包括页面浏览、搜

    2024年03月21日
    浏览(57)
  • 从零开始学Spring Boot系列-集成mybatis

    在Spring Boot的应用开发中,MyBatis是一个非常流行的持久层框架,它支持定制化SQL、存储过程以及高级映射。在本篇文章中,我们将学习如何在Spring Boot项目中集成MyBatis,以便通过MyBatis进行数据库操作。 首先,我们需要在项目中添加MyBatis的依赖。在Spring Boot中,我们通常会使

    2024年03月10日
    浏览(123)
  • 从零开始学习K8s系列——Kubernetes指南

    作者:禅与计算机程序设计艺术 Kubernetes(简称k8s)是一个开源的,用于自动部署、扩展和管理容器化的应用的平台。它主要提供四大功能,包括: 服务发现和负载均衡 :Kubernetes集群中的服务能够自动地寻找其他运行着的服务并进行负载均衡。 存储编排 :Kubernetes允许用户

    2024年02月06日
    浏览(47)
  • 从零开始学Spring Boot系列-外部化配置

    Spring Boot 允许你将配置外部化,以便可以在不同的环境中使用相同的应用程序代码。可以使用属性文件、YAML文件、环境变量和命令行参数将配置外部化。属性值可以通过使用 @Value 注解直接注入 bean,可以通过 Spring 的 Environment 抽象访问,也可以通过 @ConfigurationProperties。 Sp

    2024年04月10日
    浏览(100)
  • 【Mysql系列】从零开始学MySQL:Docker部署快速上手

    💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学习,不断总结,共同进步,活到老学到老 导航 檀越剑指大厂系列:全面总

    2024年02月06日
    浏览(35)
  • 从零开始搭建flink流式计算项目-2小试牛刀-物联网场景下,如何实现设备采集参数监控报警功能

    * 设备ID */ private Integer deviceId; * 监控的变量名称 */ private String varName; * 最小值 */ private Double min; * 最大值 */ private Double max; } /** * 报警消息 */ @Data public class AlarmMessage { * 设备 */ private Integer deviceId; * 报警时间 */ private Long timestamp; /** * 触发报警的采集变量名称 */ private String ala

    2024年04月11日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包