Flink系列之:Table API Connectors之JSON Format

这篇具有很好参考价值的文章主要介绍了Flink系列之:Table API Connectors之JSON Format。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、JSON Format

JSON Format 能读写 JSON 格式的数据。当前,JSON schema 是从 table schema 中自动推导而得的。

二、依赖

为了使用 Json 格式,使用构建自动化工具(例如 Maven 或 SBT)的项目和带有 SQL JAR 包的 SQL 客户端都需要以下依赖项。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-json</artifactId>
  <version>1.18.0</version>
</dependency>

三、创建一张基于 JSON Format 的表

以下是一个利用 Kafka 以及 JSON Format 构建表的例子。

CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'json',
 'json.fail-on-missing-field' = 'false',
 'json.ignore-parse-errors' = 'true'
)

四、Format 参数

参数 是否必须 默认值 类型 描述
format 必选 (none) String 声明使用的格式,这里应为’json’。
json.fail-on-missing-field 可选 false Boolean 当解析字段缺失时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。
json.ignore-parse-errors 可选 false Boolean 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。
json.timestamp-format.standard 可选 ‘SQL’ String 声明输入和输出的 TIMESTAMP 和 TIMESTAMP_LTZ 的格式。当前支持的格式为’SQL’ 以及 ‘ISO-8601’:可选参数 ‘SQL’ 将会以 “yyyy-MM-dd HH:mm:ss.s{precision}” 的格式解析 TIMESTAMP, 例如 “2020-12-30 12:13:14.123”, 以 “yyyy-MM-dd HH:mm:ss.s{precision}‘Z’” 的格式解析 TIMESTAMP_LTZ, 例如 “2020-12-30 12:13:14.123Z” 且会以相同的格式输出。可选参数 ‘ISO-8601’ 将会以 “yyyy-MM-ddTHH:mm:ss.s{precision}” 的格式解析输入 TIMESTAMP, 例如 “2020-12-30T12:13:14.123” , 以 “yyyy-MM-ddTHH:mm:ss.s{precision}‘Z’” 的格式解析 TIMESTAMP_LTZ, 例如 “2020-12-30T12:13:14.123Z” 且会以相同的格式输出。
json.map-null-key.mode 选填 ‘FAIL’ String 指定处理 Map 中 key 值为空的方法. 当前支持的值有 ‘FAIL’, ‘DROP’ 和 ‘LITERAL’:Option ‘FAIL’ 将抛出异常,如果遇到 Map 中 key 值为空的数据。Option ‘DROP’ 将丢弃 Map 中 key 值为空的数据项。Option ‘LITERAL’ 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 ‘json.map-null-key.literal’ 定义。
json.map-null-key.literal 选填 ‘null’ String 当 ‘json.map-null-key.mode’ 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。
json.encode.decimal-as-plain-number 选填 false Boolean 将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.000000027。
decode.json-parser.enabled 选填 true Boolean JsonParser 是 Jackson 提供的流式读取 JSON 数据的 API。与 JsonNode 方式相比,这种方式读取速度更快,内存消耗更少。同时,JsonParser 在读取数据时还支持嵌套字段的投影下推。该参数默认启用。如果遇到任何不兼容性问题,可以禁用并回退到 JsonNode 方式。

五、数据类型映射关系

当前,JSON schema 将会自动从 table schema 之中自动推导得到。不支持显式地定义 JSON schema。

在 Flink 中,JSON Format 使用 jackson databind API 去解析和生成 JSON。

下表列出了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。文章来源地址https://www.toymoban.com/news/detail-788559.html

Flink SQL类型 JSON类型
CHAR/VARCHAR/STRING string
BOOLEAN boolean
BINARY/VARBINARY string with encoding: base64
DECIMAL number
TINYINT number
SMALLINT number
INT number
BIGINT number
FLOAT number
DOUBLE number
DATE string with format: date
TIME string with format: time
TIMESTAMP string with format: date-time
TIMESTAMP_WITH_LOCAL_TIME_ZONE string with format: date-time (with UTC time zone)
INTERVAL number
ARRAY array
MAP / MULTISET object
ROW object

到了这里,关于Flink系列之:Table API Connectors之JSON Format的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink系列Table API和SQL之:动态表、持续查询、将流转换成动态表、更新查询、追加查询、将动态表转换为流、更新插入流(Upsert)

    Flink中使用表和SQL基本上跟其他场景是一样的。不过对于表和流的转换,却稍显复杂。当我们将一个Table转换成DataStream时,有\\\"仅插入流\\\"(Insert-Only Streams)和\\\"更新日志流\\\"(Changelog Streams)两种不同的方式,具体使用哪种方式取决于表中是否存在更新操作。 这种麻烦其实是不可避

    2024年02月03日
    浏览(64)
  • Flink 系例 之 Connectors 连接 ElasticSearch

    通过使用 Flink DataStream Connectors 数据流连接器连接到 ElasticSearch 搜索引擎的文档数据库 Index,并提供数据流输入与输出操作; 示例环境 示例数据源 (项目码云下载) Flink 系例 之 搭建开发环境与数据 示例模块 (pom.xml) Flink 系例 之 DataStream Connectors 与 示例模块 数据流输入

    2024年02月16日
    浏览(37)
  • 17、Flink 之Table API: Table API 支持的操作(1)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月03日
    浏览(29)
  • 【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年01月17日
    浏览(50)
  • 【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年01月17日
    浏览(50)
  • Flink Table API 与 SQL 编程整理

    Flink API 总共分为 4 层这里主要整理 Table API 的使用 Table API 是流处理和批处理通用的关系型 API , Table API 可以基于流输入或者批输入来运行而不需要进行任何修改。 Table API 是 SQL 语言的超集并专门为 Apache Flink 设计的, Table API 是 Scala 和 Java 语言集成式的 API 。与常规 SQL 语言

    2024年02月04日
    浏览(41)
  • Flink(十三)Flink 的table api与sql的基本概念、通用api介绍及入门示例

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月15日
    浏览(45)
  • 21、Flink 的table API与DataStream API 集成(完整版)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月04日
    浏览(33)
  • Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

           今天一天争取搞完最后这一部分,学完赶紧把 Kafka 和 Flume 学完,就要开始做实时数仓了。据说是应届生得把实时数仓搞个 80%~90% 才能差不多找个工作,太牛马了。         之前我们已经用过了一些简单的内置连接器,比如 \\\'datagen\\\' 、\\\'print\\\' ,其它的可以查看官网:

    2024年01月24日
    浏览(43)
  • 《十堂课学习 Flink》第五章:Table API 以及 Flink SQL 入门

    第四章中介绍了 DataStream API 以及 DataSet API 的入门案例,本章开始介绍 Table API 以及基于此的高层应用 Flink SQL 的基础。 Flink 提供了两个关系API——Table API 和 SQL——用于统一的流和批处理。Table API 是一种针对Java、Scala和Python的语言集成查询API,它允许以非常直观的方式组合来

    2024年02月03日
    浏览(53)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包