Flink系列Table API和SQL之:表和流的转换

这篇具有很好参考价值的文章主要介绍了Flink系列Table API和SQL之:表和流的转换。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、表和流的转换

  • 从创建表环境开始,历经表的创建、查询转换和输出,已经可以使用Table API和SQL进行完整的流处理了。不过在应用的开发过程中,我们测试业务逻辑一般不会直接将结果直接写入到外部系统,而是在本地控制台打印输出。对于DataStream非常容易,直接调用print()方法就可以看到结果数据流的内容了。但对于Table就比较悲剧,没有提供print()方法。
  • 在Flink中可以将Table再转换成DataStream,然后进行打印输出。这就涉及了表和流的转换

二、将表(Table)转换成流(DataStream)

调用toDataStream()方法

  • 将一个Table对象转换成DataStream非常简单,只要直接调用表环境的方法toDataStream()就可以了。
Table aliceVisitTable = tableEnv.sqlQuery(
	"SELECT user,url " +
	"FROM EventTable " +
	"WHERE user = 'Alice' "
);

将表转换成数据流,这里需要将要转换的Table对象作为参数传入。

tableEnv.toDataStream(aliceVisitTable).print();

调用toChangelogStream()方法

tableEnv.createTemporaryView("clickTable",eventTable);
Table aggResult = tableEnv.sqlQuery("select user,COUNT(url) as cnt from clickTable group by user");

tableEnv.toChangelogStream(aggResult).print("agg");

三、将流转换成表

调用fromDataStream()方法

  • 想要将一个DataStream转换成表也很简单,可以通过调用表环境的fromDataStream()方法来实现,返回的就是一个Table对象。例如,可以直接将事件流eventStream转换成一个表。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

获取表环境

        //创建表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

读取数据源

SingleOutputStreamOperator<Event> eventStream = env.addSource(...)

将数据流转换成表

Table eventTable = tableEnv.fromDataStream(eventStream);

由于流中的数据本身就是定义好的POJO类型Event,所以我们将流转换成表之后,每一行数据就对应着一个Event,而表中的列名就对应着Event中的属性。

另外,还可以在fromDataStream()方法中增加参数,用来指定提取哪些属性作为表中的字段名,并可以任意指定位置。

提取Event中的timestamp和url作为表中的列

Table eventTable2 = tableEnv.fromDataStream(eventStream,$("timestamp"),$("url"));

需要注意的是,timestamp本身是SQL中的关键字,所以我们在定义表名、列名时要尽量避免。这时可以通过表达式的as()方法对字段进行重命名。

Table eventTable2 = tableEnv.fromDataStream(eventStream,$("timestamp").as("ts"),$("url"));

调用createTemporaryView()方法

  • 调用fromDataStream()方法简单直观,可以直接实现DataStream到Table的转换。不过如果希望直接在SQL中引用这张表,就还需要调用表环境的createTemporaryView()方法来创建虚拟视图。
  • 对于这种场景,更简洁的调用方式,可以直接调用createTemporaryView()方法创建虚拟表,传入的两个参数,第一个依然是注册的表名,而第二个可以直接就是DataStream。之后可以传入多个参数,用来指定表中的字段:
tableEnv.createTemporaryView("EventTable",eventStream,$("timestamp").as("ts"),$("url"));

这样接下来就可以直接在SQL中引用表EventTable了。

调用fromChangelogStream()方法
表环境还提供了一个方法fromChangelogStream(),可以将一个更新日志流转换成表。这个方法要求流中的数据类型只能是Row,而且每一个数据都需要指定当前航的更新类型(RowKind)。所以一般是由连接器帮我们实现的。

四、支持的数据类型

  • DataStream,流中的数据类型都是定义好的POJO类。如果DataStream中的类型是简单的基本类型,还可以直接转换成表么?这就涉及了Table中支持的数据类型。
  • 整体来看,DataStream中支持的数据类型,Table中也都是支持的,只不过在进行转换时需要注意一些细节。

原子类型:

  • 在Flink中,基础数据类型(Integer、Double、String)和通用数据类型(也就是不可再拆分的数据类型)统一称做原子类型。原子类型的DataStream,转换之后就成了只有一列的Table,列字段(field)的数据类型可以由原子类型推断出。另外,还可以在fromDataStream()方法里增加参数,用来重新命名列字段。
StreamTableEnvironment tableEnv = ...;

DataStream<Long> stream = ...;

将数据流转换成动态表,动态表只有一个字段,重命名为myLong

Table table = tableEnv.fromDataStream(stream,$("myLong"));

Tuple类型

  • 当原子类型不做重命名时,默认的字段名就是"f0",容易想到,其实就是将原子类型看做了一元组Tuple1的处理结果。
  • Table支持Flink中定义的元组类型Tuple,对应在表中字段名默认就是元祖中元素的属性名f0、f1、f2…。所有字段都可以被重新排序,也可以提取其中的一部分字段。字段还可以通过调用表达式的as()方法来进行重命名。
StreamTableEnvironment tableEnv = ...;

DataStream<Tuple2<Long,Integer>> stream = ... ;

将数据流转换成只包含f1字段的表

Table table = tableEnv.fromDataStream(stream,$("f1"));

将数据流转换成包含f0和f1字段的表,在表中f0和f1位置交换

Table table = tableEnv.fromDataStream(stream,$("f1"),$("f0"));

将f1字段命名为myInt,f0命名为myLong

Table table = tableEnv.fromDataStream(stream,$("f1").as("myInt"),$("f0").as("myLong"));

Row类型

  • Flink中还定义了一个在关系型表中更加通用的数据类型——行(Row),它是Table中数据的基本组织形式。Row类型也是一种复合类型,它的长度固定,而且无法直接推断出每个字段的类型,所以在使用时必须指明具体的类型信息。在创建Table时调用的CREATE语句就会将所有的字段名称和类型指定,这在Flink中被称为表的模式结构(Schema)。除此之外,Row类型还附加了一个属性RowKind,用来表示当前行在更新操作中的类型。这样,Row就可以用来表示更新日志流(changelog stream)中的数据,从而架起了Flink中流和表的转换桥梁。
  • 所以在更新日志流中,元素的类型必须是Row,而且需要调用ofKind(0方法来指定更新类型。下面是一个具体的例子:
DataStream<Row> dataStream = env.fromElements(
	Row.ofKind(RowKind.INSERT,"Alice",12),
	Row.ofKind(RowKind.INSERT,"Bob",5),
	Row.ofKind(RowKind.UPDATE_BEFORE,"Alice",12),
	Row.ofKind(RowKind.UPDATE_AFTER,"Alice",100)
);

将更新日志流转换为表文章来源地址https://www.toymoban.com/news/detail-400803.html

Table table = tableEnv.fromChangelogStream(dataStream);

到了这里,关于Flink系列Table API和SQL之:表和流的转换的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 22、Flink 的table api与sql之创建表的DDL

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月12日
    浏览(43)
  • Flink系列Table API和SQL之:滚动窗口、滑动窗口、累计窗口、分组聚合

    有了时间属性,接下来就可以定义窗口进行计算了。窗口可以将无界流切割成大小有限的桶(bucket)来做计算,通过截取有限数据集来处理无限的流数据。在DataStream API中提供了对不同类型的窗口进行定义和处理的接口,而在Table API和SQL中,类似的功能也都可以实现。 在Flink 1

    2023年04月27日
    浏览(50)
  • Flink Table API 与 SQL 编程整理

    Flink API 总共分为 4 层这里主要整理 Table API 的使用 Table API 是流处理和批处理通用的关系型 API , Table API 可以基于流输入或者批输入来运行而不需要进行任何修改。 Table API 是 SQL 语言的超集并专门为 Apache Flink 设计的, Table API 是 Scala 和 Java 语言集成式的 API 。与常规 SQL 语言

    2024年02月04日
    浏览(41)
  • Flink Table API/SQL 多分支sink

    在某个场景中,需要从Kafka中获取数据,经过转换处理后,需要同时sink到多个输出源中(kafka、mysql、hologres)等。两次调用execute, 阿里云Flink vvr引擎报错: 使用 StreamStatementSet. 具体参考官网: https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/data_stream_api/#converting-betwe

    2024年02月11日
    浏览(90)
  • 【Flink SQL】Flink SQL 基础概念(一):SQL & Table 运行环境、基本概念及常用 API

    《 Flink SQL 基础概念 》系列,共包含以下 5 篇文章: Flink SQL 基础概念(一):SQL Table 运行环境、基本概念及常用 API Flink SQL 基础概念(二):数据类型 Flink SQL 基础概念(三):SQL 动态表 连续查询 Flink SQL 基础概念(四):SQL 的时间属性 Flink SQL 基础概念(五):SQL 时区问

    2024年03月21日
    浏览(60)
  • Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

           今天一天争取搞完最后这一部分,学完赶紧把 Kafka 和 Flume 学完,就要开始做实时数仓了。据说是应届生得把实时数仓搞个 80%~90% 才能差不多找个工作,太牛马了。         之前我们已经用过了一些简单的内置连接器,比如 \\\'datagen\\\' 、\\\'print\\\' ,其它的可以查看官网:

    2024年01月24日
    浏览(41)
  • 《十堂课学习 Flink》第五章:Table API 以及 Flink SQL 入门

    第四章中介绍了 DataStream API 以及 DataSet API 的入门案例,本章开始介绍 Table API 以及基于此的高层应用 Flink SQL 的基础。 Flink 提供了两个关系API——Table API 和 SQL——用于统一的流和批处理。Table API 是一种针对Java、Scala和Python的语言集成查询API,它允许以非常直观的方式组合来

    2024年02月03日
    浏览(52)
  • Flink(十三)Flink 的table api与sql的基本概念、通用api介绍及入门示例

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月15日
    浏览(44)
  • Flink SQL和Table API实现消费kafka写入mysql

    1、构建 table环境 2、构建source kafka 方式一:API 方式二:Flink SQL 3、构建sink mysql  4、写入将source表写入sink表 方式一:API 方式二:Flink SQL 5、手动执行 6、测试 (1)连接kafka生产者 (2)造数据 (3)mysql查看入库情况

    2024年01月16日
    浏览(31)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包