Flink:FlinkSql解析嵌套Json

这篇具有很好参考价值的文章主要介绍了Flink:FlinkSql解析嵌套Json。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

日常开发中都是用的简便json格式,但是偶尔也会遇到嵌套json的时候,因此在用flinksql的时候就有点麻烦,下面用简单例子简单定义处理下

1,数据是网上摘抄,但包含里常用的大部分格式

{
    "afterColumns": {
        "created": "1589186680",
        "extra": {
            "canGiving": false
        },
        "parameter": [1, 2, 3, 4]
    },
    "beforeColumns": null,
    "tableVersion": {
        "binlogFile": null,
        "binlogPosition": 0,
        "version": 0
    },
    "touchTime": 1589186680591,
    "type":3,
    "arr": [{
        "address": "北京市海淀区",
        "city": "beijing"
    }, {
        "address": "北京市海淀区",
        "city": "beijing"
    }, {
        "address": "北京市海淀区",
        "city": "beijing"
    }]
}

注意:

Json 中的每个 {} 都需要用 Row 类型来表示
Json 中的每个 [] 都需要用 Arrary 类型来表示
数组的下标是从 1 开始的不是 0 
查询select时,关键字需要加反引号 如上面 SQL 中的 `type`
select 语句中的字段类型和顺序一定要和结果表的字段类型和顺序保持一致

因此:FlinkSql语句应该为

CREATE TABLE kafka_source (
    afterColumns ROW(created STRING,extra ROW(canGiving BOOLEAN),`parameter` ARRAY <INT>) ,
    beforeColumns STRING ,
    tableVersion ROW(binlogFile STRING,binlogPosition INT ,version INT) ,
    touchTime bigint, 
    `type` INT,
    arr ARRAY<ROW<address STRING,city STRING>>
) WITH (
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'test',  -- kafka topic
    'properties.bootstrap.servers' = 'master:9092,storm1:9092,storm2:9092',  -- broker连接信息
    'properties.group.id' = 'jason_flink_test', -- 消费kafka的group_id
    'scan.startup.mode' = 'latest-offset',  -- 读取数据的位置
    'format' = 'json',  -- 数据源格式为 json
    'json.fail-on-missing-field' = 'true', -- 字段丢失任务不失败
    'json.ignore-parse-errors' = 'false'  -- 解析失败跳过
)

2,数据格式以及对应的flinksql格式Flink:FlinkSql解析嵌套Json

 

引自:https://blog.csdn.net/qq_21383435/article/details/124889251文章来源地址https://www.toymoban.com/news/detail-408057.html

到了这里,关于Flink:FlinkSql解析嵌套Json的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink实战-(6)FlinkSQL实现CDC

    FlinkSQL说明 Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。 自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初

    2023年04月26日
    浏览(39)
  • flink学习35:flinkSQL查询mysql

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment, tableConversions} object sqlQueryTable {   def main(args: Array[String]): Unit = {     //create env     val env = StreamExecutionEnvironment.getExecutionEnv

    2023年04月23日
    浏览(37)
  • 【Flink系列七】TableAPI和FlinkSQL初体验

    Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL Table API 是用于 Scala 和 Java 语言的查询API,它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。  Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。无论输入是连续的(流式)还是有界的(批处理

    2024年02月03日
    浏览(28)
  • 【Flink】FlinkSQL中执行计划以及如何用代码看执行计划

    FilnkSQL怎么查询优化 Apache Flink 使用并扩展了 Apache Calcite 来执行复杂的查询优化。 这包括一系列基于规则和成本的优化,例如: • 基于 Apache Calcite 的子查询解相关 • 投影剪裁 • 分区剪裁 • 过滤器下推 • 子计划消除重复数据以避免重复计算 • 特殊子查询重写,包括两部

    2023年04月11日
    浏览(40)
  • 【Flink】FlinkSQL读取Mysql表中时间字段相差13个小时

    问题:Flink版本1.13,在我们使用FlinkSQL读取Mysql中数据的时候,发现读取出来的时间字段中的数据和Mysql表中的数据相差13个小时,Mysql建表语句及插入的数据如下; CREATE TABLE `mysql_example` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT \\\'自增ID\\\', `name` varchar(64) DEFAULT NULL COMMENT \\\'姓名\\\'

    2024年01月19日
    浏览(24)
  • 基于 Dinky + FlinkSQL + Flink CDC 同步 MySQL 数据到 Elasticsearch、Kafka

    Dinky 是一个开箱即用的一站式实时计算平台以 Apache Flink 为基础,连接 OLAP 和数据湖等众多框架致力于流批一体和湖仓一体的建设与实践。本文以此为FlinkSQL可视化工具。 Flink SQL 使得使用标准 SQL 开发流式应用变得简单,免去代码开发。 Flink CDC 本文使用 MySQL CDC 连接器 允许从

    2024年02月16日
    浏览(31)
  • FlinkAPI开发之FlinkSQL

    这里的依赖是一个Java的“桥接器”(bridge),主要就是负责Table API和下层DataStream API的连接支持,按照不同的语言分为Java版和Scala版。 如果我们希望在本地的集成开发环境(IDE)里运行Table API和SQL,还需要引入以下依赖: 对于Flink这样的流处理框架来说,数据流和表在结构上

    2024年01月25日
    浏览(22)
  • 袋鼠云的FlinkSQL插件开发

    袋鼠云是一家大数据产品供应商。他开发了一个产品叫做 flinkStreamSQL。这东西是以 Flink 为基础开发的使用 SQL 来写流式计算逻辑的产品。 FlinkStreamSQL 的开源地址 这里所说的插件是可以理解为自定义的语法。例如下面的 SQL: dim_shop 可能是一个 redis 为实体的 Table ,这袋鼠已经

    2024年02月06日
    浏览(28)
  • 深入理解flinksql执行流程,calcite与catalog相关概念,扩展解析器实现语法的扩展

    flink在执行sql语句时,是无法像java/scala代码一样直接去使用的,需要解析成电脑可以执行的语言,对sql语句进行解析转化。 这里说的我感觉其实不是特别准确,应该是 flink使用的是一款开源SQL解析工具Apache Calcite ,Calcite使用Java CC对sql语句进行了解析 。 那么我们先来简单说

    2024年02月21日
    浏览(31)
  • calcite在flink中的二次开发,介绍解析器与优化器

    关于calcite的概念相关的内容,在我另一篇帖子 深入理解flinksql执行流程,扩展解析器实现语法的扩展 首先阐述一下 codegen: Codegen是基于ObjectWeb ASM的低开销的java代码生成器,他可以根据预先填好的规则与条件,通过编译代码,自动生成java类 在递归调用各个节点 DataStreamRel 的

    2024年02月22日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包