【Flink SQL】Flink SQL 基础概念(一):SQL & Table 运行环境、基本概念及常用 API

这篇具有很好参考价值的文章主要介绍了【Flink SQL】Flink SQL 基础概念(一):SQL & Table 运行环境、基本概念及常用 API。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink SQL 基础概念》系列,共包含以下 5 篇文章:

  • Flink SQL 基础概念(一):SQL & Table 运行环境、基本概念及常用 API
  • Flink SQL 基础概念(二):数据类型
  • Flink SQL 基础概念(三):SQL 动态表 & 连续查询
  • Flink SQL 基础概念(四):SQL 的时间属性
  • Flink SQL 基础概念(五):SQL 时区问题

😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!

1.SQL & Table 简介及运行环境

1.1 简介

Apache Flink 提供了两种关系型 API 用于统一流和批处理,Table 和 SQL API。

Table API 是一种集成在 Java、Scala 和 Python 语言中的查询 API,简单理解就是用 Java、Scala、Python 按照 SQL 的查询接口封装了一层 lambda 表达式的查询 API,它允许以强类型接口的方式组合各种关系运算符(如选择、筛选和联接)的查询操作,然后生成一个 Flink 任务运行。如下案例所示:

import org.apache.flink.table.api.*;

import static org.apache.flink.table.api.Expressions.*;

EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    .build();

TableEnvironment tEnv = TableEnvironment.create(settings);

// 下面就是 Table API 的案例,其语义等同于
// select a, count(b) as cnt 
// from Orders
// group by a
DataSet<Row> result = tEnv
        .from("Orders")
        .groupBy($("a"))
        .select($("a"), $("b").count().as("cnt"))
        .toDataSet(counts, Row.class);

result.print();

SQL API 是基于 SQL 标准的 Apache Calcite 框架实现的,我们可以使用纯 SQL 来开发和运行一个 Flink 任务。如下案例所示:

insert into target
select a, count(b) as cnt
from Orders
group by a

注意:无论输入是连续(流处理)还是有界(批处理),在 Table 和 SQL 任一 API 中同一条查询语句是具有相同的语义并且会产出相同的结果的。这就是说为什么 Flink SQL 和 Table API 可以做到在用户接口层面的流批统一。xdm,用一套 SQL 既能跑流任务,也能跑批任务,它不香嘛?

Table API 和 SQL API 也与 DataStream API 做到了无缝集成。可以轻松地在三种 API 之间灵活切换。例如,可以使用 SQL 的 MATCH_RECOGNIZE 子句匹配出异常的数据,然后使用再转为 DataStream API 去灵活的构建针对于异常数据的自定义报警机制。

在大致了解了这两个 API 是干啥的之后,我们就可以直接来看看,怎么使用这两个 API 了。

1.2 SQL 和 Table API 运行环境依赖

根据小伙伴们使用的编程语言的不同(Java 或 Scala),需要将对应的依赖包添加到项目中。

Java 依赖如下:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.13.5</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.13.5</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.13.5</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-common</artifactId>
  <version>1.13.5</version>
</dependency>

Scala 依赖如下:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
  <version>1.13.5</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.13.5</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>1.13.5</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-common</artifactId>
  <version>1.13.5</version>
</dependency>

引入上述依赖之后,小伙伴萌就可以开始使用 Table / SQL API 了。具体案例如下文所示。

2.SQL & Table 的基本概念及常用 API

在小伙伴萌看下文之前,先看一下整体的思路,跟着博主思路走,会更清晰:

  • 先通过一个 SQL / Table API 任务看一下我们在实际开发时的代码结构应该长啥样,让大家能有直观的感受。
  • 重点介绍 SQL / Table API 中核心 API - TableEnvironment。SQL / Table 所有能用的接口都在 TableEnvironment 中。
  • 通过两个角度(外部表 / 视图、临时 / 非临时)认识 Flink SQL 体系中的表的概念。
  • 举几个创建外部表、视图的实际应用案例。

2.1 一个 SQL / Table API 任务的代码结构

// 创建一个 TableEnvironment,为后续使用 SQL 或者 Table API 提供上线
EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode() // 声明为流任务
    //.inBatchMode() // 声明为批任务
    .build();

TableEnvironment tEnv = TableEnvironment.create(settings);

// 创建一个输入表
tableEnv.executeSql("CREATE TEMPORARY TABLE table1 ... WITH ( 'connector' = ... )");
// 创建一个输出表
tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )");

// 1. 使用 Table API 做一个查询并返回 Table
Table table2 = tableEnv.from("table1").select(...);
// 2. 使用 SQl API 做一个查询并返回 Table
Table table3 = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");

// 将 table2 的结果使用 Table API 写入 outputTable 中,并返回结果
TableResult tableResult = table2.executeInsert("outputTable");
tableResult...

总结一下上面案例使用到的一些 API,让大家先对 Table / SQL API 的能力有一个大概了解:

  • TableEnvironment:Table API 和 SQL API 的都集成在一个 统一上下文(即 TableEnvironment)中,其地位等同于 DataStream API 中的 StreamExecutionEnvironment 的地位
  • TableEnvironment::executeSql:用于 SQL API 中,可以执行一段完整 DDL、DML SQL。举例,方法入参可以是 CREATE TABLE xxxINSERT INTO xxx SELECT xxx FROM xxx
  • TableEnvironment::from(xxx):用于 Table API 中,可以以强类型接口的方式运行。方法入参是一个表名称。
  • TableEnvironment::sqlQuery:用于 SQL API 中,可以执行一段查询 SQL,并把结果以 Table 的形式返回。举例,方法的入参是 SELECT xxx FROM xxx
  • Table::executeInsert:用于将 Table 的结果插入到结果表中。方法入参是写入的目标表。

无论是对于 SQL API 来说还是对于 Table API 来说,都是使用 TableEnvironment 接口承载我们的业务查询逻辑的。只是在用户的使用接口的方式上有区别,以上述的 Java 代码为例,Table API 其实就是模拟 SQL 的查询方式封装了 Java 语言的 lambda 强类型 API,SQL 就是纯 SQL 查询。Table 和 SQL 很多时候都是掺杂在一起的,大家理解的时候就可以直接将 Table 和 SQL API 直接按照 SQL 进行理解,不用强行做特殊的区分。

而且博主推荐的话,直接上 SQL API 就行,其实 Table API 在企业实战中用的不是特别多。你说 Table API 方便吧,它确实比 DataStream API 方便,但是又比 SQL 复杂。一般生产使用不多。

注意:由于 Table 和 SQL API 基本上属于一回事,后续如果没有特别介绍的话,博主就直接按照 SQL API 进行介绍了。

2.2 SQL 上下文:TableEnvironment

TableEnvironment 是使用 SQL API 永远都离不开的一个接口。其是 SQL API 使用的入口(上下文),就像是你要使用 Java DataStream API 去写一个 Flink 任务需要使用到 StreamExecutionEnvironment 一样。

可以认为 TableEnvironment 在 SQL API 中的地位和 StreamExecutionEnvironment 在 DataStream 中的地位是一样的,都是包含了一个 Flink 任务运行时的所有上下文环境信息。大家这样对比学习会比较好理解。

TableEnvironment 包含的功能如下:

  • Catalog 管理Catalog 可以理解为 Flink 的 MetaStore,类似 Hive MetaStore 对在 Hive 中的地位,关于 Flink Catalog 的详细内容后续进行介绍。
  • 表管理:在 Catalog 中注册表。
  • SQL 查询:(这 TMD 还用说,最基本的功能啊),就像 DataStream 中提供了 addSourcemapflatmap 等接口。
  • UDF 管理:注册用户定义(标量函数:一进一出、表函数:一进多出、聚合函数:多进一出)函数。
  • UDF 扩展:加载可插拔 Module(Module 可以理解为 Flink 管理 UDF 的模块,是可插拔的,可以让小伙伴萌自定义 Module,去支持奇奇怪怪的 UDF 功能)。

DataStream 和 Table(Table / SQL API 的查询结果)之间进行转换:目前 1.13 1.13 1.13 版本的只有流任务支持,批任务不支持。 1.14 1.14 1.14 支持批任务。

接下来介绍如何创建一个 TableEnvironment。案例为 Java。

  • 方法 1:通过 EnvironmentSettings 创建 TableEnvironment
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// 1. 就是设置一些环境信息
EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode() // 声明为流任务
    //.inBatchMode() // 声明为批任务
    .build();

// 2. 创建 TableEnvironment
TableEnvironment tEnv = TableEnvironment.create(settings);

1.13 1.13 1.13 版本中:

  • 如果你是 inStreamingMode,则最终创建出来的 TableEnvironment 实例为 StreamTableEnvironmentImpl
  • 如果你是 inBatchMode,则最终创建出来的 TableEnvironment 实例为 TableEnvironmentImpl

它两虽然都继承了 TableEnvironment 接口,但是 StreamTableEnvironmentImpl 支持的功能更多一些。大家可以直接去看看接口实验一下,这里就不进行详细介绍。

  • 方法 2:通过已有的 StreamExecutionEnvironment 创建 TableEnvironment
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

2.3 SQL 中表的概念(外部表 TABLE、视图 VIEW)

一个表的全名(标识)会由三个部分组成:Catalog 名称.数据库名称.表名称。如果 Catalog 名称或者数据库名称没有指明,就会使用当前默认值 default

举个例子,下面这个 SQL 创建的 Table 的全名为 default.default.table1

tableEnv.executeSql("CREATE TEMPORARY TABLE table1 ... WITH ( 'connector' = ... )");

下面这个 SQL 创建的 Table 的全名为 default.mydatabase.table1

tableEnv.executeSql("CREATE TEMPORARY TABLE mydatabase.table1 ... WITH ( 'connector' = ... )");

可以是 常规的(外部表 TABLE),也可以是 虚拟的(视图 VIEW)。

  • 外部表 TABLE:描述的是外部数据,例如文件(HDFS)、消息队列(Kafka)等。依然拿离线 Hive SQL 举个例子,离线中一个表指的是 Hive 表,也就是所说的外部数据。
  • 视图 VIEW:从已经存在的表中创建,视图一般是一个 SQL 逻辑的查询结果。对比到离线的 Hive SQL 中,在离线的场景(Hive 表)中 VIEW 也都是从已有的表中去创建的。其实 Flink 也是一样的。

注意:这里有不同的地方就是,离线 Hive MetaStore 中不会有 Catalog 这个概念,其标识都是 数据库.数据表

2.4 SQL 临时表、永久表

  • 表(视图、外部表)可以是 临时的,并与单个 Flink Session(可以理解为 Flink 任务运行一次就是一个 Session)的生命周期绑定。
  • 表(视图、外部表)也可以是 永久的,并且对多个 Flink Session 都生效。

临时表:通常保存于内存中并且仅在创建它们的 Flink Session(可以理解为一次 Flink 任务的运行)持续期间存在。这些表对于其它 Session(即其他 Flink 任务或非此次运行的 Flink 任务)是不可见的。因为这个表的元数据没有被持久化。如下案例:

-- 临时外部表
CREATE TEMPORARY TABLE source_table (
    user_id BIGINT,
    `name` STRING
) WITH (
  'connector' = 'user_defined',
  'format' = 'json',
  'class.name' = 'flink.examples.sql._03.source_sink.table.user_defined.UserDefinedSource'
);

-- 临时视图
CREATE TEMPORARY VIEW query_view as
SELECT *
FROM source_table;

永久表:需要外部 Catalog(例如 Hive Metastore)来持久化表的元数据。一旦永久表被创建,它将对任何连接到这个 Catalog 的 Flink Session 可见且持续存在,直至从 Catalog 中被明确删除。如下案例:

-- 永久外部表。需要外部 Catalog 持久化!!!
CREATE TABLE source_table (
    user_id BIGINT,
    `name` STRING
) WITH (
  'connector' = 'user_defined',
  'format' = 'json',
  'class.name' = 'flink.examples.sql._03.source_sink.table.user_defined.UserDefinedSource'
);

-- 永久视图。需要外部 Catalog 持久化!!!
CREATE VIEW query_view as
SELECT *
FROM source_table;

🚀 注意:如果临时表和永久表使用了相同的名称(Catalog名.数据库名.表名)。那么在这个 Flink Session 中,你的任务访问到这个表时,访问到的永远是临时表(即 相同名称的表,临时表会屏蔽永久表)。

2.5 SQL 外部数据表

由于目前在实时数据的场景中多以消息队列作为数据表。此处就以 Kafka 为例创建一个外部数据表。

2.5.1 Table API 创建外部数据表

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
    
    EnvironmentSettings settings = EnvironmentSettings
            .newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();

    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

    // kafka 数据源
    DataStream<Row> r = env.addSource(new FlinkKafkaConsumer<Row>(xxx));
    // 将 DataStream 转为一个 Table API 中的 Table 对象进行使用
    Table sourceTable = tEnv.fromDataStream(r
            , Schema
                    .newBuilder()
                    .column("f0", "string")
                    .column("f1", "string")
                    .column("f2", "bigint")
                    .columnByExpression("proctime", "PROCTIME()")
                    .build());

    tEnv.createTemporaryView("source_table", sourceTable);

    String selectWhereSql = "select f0 from source_table where f1 = 'b'";

    Table resultTable = tEnv.sqlQuery(selectWhereSql);

    tEnv.toRetractStream(resultTable, Row.class).print();

    env.execute();
}

上述案例中,Table API 将一个 DataStream 的结果集通过 StreamTableEnvironment::fromDataStream 转为一个 Table 对象来使用。

2.5.2 SQL API 创建外部数据表

EnvironmentSettings settings = EnvironmentSettings
        .newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

// SQL API 执行 create table 创建表
tEnv.executeSql(
        "CREATE TABLE KafkaSourceTable (\n"
                + "  `f0` STRING,\n"
                + "  `f1` STRING\n"
                + ") WITH (\n"
                + "  'connector' = 'kafka',\n"
                + "  'topic' = 'topic',\n"
                + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                + "  'properties.group.id' = 'testGroup',\n"
                + "  'format' = 'json'\n"
                + ")"
);

Table t = tEnv.sqlQuery("SELECT * FROM KafkaSourceTable");

具体的创建方式就是使用 Create Table xxx DDL 定义一个 Kafka 数据源(输入)表(也可以是 Kafka 数据汇(输出)表)。

xdm,是不是又和 Hive 一样?惊不惊喜意不意外。对比学习 +1。

2.6 SQL 视图 VIEW

上文已经说了,一个 VIEW 其实就是一段 SQL 逻辑的查询结果。

视图 VIEW 在 Table API 中的体现就是:一个 Table 的 Java 对象,其封装了一段查询逻辑。如下案例所示。

2.6.1 Table API 创建 VIEW

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode() // 声明为流任务
    .build();

TableEnvironment tEnv = TableEnvironment.create(settings);

// Table API 中的一个 Table 对象
Table projTable = tEnv.from("X").select(...);

// 将 projTable 创建为一个叫做 projectedTable 的 VIEW
tEnv.createTemporaryView("projectedTable", projTable);

Table API 是使用了 TableEnvironment::createTemporaryView 接口将一个 Table 对象创建为一个 VIEW。

2.6.2 SQL API 创建 VIEW

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode() // 声明为流任务
    .build();

TableEnvironment tEnv = TableEnvironment.create(settings);

String sql = "CREATE TABLE source_table (\n"
		    + "    user_id BIGINT,\n"
		    + "    `name` STRING\n"
		    + ") WITH (\n"
		    + "  'connector' = 'user_defined',\n"
		    + "  'format' = 'json',\n"
		    + "  'class.name' = 'flink.examples.sql._03.source_sink.table.user_defined.UserDefinedSource'\n"
		    + ");\n"
		    + "\n"
		    + "CREATE TABLE sink_table (\n"
		    + "    user_id BIGINT,\n"
		    + "    name STRING\n"
		    + ") WITH (\n"
		    + "  'connector' = 'print'\n"
		    + ");\n"
		    + "CREATE VIEW query_view as\n" // 创建 VIEW
		    + "SELECT\n"
		    + "    *\n"
		    + "FROM source_table\n"
		    + ";\n"
		    + "INSERT INTO sink_table\n"
		    + "SELECT\n"
		    + "    *\n"
		    + "FROM query_view;";

Arrays.stream(sql.split(";"))
      .forEach(tEnv::executeSql);

SQL API 是直接通过一段 CREATE VIEW query_view as select * from source_table 来创建的 VIEW,是纯 SQL 写法。

这种创建方式是不是贼熟悉,和离线 Hive 一样。对比学习 +1。

🚀 注意:在 Table API 中的一个 Table 对象被后续的多个查询使用的场景下,Table 对象不会真的产生一个中间表供下游多个查询去引用,即多个查询不共享这个 Table 的结果,小伙伴萌可以理解为是一种中间表的简化写法,不会先产出一个中间表结果,然后将这个结果在下游多个查询中复用,后续的多个查询会将这个 Table 的逻辑执行多次。类似于 with tmp as (DML) 的语法

2.7 一个 SQL 查询案例

来看看一个 SQL 查询案例。

  • 案例场景:计算每一种商品(sku_id 唯一标识)的售出个数、总销售额、平均销售额、最低价、最高价。
  • 数据准备:数据源为商品的销售流水(sku_id:商品,price:销售价格),然后写入到 Kafka 的指定 topic 当中(sku_id:商品,count_result:售出个数、sum_result:总销售额、avg_result:平均销售额、min_result:最低价、max_result:最高价)。
EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode() // 声明为流任务
    //.inBatchMode() // 声明为批任务
    .build();

TableEnvironment tEnv = TableEnvironment.create(settings);

// 1. 创建一个数据源(输入)表,这里的数据源是 flink 自带的一个随机 mock 数据的数据源。
String sourceSql = "CREATE TABLE source_table (\n"
        + "    sku_id STRING,\n"
        + "    price BIGINT\n"
        + ") WITH (\n"
        + "  'connector' = 'datagen',\n"
        + "  'rows-per-second' = '1',\n"
        + "  'fields.sku_id.length' = '1',\n"
        + "  'fields.price.min' = '1',\n"
        + "  'fields.price.max' = '1000000'\n"
        + ")";

// 2. 创建一个数据汇(输出)表,输出到 kafka 中
String sinkSql = "CREATE TABLE sink_table (\n"
        + "    sku_id STRING,\n"
        + "    count_result BIGINT,\n"
        + "    sum_result BIGINT,\n"
        + "    avg_result DOUBLE,\n"
        + "    min_result BIGINT,\n"
        + "    max_result BIGINT,\n"
        + "    PRIMARY KEY (`sku_id`) NOT ENFORCED\n"
        + ") WITH (\n"
        + "  'connector' = 'upsert-kafka',\n"
        + "  'topic' = 'tuzisir',\n"
        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
        + "  'key.format' = 'json',\n"
        + "  'value.format' = 'json'\n"
        + ")";

// 3. 执行一段 group by 的聚合 SQL 查询
String selectWhereSql = "insert into sink_table\n"
        + "select sku_id,\n"
        + "       count(*) as count_result,\n"
        + "       sum(price) as sum_result,\n"
        + "       avg(price) as avg_result,\n"
        + "       min(price) as min_result,\n"
        + "       max(price) as max_result\n"
        + "from source_table\n"
        + "group by sku_id";

tEnv.executeSql(sourceSql);
tEnv.executeSql(sinkSql);
tEnv.executeSql(selectWhereSql);

2.8 SQL 与 DataStream API 的转换

大家会比较好奇,要写 SQL 就纯 SQL 呗,要写 DataStream 就纯 DataStream 呗,为啥还要把这两类接口做集成呢?

博主举一个案例:在 PDD 这种发补贴券的场景下,希望可以在发的补贴券总金额超过 10000 10000 10000 元时,及时报警出来,来帮助控制预算,防止发的太多。

对应的解决方案,我们可以想到使用 SQL 计算补贴券发放的结果,但是 SQL 的问题在于无法做到报警。所以我们可以将 SQL 的查询的结果(即 Table 对象)转为 DataStream,然后就可以在 DataStream 后自定义报警逻辑的算子。

我们直接上 SQL 和 DataStream API 互相转化的案例:

public static void main(String[] args) throws Exception {

    FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);

    // 1. pdd 发补贴券流水数据
    String createTableSql = "CREATE TABLE source_table (\n"
            + "    id BIGINT,\n" -- 补贴券的流水 id
            + "    money BIGINT,\n" -- 补贴券的金额
            + "    row_time AS cast(CURRENT_TIMESTAMP as timestamp_LTZ(3)),\n"
            + "    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n"
            + ") WITH (\n"
            + "  'connector' = 'datagen',\n"
            + "  'rows-per-second' = '1',\n"
            + "  'fields.id.min' = '1',\n"
            + "  'fields.id.max' = '100000',\n"
            + "  'fields.money.min' = '1',\n"
            + "  'fields.money.max' = '100000'\n"
            + ")\n";

    // 2. 计算总计发放补贴券的金额
    String querySql = "SELECT UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end, \n"
            + "      window_start, \n"
            + "      sum(money) as sum_money,\n" -- 补贴券的发放总金额
            + "      count(distinct id) as count_distinct_id\n"
            + "FROM TABLE(CUMULATE(\n"
            + "         TABLE source_table\n"
            + "         , DESCRIPTOR(row_time)\n"
            + "         , INTERVAL '5' SECOND\n"
            + "         , INTERVAL '1' DAY))\n"
            + "GROUP BY window_start, \n"
            + "        window_end";

    flinkEnv.streamTEnv().executeSql(createTableSql);

    Table resultTable = flinkEnv.streamTEnv().sqlQuery(querySql);

    // 3. 将金额结果转为 DataStream,然后自定义超过 1w 的报警逻辑
    flinkEnv.streamTEnv()
            .toDataStream(resultTable, Row.class)
            .flatMap(new FlatMapFunction<Row, Object>() {
                @Override
                public void flatMap(Row value, Collector<Object> out) throws Exception {
                    long l = Long.parseLong(String.valueOf(value.getField("sum_money")));

                    if (l > 10000L) {
                        log.info("报警,超过 1w");
                    }
                }
            });

    flinkEnv.env().execute();
}

目前在 1.13 1.13 1.13 版本中,Flink 对于 Table 和 DataStream 的转化是有一些限制的:上面的案例可以看到,Table 和 DataStream 之间的转换目前只有 StreamTableEnvironment::toDataStreamStreamTableEnvironment::fromDataStream 接口支持。

所以其实小伙伴萌可以理解为只有流任务才支持 Table 和 DataStream 之间的转换,批任务是不支持的(虽然可以使用流执行模式处理有界流 - 批数据,也就是模拟按照批执行,但效率较低,这种骚操作不建议大家搞)。

那什么时候才能支持批任务的 Table 和 DataStream 之间的转换呢? 1.14 1.14 1.14 版本支持。 1.14 1.14 1.14 版本中,流和批的都统一到了 StreamTableEnvironment 中,因此就可以做 Table 和 DataStream 的互相转换了。文章来源地址https://www.toymoban.com/news/detail-841978.html

到了这里,关于【Flink SQL】Flink SQL 基础概念(一):SQL & Table 运行环境、基本概念及常用 API的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Flink SQL】Flink SQL 基础概念(四):SQL 的时间属性

    《 Flink SQL 基础概念 》系列,共包含以下 5 篇文章: Flink SQL 基础概念(一):SQL Table 运行环境、基本概念及常用 API Flink SQL 基础概念(二):数据类型 Flink SQL 基础概念(三):SQL 动态表 连续查询 Flink SQL 基础概念(四):SQL 的时间属性 Flink SQL 基础概念(五):SQL 时区问

    2024年04月13日
    浏览(32)
  • Flink基础概念及常识

    ​​​​​ 目录 1.flink入门 a.有边界和无边界  b.有状态 c.精确一次性 2.flink CheckPoint机制 a. 任务启动 b. 启动Checkpoint c. Source启动Checkpoint d. task 接收 barrier e. barrier对齐 f. 处理缓存数据 g. 上报Checkpoint完成 3.flink反压(背压) a. 产生原因 b. 过程 c. 影响 4. 数据倾斜 4.1 定义 4.2

    2024年02月11日
    浏览(20)
  • 大数据Flink简介与架构剖析并搭建基础运行环境

    前面我们分别介绍了大数据计算框架Hadoop与Spark,虽然他们有的有着良好的分布式文件系统和分布式计算引擎,有的有着分布式数据集和基于内存的分布式计算引擎,但是却不能对无边界数据流进行有效处理,今天我们就分享一个第四代大数据分布式计算框架Flink简介与架构剖

    2024年02月10日
    浏览(33)
  • Flink Table API 与 SQL 编程整理

    Flink API 总共分为 4 层这里主要整理 Table API 的使用 Table API 是流处理和批处理通用的关系型 API , Table API 可以基于流输入或者批输入来运行而不需要进行任何修改。 Table API 是 SQL 语言的超集并专门为 Apache Flink 设计的, Table API 是 Scala 和 Java 语言集成式的 API 。与常规 SQL 语言

    2024年02月04日
    浏览(40)
  • 【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年01月17日
    浏览(48)
  • Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

           今天一天争取搞完最后这一部分,学完赶紧把 Kafka 和 Flume 学完,就要开始做实时数仓了。据说是应届生得把实时数仓搞个 80%~90% 才能差不多找个工作,太牛马了。         之前我们已经用过了一些简单的内置连接器,比如 \\\'datagen\\\' 、\\\'print\\\' ,其它的可以查看官网:

    2024年01月24日
    浏览(41)
  • 《十堂课学习 Flink》第五章:Table API 以及 Flink SQL 入门

    第四章中介绍了 DataStream API 以及 DataSet API 的入门案例,本章开始介绍 Table API 以及基于此的高层应用 Flink SQL 的基础。 Flink 提供了两个关系API——Table API 和 SQL——用于统一的流和批处理。Table API 是一种针对Java、Scala和Python的语言集成查询API,它允许以非常直观的方式组合来

    2024年02月03日
    浏览(52)
  • Flink-SQL——时态表(Temporal Table)

    这里我们需要注意一下的是虽然我们介绍的是Flink 的 Temporal Table 但是这个概念最早是在数据库中提出的 在ANSI-SQL 2011 中提出了Temporal 的概念,Oracle,SQLServer,DB2等大的数据库厂商也先后实现了这个标准。Temporal Table记录了历史上任何时间点所有的数据改动,Temporal Table的工作

    2024年01月16日
    浏览(31)
  • Flink Table API/SQL 多分支sink

    在某个场景中,需要从Kafka中获取数据,经过转换处理后,需要同时sink到多个输出源中(kafka、mysql、hologres)等。两次调用execute, 阿里云Flink vvr引擎报错: 使用 StreamStatementSet. 具体参考官网: https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/data_stream_api/#converting-betwe

    2024年02月11日
    浏览(90)
  • Flink-SQL——动态表 (Dynamic Table)

    SQL 和关系代数在设计时并未考虑流数据。因此,在关系代数(和 SQL)之间几乎没有概念上的差异。 本文会讨论这种差异,并介绍 Flink 如何在无界数据集上实现与数据库引擎在有界数据上的处理具有相同的语义。 下表比较了传统的关系代数和流处理与输入数据、执行和输出结果

    2024年01月17日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包