使用 Apache SeaTunnel 实现 Kafka Source 解析复杂Json 案例

这篇具有很好参考价值的文章主要介绍了使用 Apache SeaTunnel 实现 Kafka Source 解析复杂Json 案例。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

使用 Apache SeaTunnel 实现 Kafka Source 解析复杂Json 案例

版本说明:

SeaTunnel:apache-seatunnel-2.3.2-SNAPHOT

引擎说明:

Flink:1.16.2

Zeta:官方自带

前言

近些时间,我们正好接手一个数据集成项目,数据上游方是给我们投递到Kafka,我们一开始的技术选型是SpringBoot+Flink对上游数据进行加工处理(下文简称:方案一),由于测试不到位,后来到线上,发现数据写入效率完全不符合预期。后来将目光转到开源项目SeaTunnel上面,发现Source支持Kafka,于是开始研究测试,开发环境测试了500w+数据,发现效率在10000/s左右。果断放弃方案一,采取SeaTunnel对数据进行集成加工(下文简称:方案二)。在SeaTunnel研究的过程中,总结了两种方法,方法二相较于方法一,可以实现全场景使用,无需担心字段值里面各种意想不到的字符对数据落地造成错位现象的发生。

对比

使用 Apache SeaTunnel 实现 Kafka Source 解析复杂Json 案例

在方案二的基础上又衍生出两种方法
使用 Apache SeaTunnel 实现 Kafka Source 解析复杂Json 案例

所以,在经过长时间的探索和我们线上验证得出结论,建议使用方案二的方法二。

好了,我们进入正文,主篇幅主要介绍方案二中的两种方法,让大家主观的感受SeaTunnel的神奇。

方案一 Springboot+Flink实现Kafka 复杂JSON的解析

网上案例很多,在此不做过多介绍。

方案二 SeaTunnel实现Kafka复杂JSON的解析

在开始介绍之前,我们看一下我们上游方投递Kafka的Json样例数据(对部分数据进行了敏感处理),如下:

    "magic": "a***G",
    "type": "D***",
    "headers": null,
    "messageSchemaId": null,
    "messageSchema": null,
    "message": {
        "data": {
            "LSH": "187eb13****l0214723",
            "NSRSBH": "9134****XERG56",
            "QMYC": "01*****135468",
            "QMZ": "1Zr*****UYGy%2F5bOWtrh",
            "QM_SJ": "2023-05-05 16:42:10.000000",
            "YX_BZ": "Y",
            "ZGHQ_BZ": "Y",
            "ZGHQ_SJ": "2023-06-26 16:57:17.000000",
            "SKSSQ": 202304,
            "SWJG_DM": "",
            "SWRY_DM": "00",
            "CZSJ": "2023-05-05 16:42:10.000000",
            "YNSRSBH": "9134****XERG56",
            "SJTBSJ": "2023-06-26 19:29:59.000",
            "SJCZBS": "I"
        },
        "beforeData": null,
        "headers": {
            "operation": "INSERT",
            "changeSequence": "12440977",
            "timestamp": "2023-06-26T19:29:59.673000",
            "streamPosition": "00****3.16",
            "transactionId": "000***0006B0002",
            "changeMask": "0FFF***FF",
            "columnMask": "0F***FFFF",
            "transactionEventCounter": 1,
            "transactionLastEvent": false
        }
    }
}

方法一、不通过UDF函数实现

存在问题:字段值存在分隔符,例如‘,’ 则数据在落地的时候会发生错位现象。

该方法主要使用官网 transform-v2的各种转换插件进行实现,主要用到的插件有 ReplaceSplit以及Sql实现

ST脚本:(ybjc_qrqm.conf)

env {
  execution.parallelism = 100
  job.mode = "STREAMING"
  job.name = "kafka2mysql_ybjc"
  execution.checkpoint.interval = 60000
}
 
source {
  Kafka {
    result_table_name = "DZFP_***_QRQM1"
    topic = "DZFP_***_QRQM"
    bootstrap.servers = "centos1:19092,centos2:19092,centos3:19092"
    schema = {
      fields {
        message =  {
            data = {
                LSH = "string",
                NSRSBH =  "string",
                QMYC =  "string",
                QMZ =  "string",
                QM_SJ =  "string",
                YX_BZ =  "string",
                ZGHQ_BZ =  "string",
                ZGHQ_SJ =  "string",
                SKSSQ =  "string",
                SWJG_DM = "string",
                SWRY_DM = "string",
                CZSJ = "string",
                YNSRSBH = "string",
                SJTBSJ = "string",
                SJCZBS = "string"
            }
        }
      }  
    }
	  start_mode = "earliest"
    #start_mode.offsets = {
    #     0 = 0
    #     1 = 0
    #     2 = 0
    #}	
    kafka.config = {
      auto.offset.reset = "earliest"
      enable.auto.commit = "true"
	    # max.poll.interval.ms = 30000000
	    max.partition.fetch.bytes = "5242880"
	    session.timeout.ms = "30000"
      max.poll.records = "100000"
    }
  }
}

transform {
 Replace {
    source_table_name = "DZFP_***_QRQM1"
    result_table_name = "DZFP_***_QRQM2"
    replace_field = "message"
    pattern = "[["
    replacement = ""
    #is_regex = true
	  #replace_first = true
  }
  Replace {
    source_table_name = "DZFP_***_QRQM2"
    result_table_name = "DZFP_***_QRQM3"
    replace_field = "message"
    pattern = "]]"
    replacement = ""
    #is_regex = true
	  #replace_first = true
  }
  
  Split {
    source_table_name = "DZFP_***_QRQM3"
    result_table_name = "DZFP_***_QRQM4"
    # 存在问题: 如果字段值存在分隔符 separator,则数据会错位
    separator = ","
    split_field = "message"
    # 你的第一个字段包含在zwf5里面,,前五个占位符是固定的。
    output_fields = [zwf1,zwf2,zwf3,zwf4,zwf5,nsrsbh,qmyc,qmz,qm_sj,yx_bz,zghq_bz,zghq_sj,skssq,swjg_dm ,swry_dm ,czsj ,ynsrsbh ,sjtbsj ,sjczbs]
  }
  
  sql{
   source_table_name = "DZFP_***_QRQM4"
   query = "select replace(zwf5 ,'fields=[','') as lsh,nsrsbh,trim(qmyc) as qmyc,qmz,qm_sj,yx_bz, zghq_bz,zghq_sj,skssq,swjg_dm ,swry_dm ,czsj ,ynsrsbh ,sjtbsj ,replace(sjczbs,']}]}','') as sjczbs  from DZFP_DZDZ_QRPT_YWRZ_QRQM4 where skssq <> ' null'"
   result_table_name = "DZFP_***_QRQM5"
  }
  
}

sink {
	Console {
		source_table_name = "DZFP_***_QRQM5"
	}
	
    jdbc {
	    source_table_name = "DZFP_***_QRQM5"
        url = "jdbc:mysql://localhost:3306/dbname?serverTimezone=GMT%2b8"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "user"
        password = "pwd"
        batch_size = 200000
        database = "dbname"
        table = "tablename"
				generate_sink_sql = true
        primary_keys = ["nsrsbh","skssq"]
    } 
}

正常写入数据是可以写入了。

写入成功如下:

● kafka源数据:
使用 Apache SeaTunnel 实现 Kafka Source 解析复杂Json 案例
● tidb目标数据:
使用 Apache SeaTunnel 实现 Kafka Source 解析复杂Json 案例
现在我们模拟给kafka发送一条数据,其中,SJTBSJ字段我在中间设置一个, 是逗号。

原始值:2023-06-26 19:29:59.000 更改之后的值2023-06-26 19:29:59.0,00

往topic生产一条数据命令
kafka-console-producer.sh --topic DZFP_***_QRQM --broker-list centos1:19092,centos2:19092,centos3:19092

发送如下:

    "magic": "a***G",
    "type": "D***",
    "headers": null,
    "messageSchemaId": null,
    "messageSchema": null,
    "message": {
        "data": {
            "LSH": "187eb13****l0214723",
            "NSRSBH": "9134****XERG56",
            "QMYC": "01*****135468",
            "QMZ": "1Zr*****UYGy%2F5bOWtrh",
            "QM_SJ": "2023-05-05 16:42:10.000000",
            "YX_BZ": "Y",
            "ZGHQ_BZ": "Y",
            "ZGHQ_SJ": "2023-06-26 16:57:17.000000",
            "SKSSQ": 202304,
            "SWJG_DM": "",
            "SWRY_DM": "00",
            "CZSJ": "2023-05-05 16:42:10.000000",
            "YNSRSBH": "9134****XERG56",
            "SJTBSJ": "2023-06-26 19:29:59.0,00",
            "SJCZBS": "I"
        },
        "beforeData": null,
        "headers": {
            "operation": "INSERT",
            "changeSequence": "12440977",
            "timestamp": "2023-06-26T19:29:59.673000",
            "streamPosition": "00****3.16",
            "transactionId": "000***0006B0002",
            "changeMask": "0FFF***FF",
            "columnMask": "0F***FFFF",
            "transactionEventCounter": 1,
            "transactionLastEvent": false
        }
    }
}

写入之后,发现数据错位了。
使用 Apache SeaTunnel 实现 Kafka Source 解析复杂Json 案例

结论:其实这个问题线上还是能遇到的,比如地址字段里面含有逗号,备注信息里面含有逗号等等,这种现象是不可避免的,所以此种方案直接pass。对数据危害性极大!可以处理简单的数据,当做一种思路。

方法二:通过UDF函数实现

该方法通过UDF函数扩展(https://seatunnel.apache.org/docs/2.3.2/transform-v2/sql-udf)的方式,实现嵌套kafka source json源数据的解析。可以大大简化ST脚本的配置

ST脚本:(ybjc_qrqm_yh.conf)

env {
    execution.parallelism = 5
    job.mode = "STREAMING"
    job.name = "kafka2mysql_ybjc_yh"
    execution.checkpoint.interval = 60000
}

source {
    Kafka {
        result_table_name = "DZFP_***_QRQM1"
        topic = "DZFP_***_QRQM"
        bootstrap.servers = "centos1:19092,centos2:19092,centos3:19092"
        schema = {
        fields {
             message =  {
                data = "map<string,string>"
             }
        }  
        }
        start_mode = "earliest"
        #start_mode.offsets = {
        #     0 = 0
        #     1 = 0
        #     2 = 0
        #}	
        kafka.config = {
        auto.offset.reset = "earliest"
        enable.auto.commit = "true"
        # max.poll.interval.ms = 30000000
        max.partition.fetch.bytes = "5242880"
        session.timeout.ms = "30000"
        max.poll.records = "100000"
        }
    }
}
transform {
    sql{
        source_table_name = "DZFP_***_QRQM1"
        result_table_name = "DZFP_***_QRQM2"
        # 这里的qdmx就是我自定义的UDF函数,具体实现下文详细讲解。。。
        query = "select qdmx(message,'lsh') as lsh,qdmx(message,'nsrsbh') as nsrsbh,qdmx(message,'qmyc') as qmyc,qdmx(message,'qmz') as qmz,qdmx(message,'qm_sj') as qm_sj,qdmx(message,'yx_bz') as yx_bz,qdmx(message,'zghq_bz') as zghq_bz,qdmx(message,'zghq_sj') as zghq_sj,qdmx(message,'skssq') as skssq,qdmx(message,'swjg_dm') as swjg_dm,qdmx(message,'swry_dm') as swry_dm,qdmx(message,'czsj') as czsj,qdmx(message,'ynsrsbh') as ynsrsbh, qdmx(message,'sjtbsj') as sjtbsj,qdmx(message,'sjczbs') as sjczbs  from  DZFP_DZDZ_QRPT_YWRZ_QRQM1"
    }
}

    
sink {
    Console {
        source_table_name = "DZFP_***_QRQM2"
    }
    jdbc {
		  source_table_name = "DZFP_***_QRQM2"
		  url = "jdbc:mysql://localhost:3306/dbname?serverTimezone=GMT%2b8"
		  driver = "com.mysql.cj.jdbc.Driver"
		  user = "user"
		  password = "pwd"
		  batch_size = 200000
		  database = "dbname"
		  table = "tablename"
		  generate_sink_sql = true
		  primary_keys = ["nsrsbh","skssq"]
	} 
}

执行脚本:查看结果,发现并没有错位,还在原来的字段(sjtbsj)上面。

这种方法,是通过key获取value值。不会出现方法一中的按照逗号分割出现数据错位现象。
使用 Apache SeaTunnel 实现 Kafka Source 解析复杂Json 案例

具体UDF函数编写如下。

maven引入如下:

<dependencies>
  <dependency>
    <groupId>org.apache.seatunnel</groupId>
    <artifactId>seatunnel-transforms-v2</artifactId>
    <version>2.3.2</version>
    <scope>provided</scope>
  </dependency>

  <dependency>
    <groupId>org.apache.seatunnel</groupId>
    <artifactId>seatunnel-api</artifactId>
    <version>2.3.2</version>
  </dependency>

  <dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.8.20</version>
  </dependency>

  <dependency>
    <groupId>com.google.auto.service</groupId>
    <artifactId>auto-service-annotations</artifactId>
    <version>1.1.1</version>
    <optional>true</optional>
    <scope>compile</scope>
  </dependency>

  <dependency>
    <groupId>com.google.auto.service</groupId>
    <artifactId>auto-service</artifactId>
    <version>1.1.1</version>
    <optional>true</optional>
    <scope>compile</scope>
  </dependency>
</dependencies>  

UDF具体实现java代码如下:

package org.seatunnel.sqlUDF;

import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.google.auto.service.AutoService;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.transform.sql.zeta.ZetaUDF;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@AutoService(ZetaUDF.class)
public class QdmxUDF implements ZetaUDF {

    @Override
    public String functionName() {
        return "QDMX";
    }

    @Override
    public SeaTunnelDataType<?> resultType(List<SeaTunnelDataType<?>> list) {
        return BasicType.STRING_TYPE;
    }

    // list 参数实例:(也就是kafka 解析过来的数据)
    //SeaTunnelRow{tableId=, kind=+I, fields=[{key1=value1,key2=value2,.....}]}
    @Override
    public Object evaluate(List<Object> list) {
        String str = list.get(0).toString();
        //1 Remove the prefix
        str = StrUtil.replace(str, "SeaTunnelRow{tableId=, kind=+I, fields=[{", "");
        //2 Remove the suffix
        str = StrUtil.sub(str, -3, 0);
        // 3 build Map key value
        Map<String, String> map = parseToMap(str);
        if ("null".equals(map.get(list.get(1).toString())))
            return "";
        // 4 return the value of the key
        return map.get(list.get(1).toString());
    }

    public static Map<String, String> parseToMap(String input) {
        Map<String, String> map = new HashMap<>();
        // 去除大括号 在字符串阶段去除
        // input = input.replaceAll("[{}]", "");
        // 拆分键值对
        String[] pairs = input.split(", ");

        for (String pair : pairs) {
            String[] keyValue = pair.split("=");
            if (keyValue.length == 2) {
                String key = keyValue[0].trim().toLowerCase();
                String value = keyValue[1].trim();
                map.put(key, value);
            }
        }
        return map;
    }
}

然后打包,打包命令如下:

mvn -T 8 clean install -DskipTests -Dcheckstyle.skip -Dmaven.javadoc.skip=true

查看META-INF/services, 看注解@AutoService 是否生成对应的spi接口:

如下:则打包成功!
使用 Apache SeaTunnel 实现 Kafka Source 解析复杂Json 案例
如果没有,则打包失败,UDF函数无法使用.

可以参考我的打包插件:

  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-site-plugin</artifactId>
      <version>3.7</version>
      <dependencies>
        <dependency>
          <groupId>org.apache.maven.doxia</groupId>
          <artifactId>doxia-site-renderer</artifactId>
          <version>1.8</version>
        </dependency>
      </dependencies>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.8.1</version>
      <configuration>
        <annotationProcessorPaths>
          <path>
            <groupId>com.google.auto.service</groupId>
            <artifactId>auto-service</artifactId>
            <version>1.1.1</version>
          </path>
        </annotationProcessorPaths>
      </configuration>
    </plugin>
  </plugins>
</build>

最终打成的jar包放到 ${SEATUNNEL_HOME}/lib目录下,由于我的UDF函数引入了第三方jar包,也需要一并上传。如果是Zeta集群,需要重启Zeta集群才能生效。其他引擎实时生效。
最终上传成功如下:
使用 Apache SeaTunnel 实现 Kafka Source 解析复杂Json 案例
说明:这个hutool-all的jar包可以含在java_study这个项目里面,我图方便,上传了两个。

综上,推荐使用通过UDF函数扩展的方式,实现嵌套kafka source json源数据的解析。

本文由 白鲸开源 提供发布支持!文章来源地址https://www.toymoban.com/news/detail-582817.html

到了这里,关于使用 Apache SeaTunnel 实现 Kafka Source 解析复杂Json 案例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 代立冬:基于Apache Doris+SeaTunnel 实现多源实时数据仓库解决方案探索实践

    大家好,我是白鲸开源的联合创始人代立冬,同时担任 Apache DolphinScheduler 的 PMC chair 和 SeaTunnel 的 PMC。作为 Apache Foundation 的成员和孵化器导师,我积极参与推动多个开源项目的发展,帮助它们通过孵化器成长为 Apache 的顶级项目。 今天的分享的主题其实还是从开源到商业,

    2024年02月04日
    浏览(65)
  • Java解析JSON复杂数据的第三种思路

    本文介绍了Java解析JSON复杂数据的第三种思路,通过改变思路,按照新的流程获取数据。文章详细解析了接口JSON数据,并提供了JSON转XML数据

    2024年01月25日
    浏览(64)
  • 探索在Apache SeaTunnel上使用Hudi连接器,高效管理大数据的技术

    Apache Hudi是一个数据湖处理框架,通过提供简单的方式来进行数据的插入、更新和删除操作,Hudi能够帮助数据工程师和科学家更高效地处理大数据,并支持实时查询。 Spark Flink SeaTunnel Zeta 批处理 流处理 精确一次性 列投影 并行处理 支持用户自定义切分 Hudi Source 连接器专为从

    2024年04月28日
    浏览(51)
  • 没有fastjson,rust怎么方便的解析提取复杂json呢?

    在 Rust 中解析和提取复杂的 JSON 结构,你可以使用 serde_json 库来处理。 serde_json 提供了一组功能强大的方法来解析和操作 JSON 数据。 下面是一个示例,展示了如何解析和提取复杂的 JSON 结构: 在这个示例中,我们首先将 JSON 字符串解析为 Value 类型的对象。然后,我们使用

    2024年02月14日
    浏览(45)
  • seatunnel hive source 未设置分隔符导致多个字段合并成一个的问题定位解决

    seatunnel hive source 未设置分隔符导致多个字段没有切分全保存在一个字段中了,翻看源码发现分隔符是是通过delimiter设置的,只要设置这个delimiter=\\\",\\\"就可以了。 设置这个属性 delimiter=“,” 他的默认值是u0001,如果没有设置delimiter属性则会根据文件类型判断,如果是csv则使用”,”

    2024年02月16日
    浏览(51)
  • 【mybatis-plus实体类复杂对象字段json自动相互转换,以及自定义字段类型解析器】

    引言: mybatis-plus集合对象字段json如何自动进行相互的转换? 怎样在使用mybatis-plus操作数据表的时候自动对实体类属性进行自动解析? 我们平时在做开发的时候,会遇到一个字段保存json串的情况。一般情况下mybatis-plus在做插入/更新之前将对象手动转换成json串,查询要用的时

    2023年04月13日
    浏览(103)
  • 01 - Apache Seatunnel 源码调试

    选择 seatunnel-examples ├── seatunnel-engine-examples ├── seatunnel-flink-connector-v2-example ├── seatunnel-spark-connector-v2-example 注意:需要调试哪些数据库,使用相应 connector ,就要在 pom 文件中添加。 如果需要其他的,自己补充即可

    2024年02月09日
    浏览(44)
  • # Apache SeaTunnel 究竟是什么?

    作者 | Shawn Gordon 翻译 | Debra Chen 原文链接 | What the Heck is Apache SeaTunnel? 我在2023年初开始注意到Apache SeaTunnel的相关讨论,一直低调地关注着。该项目始于2017年,最初名为Waterdrop,在Apache DolphinScheduler的创建者的贡献下发展起来,后者支持SeaTunnel作为任务插件。 我最初对于SeaT

    2024年04月08日
    浏览(103)
  • Apache SeaTunnel 社区 3 月月报

    各位热爱 SeaTunnel 的小伙伴们,SeaTunnel 社区 3 月月报来啦!这里将记录 SeaTunnel 社区每个月的重要更新,并评选出月度之星,欢迎关注。 感谢以下小伙伴 3 月为 Apache SeaTunnel 做的精彩贡献(排名不分先后): @Carl-Zhou-CN,@ilsl1007,@loveyang1990,@dailai,@liugddx,@CheneyYin,@litiliu,@ShaunWuu,@

    2024年04月11日
    浏览(38)
  • apache seatunnel web 安装部署

    apache-seatunnel-2.3.3-bin.tar.gz apache-seatunnel-web-1.0.0-bin.tar.gz download_datasource.sh 解压文件

    2024年01月18日
    浏览(68)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包