【flink sql】创建表

这篇具有很好参考价值的文章主要介绍了【flink sql】创建表。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

flink sql创建表语法

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  (
    { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)
  [ LIKE source_table [( <like_options> )] ]
   
<physical_column_definition>:
  column_name column_type [ <column_constraint> ] [COMMENT column_comment]
  
<column_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<metadata_column_definition>:
  column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]

<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]

<watermark_definition>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

<source_table>:
  [catalog_name.][db_name.]table_name

<like_options>:
{
   { INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
 | { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS } 
}[, ...]

示例:

create table kafka_t_trade_order_area_service (
    orderId string,
    orderStatus string,
    orderDate string,
    totalCashAmt decimal(16,2),
    totalTicketAmt decimal(16,2),
    orderamt as totalCashAmt + totalTicketAmt,  -- 计算列
    tmCreate string,
    tmSmp timestamp(3),
    `offset` bigint metadata from 'offset',  -- 元数据列
    `timestamp` TIMESTAMP_LTZ(3) METADATA   -- 元数据列,如果列名和元数据名一致,可以省略from
    -- `timestamp` bigint METADATA   -- 元数据列,可以设置其他数据类型,会强制数据类型转换
    proc_time as PROCTIME(),  -- 计算列
    watermark for tmSmp as tmSmp - interval  '20' minute  -- watermark列
) with ( 
    'connector'='kafka',
    'topic'='test_topic', 
    'properties.bootstrap.servers' = 'master.fuyun:9092',
    'properties.group.id' = 'test_fuyun',
    'format' = 'json',
    'scan.startup.mode' = 'timestamp',
    'scan.startup.timestamp-millis'='1679328000000'
    -- 'scan.startup.mode' = 'latest-offset'
);

physical_column_definition:

物理列是数据库中已知的常规列。它们定义物理数据中字段的名称、类型和顺序。因此,物理列表示从外部系统读取和写入的有效负载。
可以在物理列之间声明其他类型的列,但不会影响最终的物理架构。

metadata_column_definition:

元数据列是SQL标准的扩展,允许访问连接器或格式化表的每一行的特定字段。元数据列由元数据关键字指示。
例如,元数据列可用于从 Kafka 记录读取和写入时间戳,以便进行基于时间的操作。
连接器和格式文档列出了每个组件的可用元数据字段。元数据列是可选的。

computed_column_definition:

计算列计算可引用同一表中声明的其他列的表达式。可以访问物理列和元数据列。列本身不以物理方式存储在表中。列的数据类型是从给定表达式自动派生的,不必手动声明。
计划器将在源之后将计算列转换为常规投影。对于优化或水印策略下推,评估可能会分布在运算符之间、多次执行或在给定查询不需要时跳过。
表达式可以包含列、常量或函数的任意组合。表达式不能包含子查询。

watermark_definition:

WATERMARK 定义了表的事件时间属性,其形式为 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
rowtime_column_name把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列,它也可以是一个计算列。
watermark_strategy_expression定义了 watermark 的生成策略。它允许使用包括计算列在内的任意非查询表达式来计算 watermark ;表达式的返回类型必须是 TIMESTAMP(3),表示了从 Epoch 以来的经过的时间。 返回watermark 只有当其不为空且其值大于之前发出的本地 watermark 时才会被发出(以保证 watermark 递增)。每条记录的 watermark 生成表达式计算都会由框架完成。 框架会定期发出所生成的最大的 watermark ,如果当前 watermark 仍然与前一个 watermark 相同、为空、或返回的 watermark 的值小于最后一个发出的 watermark ,则新的 watermark 不会被发出。 Watermark 根据 pipeline.auto-watermark-interval中所配置的间隔发出。 若 watermark 的间隔是 0ms ,那么每条记录都会产生一个 watermark,且 watermark 会在不为空并大于上一个发出的 watermark 时发出。
使用事件时间语义时,表必须包含事件时间属性和 watermark 策略。
Flink 提供了几种常用的 watermark 策略。

  • 严格递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column

    • 发出到目前为止已观察到的最大时间戳的 watermark ,时间戳大于最大时间戳的行被认为没有迟到。
  • 递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND

    • 发出到目前为止已观察到的最大时间戳减 1 的 watermark ,时间戳大于或等于最大时间戳的行被认为没有迟到。
  • 有界乱序时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit

    • 发出到目前为止已观察到的最大时间戳减去指定延迟的 watermark ,例如, WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND是一个 5 秒延迟的 watermark 策略。

PRIMARY KEY:

主键用作 Flink 优化的一种提示信息。主键限制表明一张表或视图的某个(些)列是唯一的并且不包含 Null 值。 主键声明的列都是非 nullable 的。因此主键可以被用作表行级别的唯一标识。
主键可以和列的定义一起声明,也可以独立声明为表的限制属性,不管是哪种方式,主键都不可以重复定义,否则 Flink 会报错。\

  • 有效性检查
    • SQL 标准主键限制可以有两种模式:ENFORCED或者 NOT ENFORCED。 它申明了是否输入/出数据会做合法性检查(是否唯一)。Flink 不存储数据因此只支持 NOT ENFORCED模式,即不做检查,用户需要自己保证唯一性。
    • Flink 假设声明了主键的列都是不包含 Null 值的,Connector 在处理数据时需要自己保证语义正确。

注意:CREATE TABLE语句中,创建主键会修改列的 nullable 属性,主键声明的列默认都是非 Nullable 的。

PARTITIONED BY

根据指定的列对已经创建的表进行分区。若表使用 filesystem sink,则将会为每个分区创建一个目录。

WITH Options

表属性用于创建 table source/sink ,一般用于寻找和创建底层的连接器。

表达式 key1=val1的键和值必须为字符串文本常量。请参考 连接外部系统 了解不同连接器所支持的属性。

注意: 表名可以为以下三种格式 1. catalog_name.db_name.table_name2. db_name.table_name3. table_name
使用catalog_name.db_name.table_name的表将会与名为 “catalog_name” 的 catalog 和名为 “db_name” 的数据库一起注册到 metastore 中。使用 db_name.table_name的表将会被注册到当前执行的 table environment中的 catalog 且数据库会被命名为 “db_name”;对于 table_name, 数据表将会被注册到当前正在运行的catalog和数据库中。

注意: 使用 CREATE TABLE 语句注册的表均可用作 table source 和 table sink。 在被 DML 语句引用前,我们无法决定其实际用于 source 抑或是 sink。

LIKE

LIKE 子句来源于两种 SQL 特性的变体/组合(Feature T171,“表定义中的 LIKE 语法” 和 Feature T173,“表定义中的 LIKE 语法扩展”)。LIKE 子句可以基于现有表的定义去创建新表,并且可以扩展或排除原始表中的某些部分。与 SQL 标准相反,LIKE 子句必须在 CREATE 语句中定义,并且是基于 CREATE 语句的更上层定义,这是因为 LIKE 子句可以用于定义表的多个部分,而不仅仅是 schema 部分。

你可以使用该子句,重用(或改写)指定的连接器配置属性或者可以向外部表添加 watermark 定义,例如可以向 Apache Hive 中定义的表添加 watermark 定义。

示例如下:

CREATE TABLE Orders (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3)
) WITH ( 
    'connector' = 'kafka',
    'scan.startup.mode' = 'earliest-offset'
);

CREATE TABLE Orders_with_watermark (
    -- 添加 watermark 定义
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    -- 改写 startup-mode 属性
    'scan.startup.mode' = 'latest-offset'
)
LIKE Orders;

结果表 Orders_with_watermark 等效于使用以下语句创建的表:

CREATE TABLE Orders_with_watermark (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    'connector' = 'kafka',
    'scan.startup.mode' = 'latest-offset'
);

表属性的合并逻辑可以用 like options 来控制。

可以控制合并的表属性如下:

  • CONSTRAINTS - 主键和唯一键约束
  • GENERATED - 计算列
  • OPTIONS - 连接器信息、格式化方式等配置项
  • PARTITIONS - 表分区信息
  • WATERMARKS - watermark 定义

并且有三种不同的表属性合并策略:

  • INCLUDING - 新表包含源表(source table)所有的表属性,如果和源表的表属性重复则会直接失败,例如新表和源表存在相同 key 的属性。
  • EXCLUDING - 新表不包含源表指定的任何表属性。
  • OVERWRITING - 新表包含源表的表属性,但如果出现重复项,则会用新表的表属性覆盖源表中的重复表属性,例如,两个表中都存在相同 key 的属性,则会使用当前语句中定义的 key 的属性值。

并且你可以使用 INCLUDING/EXCLUDING ALL这种声明方式来指定使用怎样的合并策略,例如使用 EXCLUDING ALL INCLUDING WATERMARKS,那么代表只有源表的 WATERMARKS 属性才会被包含进新表。

示例如下:

-- 存储在文件系统的源表
CREATE TABLE Orders_in_file (
    `user` BIGINT,
    product STRING,
    order_time_string STRING,
    order_time AS to_timestamp(order_time)
    
)
PARTITIONED BY (`user`) 
WITH ( 
    'connector' = 'filesystem',
    'path' = '...'
);

-- 对应存储在 kafka 的源表
CREATE TABLE Orders_in_kafka (
    -- 添加 watermark 定义
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    'connector' = 'kafka',
    ...
)
LIKE Orders_in_file (
    -- 排除需要生成 watermark 的计算列之外的所有内容。
    -- 去除不适用于 kafka 的所有分区和文件系统的相关属性。
    EXCLUDING ALL
    INCLUDING GENERATED
);

如果未提供 like 配置项(like options),默认将使用 INCLUDING ALL OVERWRITINGOPTIONS 的合并策略。

注意: 您无法选择物理列的合并策略,当物理列进行合并时就如使用了 INCLUDING 策略。

注意: 源表 source_table 可以是一个组合 ID。您可以指定不同 catalog 或者 DB 的表作为源表: 例如,my_catalog.my_db.MyTable指定了源表 MyTable 来源于名为 MyCatalog 的 catalog 和名为 my_db 的 DB ,my_db.MyTable指定了源表 MyTable 来源于当前 catalog 和名为 my_db 的 DB。文章来源地址https://www.toymoban.com/news/detail-495960.html

到了这里,关于【flink sql】创建表的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 解决npm报错Error:EEXIST: file already exists, mkdir “文件路径“,yarn create vite-app 报文件名、目录名或卷标语法不正确

    我这里出现错误是因为在配置npm命令目录与npm全局安装位置时目录创建失败,但是在执行 命令之后在本地的\\\" .yarnrc \\\"文件中 \\\" global-folder \\\" 属性被写入,之后会卡在 这个问题 在c盘的对应用户的文件夹下找到 .yarnrc 文件,打开它,将其配置为正确的路径即可 我这里是因为yarn的

    2024年02月08日
    浏览(64)
  • 【Flink SQL】Flink SQL 基础概念(一):SQL & Table 运行环境、基本概念及常用 API

    《 Flink SQL 基础概念 》系列,共包含以下 5 篇文章: Flink SQL 基础概念(一):SQL Table 运行环境、基本概念及常用 API Flink SQL 基础概念(二):数据类型 Flink SQL 基础概念(三):SQL 动态表 连续查询 Flink SQL 基础概念(四):SQL 的时间属性 Flink SQL 基础概念(五):SQL 时区问

    2024年03月21日
    浏览(72)
  • Flink Table API 与 SQL 编程整理

    Flink API 总共分为 4 层这里主要整理 Table API 的使用 Table API 是流处理和批处理通用的关系型 API , Table API 可以基于流输入或者批输入来运行而不需要进行任何修改。 Table API 是 SQL 语言的超集并专门为 Apache Flink 设计的, Table API 是 Scala 和 Java 语言集成式的 API 。与常规 SQL 语言

    2024年02月04日
    浏览(52)
  • 《十堂课学习 Flink》第五章:Table API 以及 Flink SQL 入门

    第四章中介绍了 DataStream API 以及 DataSet API 的入门案例,本章开始介绍 Table API 以及基于此的高层应用 Flink SQL 的基础。 Flink 提供了两个关系API——Table API 和 SQL——用于统一的流和批处理。Table API 是一种针对Java、Scala和Python的语言集成查询API,它允许以非常直观的方式组合来

    2024年02月03日
    浏览(66)
  • Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

           今天一天争取搞完最后这一部分,学完赶紧把 Kafka 和 Flume 学完,就要开始做实时数仓了。据说是应届生得把实时数仓搞个 80%~90% 才能差不多找个工作,太牛马了。         之前我们已经用过了一些简单的内置连接器,比如 \\\'datagen\\\' 、\\\'print\\\' ,其它的可以查看官网:

    2024年01月24日
    浏览(53)
  • flink1.14 sql基础语法(二) flink sql表定义详解

    每一个表的标识由 3 部分组成: catalog name (常用于标识不同的“源”,比如 hive catalog,inner catalog 等) database name(通常语义中的“库”) table name (通常语义中的“表”) 1个flinksql程序在运行时,tableEnvironment 通过持有一个 map 结构来记录所注册的 catalog; Flinksql中的表,可以是 vi

    2024年02月14日
    浏览(40)
  • flink1.14 sql基础语法(一) flink sql表查询详解

    语法示例: flink从1.13开始,提供了时间窗口聚合计算的TVF语法。 表值函数的使用约束: (1)在窗口上做 分组聚合 ,必须带上window_start 和 window_end 作为分组的key; (2)在窗口上做 topn计算 ,必须带上window_start 和 window_end 作为partition的key; (3)带条件的 join ,必须包含

    2024年02月06日
    浏览(42)
  • Flink-SQL——动态表 (Dynamic Table)

    SQL 和关系代数在设计时并未考虑流数据。因此,在关系代数(和 SQL)之间几乎没有概念上的差异。 本文会讨论这种差异,并介绍 Flink 如何在无界数据集上实现与数据库引擎在有界数据上的处理具有相同的语义。 下表比较了传统的关系代数和流处理与输入数据、执行和输出结果

    2024年01月17日
    浏览(47)
  • Flink-SQL——时态表(Temporal Table)

    这里我们需要注意一下的是虽然我们介绍的是Flink 的 Temporal Table 但是这个概念最早是在数据库中提出的 在ANSI-SQL 2011 中提出了Temporal 的概念,Oracle,SQLServer,DB2等大的数据库厂商也先后实现了这个标准。Temporal Table记录了历史上任何时间点所有的数据改动,Temporal Table的工作

    2024年01月16日
    浏览(50)
  • Flink Table API/SQL 多分支sink

    在某个场景中,需要从Kafka中获取数据,经过转换处理后,需要同时sink到多个输出源中(kafka、mysql、hologres)等。两次调用execute, 阿里云Flink vvr引擎报错: 使用 StreamStatementSet. 具体参考官网: https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/data_stream_api/#converting-betwe

    2024年02月11日
    浏览(97)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包