Flink流批一体计算(13):PyFlink Tabel API之SQL DDL

这篇具有很好参考价值的文章主要介绍了Flink流批一体计算(13):PyFlink Tabel API之SQL DDL。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1. TableEnvironment

创建 TableEnvironment
from pyflink.table import Environmentsettings, TableEnvironment


# create a streaming TableEnvironment

env_settings = Environmentsettings.in_streaming_mode()

table_env = TableEnvironment.create(env_settings)

# or create a batch TableEnvironment

env_settings = Environmentsettings.in_batch_mode()

table_env = TableEnvironment.create(env_settings)

TableEnvironment 是 Table API 和 SQL 集成的核心概念。

TableEnvironment 可以用来:

  • ·创建 Table
  • ·将 Table 注册成临时表
  • ·执行 SQL 查询
  • ·注册用户自定义的 (标量,表值,或者聚合) 函数
  • ·配置作业
  • ·管理 Python 依赖
  • ·提交作业执行
创建 source 表
table_env.execute_sql("""

    CREATE TABLE datagen (

        id INT,

        data STRING

    ) WITH (

        'connector' = 'datagen',

        'fields.id.kind' = 'sequence',

        'fields.id.start' = '1',

        'fields.id.end' = '10'

    )

""")
创建 sink 表
table_env.execute_sql("""

    CREATE TABLE print (

        id INT,

        data STRING

    ) WITH (

        'connector' = 'print'

    )

""")

2. Table

Table 是 Python Table API 的核心组件。Table 是 Table API 作业中间结果的逻辑表示。

一个 Table 实例总是与一个特定的 TableEnvironment 相绑定。

不支持在同一个查询中合并来自不同 TableEnvironments 的表,例如 join 或者 union 它们。

通过列表类型的对象创建

你可以使用一个列表对象创建一张表:

from pyflink.table import Environmentsettings, TableEnvironment

# 创建 批 TableEnvironment

env_settings = Environmentsettings.in_batch_mode()

table_env = TableEnvironment.create(env_settings)

table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')])

table.to_pandas()==>print(table.to_pandas())

table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])

print(table.to_pandas())
通过 DDL 创建

你可以通过 DDL 创建一张表,execute_sql(stmt) 执行指定的语句并返回执行结果。

执行语句可以是 DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE。

注意,对于 "INSERT INTO" 语句,这是一个异步操作,通常在向远程集群提交作业时才需要使用。

但是,如果在本地集群或者 IDE 中执行作业时,你需要等待作业执行完成。

from pyflink.table import Environmentsettings, TableEnvironment

# 创建流 TableEnvironment

env_settings = Environmentsettings.in_streaming_mode()

table_env = TableEnvironment.create(env_settings)

table_env.execute_sql("""

    CREATE TABLE random_source (

        id BIGINT,

        data TINYINT

    ) WITH (

        'connector' = 'datagen',

        'fields.id.kind'='sequence',

        'fields.id.start'='1',

        'fields.id.end'='3',

        'fields.data.kind'='sequence',

        'fields.data.start'='4',

        'fields.data.end'='6'

    )

""")

table = table_env.from_path("random_source")

table.to_pandas()
通过 Catalog 创建
Catalog

Catalog提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。

数据处理最关键的方面之一是管理元数据。

元数据可以是临时的,例如临时表、或者通过 TableEnvironment 注册的 UDF。

元数据也可以是持久化的,例如 Hive Metastore 中的元数据。

Catalog 提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。

Catalog类型

GenericInMemoryCatalog

基于内存实现的 Catalog,所有元数据只在 session 的生命周期内可用。

JdbcCatalog

JdbcCatalog使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。

PostgresCatalog 是当前实现的唯一一种 JDBC Catalog。

HiveCatalog

HiveCatalog 有两个用途:作为原生 Flink 元数据的持久化存储,以及作为读写现有 Hive 元数据的接口。

警告 Hive Metastore 以小写形式存储所有元数据对象名称,GenericInMemoryCatalog 区分大小写。

用户自定义 Catalog

Catalog 是可扩展的,用户可以通过实现 Catalog 接口来开发自定义 Catalog。

想要在 SQL CLI 中使用自定义 Catalog,用户除了需要实现自定义的 Catalog 之外,还需要为这个 Catalog 实现对应的 CatalogFactory 接口。

CatalogFactory 定义了一组属性,用于 SQL CLI 启动时配置 Catalog。

这组属性集将传递给发现服务,在该服务中,服务会尝试将属性关联到 CatalogFactory 并初始化相应的 Catalog 实例。

创建 Flink 表并将其注册到 Catalog

使用 SQL DDL

用户可以使用 DDL 通过 Table API 或者 SQL Client 在 Catalog 中创建表。

from pyflink.table.catalog import HiveCatalog

# Create a HiveCatalog

catalog = HiveCatalog("myhive", None, "<path_of_hive_conf>")

# Register the catalog

t_env.register_catalog("myhive", catalog)

# Create a catalog database

t_env.execute_sql("CREATE DATABASE mydb WITH (...)")

# Create a catalog table

t_env.execute_sql("CREATE TABLE mytable (name STRING, age INT) WITH (...)")

# should return the tables in current catalog and database.

t_env.list_tables()

通过 SQL DDL 创建的表和视图, 例如 “create table …” 和 “create view …",都存储在 catalog 中。

你可以通过 SQL 直接访问 catalog 中的表。

使用 Java/Scala

用户可以用编程的方式使用Java 或者 Scala 来创建 Catalog 表。

from pyflink.table import *
from pyflink.table.catalog import HiveCatalog, CatalogDatabase, ObjectPath, CatalogBaseTable
from pyflink.table.descriptors import Kafka
settings = Environmentsettings.in_batch_mode()
t_env = TableEnvironment.create(settings)
# Create a HiveCatalog
catalog = HiveCatalog("myhive", None, "<path_of_hive_conf>")
# Register the catalog
t_env.register_catalog("myhive", catalog)
# Create a catalog database
database = CatalogDatabase.create_instance({"k1": "v1"}, None)
catalog.create_database("mydb", database)
# Create a catalog table
schema = Schema.new_builder() \
    .column("name", DataTypes.STRING()) \
    .column("age", DataTypes.INT()) \
    .build()  
catalog_table = t_env.create_table("myhive.mydb.mytable", TableDescriptor.for_connector("kafka")
    .schema(schema)
    // …
    .build())
# tables should contain "mytable"
tables = catalog.list_tables("mydb")

TableEnvironment 维护了一个使用标识符创建的表的 catalogs 映射。

Catalog 中的表既可以是临时的,并与单个 Flink 会话生命周期相关联,也可以是永久的,跨多个 Flink 会话可见。

如果你要用 Table API 来使用 catalog 中的表,可以使用 “from_path” 方法来创建 Table API 对象:

from_path(path)   通过指定路径下已注册的表来创建一个表,例如通过 create_temporary_view 注册表。文章来源地址https://www.toymoban.com/news/detail-657536.html

到了这里,关于Flink流批一体计算(13):PyFlink Tabel API之SQL DDL的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink流批一体计算(11):PyFlink Tabel API之TableEnvironment

    目录 概述 设置重启策略 什么是flink的重启策略(Restartstrategy) flink的重启策略(Restartstrategy)实战 flink的4种重启策略 FixedDelayRestartstrategy(固定延时重启策略) FailureRateRestartstrategy(故障率重启策略) NoRestartstrategy(不重启策略) 配置State Backends 以及 Checkpointing Checkpoint 启用和配置

    2024年02月13日
    浏览(61)
  • Flink流批一体计算(16):PyFlink DataStream API

    目录 概述 Pipeline Dataflow 代码示例WorldCount.py 执行脚本WorldCount.py 概述 Apache Flink 提供了 DataStream API,用于构建健壮的、有状态的流式应用程序。它提供了对状态和时间细粒度控制,从而允许实现高级事件驱动系统。 用户实现的Flink程序是由Stream和Transformation这两个基本构建块组

    2024年02月11日
    浏览(46)
  • Flink流批一体计算(19):PyFlink DataStream API之State

    目录 keyed state Keyed DataStream 使用 Keyed State 实现了一个简单的计数窗口 状态有效期 (TTL) 过期数据的清理 全量快照时进行清理 增量数据清理 在 RocksDB 压缩时清理 Operator State算子状态 Broadcast State广播状态 keyed state Keyed DataStream 使用 keyed state,首先需要为DataStream指定 key(主键)

    2024年02月10日
    浏览(38)
  • Flink流批一体计算(17):PyFlink DataStream API之StreamExecutionEnvironment

    目录 StreamExecutionEnvironment Watermark watermark策略简介 使用 Watermark 策略 内置水印生成器 处理空闲数据源 算子处理 Watermark 的方式 创建DataStream的方式 通过list对象创建 ​​​​​​使用DataStream connectors创建 使用Table SQL connectors创建 StreamExecutionEnvironment 编写一个 Flink Python DataSt

    2024年02月11日
    浏览(44)
  • Flink流批一体计算(18):PyFlink DataStream API之计算和Sink

    目录 1. 在上节数据流上执行转换操作,或者使用 sink 将数据写入外部系统。 2. File Sink File Sink Format Types  Row-encoded Formats  Bulk-encoded Formats  桶分配 滚动策略 3. 如何输出结果 Print 集合数据到客户端,execute_and_collect方法将收集数据到客户端内存 将结果发送到DataStream sink conne

    2024年02月11日
    浏览(39)
  • Flink流批一体计算(20):DataStream API和Table API互转

    目录 举个例子 连接器 下载连接器(connector)和格式(format)jar 包 依赖管理  如何使用连接器 举个例子 StreamExecutionEnvironment 集成了DataStream API,通过额外的函数扩展了TableEnvironment。 下面代码演示两种API如何互转 TableEnvironment 将采用StreamExecutionEnvironment所有的配置选项。 建

    2024年02月10日
    浏览(41)
  • Flink流批一体计算(1):流批一体和Flink概述

    Apache Flink应运而生 数字化经济革命的浪潮正在颠覆性地改变着人类的工作方式和生活方式,数字化经济在全球经济增长中扮演着越来越重要的角色,以互联网、云计算、大数据、物联网、人工智能为代表的数字技术近几年发展迅猛,数字技术与传统产业的深度融合释放出巨大

    2024年02月10日
    浏览(42)
  • flink重温笔记(四):Flink 流批一体 API 开发——物理分区(上)

    前言:今天是学习flink的第四天啦!学习了物理分区的知识点,这一次学习了前4个简单的物理分区,称之为简单分区篇! Tips:我相信自己会越来会好的,明天攻克困难分区篇,加油! 3. 物理分区 3.1 Global Partitioner 该分区器会将所有的数据都发送到下游的某个算子实例(subta

    2024年02月19日
    浏览(39)
  • flink重温笔记(五):Flink 流批一体 API 开发——物理分区(下)

    前言 :今天是学习 flink 的第五天啦! 主要学习了物理分区较难理解的部分,在这个部分的三个分区的学习中, rescale partition 和 forward partition 其原理可以归类 pointwise 模式,其他的 partition 其原理可以归类 all_to_all 模式,而比较有趣的是 custom partitioning,这个可以进行根据值

    2024年02月19日
    浏览(43)
  • Flink流批一体计算(7):Flink优化

    目录 配置内存 设置并行度 操作场景 具体设置 补充 配置进程参数 操作场景 具体配置 配置netty网络通信 操作场景 具体配置 配置内存 Flink 是依赖内存计算,计算过程中内存不够对 Flink 的执行效率影响很大。可以通过监控 GC ( Garbage Collection ),评估内存使用及剩余情况来判

    2024年02月12日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包