为什么Flink-CDC读取Decimal等数值类型变成了非数值字符串

这篇具有很好参考价值的文章主要介绍了为什么Flink-CDC读取Decimal等数值类型变成了非数值字符串。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

每遇到一个问题,在经过努力研究明白之后,总想写点东西记录。怎奈又没这个好习惯,过了一两天这个激情就没了,想写也写不出来了。最近在做一个flink-cdc采集数据的测试和产品化开发,遇到一个数据转换的问题,折腾了我两个早上,有些心血来潮,就记录一下吧,对我是一种收获,也希望能帮到哪天像我一样遇到这个问题的同学

开始新建一张MySQL表:products

为什么Flink-CDC读取Decimal等数值类型变成了非数值字符串

插入一些数据:

为什么Flink-CDC读取Decimal等数值类型变成了非数值字符串

 搬过来官网的示例代码

  public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("...")
                .port(...)
                .databaseList("...") // set captured database
                .tableList("products") // set captured table
                .username("...")
                .password("...")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // set 4 parallel source tasks
                .setParallelism(4)
                .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute("Print MySQL Snapshot + Binlog");
    }

 Run as 看打印结果,嗯? 怎么weight字段的值变这样了

为什么Flink-CDC读取Decimal等数值类型变成了非数值字符串

 开始上网查资料,各种搜索,居然找不到,这也是为什么想写篇文章记录一下的原因。

没办法,只能看源码,查了一下资料,各种自定义解码器的文章倒是挺多,但是看了一下官方提供的json解码器JsonDebeziumDeserializationSchema,那些自定义的都是一坨屎,明明有个很牛逼的不知道用好,还有人要重新自定义JSON解码器,然而又写的稀巴烂,还各种抄来抄去。

从JsonDebeziumDeserializationSchema类进去,经过多层方法调用最终看到将Object转成JSON的方法叫convertToJson

/**
     * Convert this object, in the org.apache.kafka.connect.data format, into a JSON object, returning both the schema
     * and the converted object.
     */
    private JsonNode convertToJson(Schema schema, Object value) {
        //源码略
    }

断点调试到这个方法中,可以看到有个LogicalTypeConverter不为空

为什么Flink-CDC读取Decimal等数值类型变成了非数值字符串

于是再进这个LOGICAL_CONVERTERS看一下 ,这里面单独定义了Decimal、Date、Time等类型的处理逻辑,其他简单类型的处理放在了TO_CONNECT_CONVERTERS中。继续断点调试可以看到Dicimal类型直接就进了BASE64的case

为什么Flink-CDC读取Decimal等数值类型变成了非数值字符串

而这个类型是通过参数JsonConverterConfig传进来的,往回看,可以看到这个参数是解码器初始化的时候被实例化的,而且还接受了自定义参数customConverterConfigs

    /** Initialize {@link JsonConverter} with given configs. */
    private void initializeJsonConverter() {
        jsonConverter = new JsonConverter();
        final HashMap<String, Object> configs = new HashMap<>(2);
        configs.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());
        configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, includeSchema);
        if (customConverterConfigs != null) {
            configs.putAll(customConverterConfigs);
        }
        jsonConverter.configure(configs);
    }

 那么问题就好办了,指定Decimal的格式为NUMERIC不就可以了,于是自定义一个customConverterConfigs

  public static void main(String[] args) throws Exception {
        Map config = new HashMap();
        config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());
        JsonDebeziumDeserializationSchema jdd = new JsonDebeziumDeserializationSchema(false, config);
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("...")
                .port(...)
                .databaseList("...") // set captured database
                .tableList("products") // set captured table
                .username("...")
                .password("...")
                .deserializer(jdd)
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // set 4 parallel source tasks
                .setParallelism(4)
                .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute("Print MySQL Snapshot + Binlog");
    }

 在Run as 看打印结果,这下Happy了

为什么Flink-CDC读取Decimal等数值类型变成了非数值字符串

 可以看到已经变成想要的结果了。

后来进入产品化开发,打包到flink集群测试执行,采集oracle的时候总是同步不成功,同步成功了,也有一些字段的值变成空了,继续看了一下原表字段类型,才明白还是数值类型的问题,主键是number类型,同步之后类型还是被编码成BASE64,数仓的表是数字类型,不能插入。于是还得再测试验证一下,创建一张有各种number类型字段的表

为什么Flink-CDC读取Decimal等数值类型变成了非数值字符串

简单插入两条数据

为什么Flink-CDC读取Decimal等数值类型变成了非数值字符串

读取方法 

 public static void main(String[] args) throws Exception {
        Map config = new HashMap();
        config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());
        JsonDebeziumDeserializationSchema jdd = new JsonDebeziumDeserializationSchema(false, config);

        SourceFunction<String> sourceFunction = OracleSource.<String>builder()
                .hostname("...")
                .port(...)
                .database("...") // monitor XE database
                .schemaList("...") // monitor inventory schema
                .tableList("...") // monitor products table
                .username("...")
                .password("...")
                .deserializer(jdd) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(sourceFunction)
                .print()
                .setParallelism(1)
        ; // use parallelism 1 for sink to keep message ordering

        env.execute();
    }

 看执行输出

为什么Flink-CDC读取Decimal等数值类型变成了非数值字符串

更奇怪了,又去对JsonDebeziumDeserializationSchema一通研究,发现number类型,没设置p、s时,Flink-CDC读取的记录中就已经变成了Struct类型,通过解码器解决不了问题了。但是看到解码器可以通过设置includeSchema=true返回Schema,那是不是可以根据返回的Schema 的type判断是数值类型在取出value来Base64解码.....! 但是,类似这样想法的代码写法,在项目中实在是太多了,我不想也沦为这种垃圾代码的制造者。

继续查一下吧,是这个提问(请教大佬们,oracle cdc的NUMBER类型,打印出来为什么变成字符串了呢,怎么转换回去?-问答-阿里云开发者社区-阿里云)启发了我,虽然回答中那个参数试了一下无效,但是我找到了正确答案,后来还再次回答了这个问题

于是到Flink-CDC官网(Oracle CDC Connector — CDC Connectors for Apache Flink® documentation)看看有没有什么参数可以控制这个类型的转换

可以看到,Flink-CDC提供的参数都比较简单,于是再去debezium看看

为什么Flink-CDC读取Decimal等数值类型变成了非数值字符串

到debezium网站找啊找啊找,找到了一个神奇的和我想像中一样的参数,它叫decimal.handling.mode

 为什么Flink-CDC读取Decimal等数值类型变成了非数值字符串

可以看到这个参数默认值 是precise,还有其他两个可选值double和string

看看说明,简单理解一下

precise 以java中的精确类型来表示值

double 使用比较容易,但是会造成精度损失

string  也比较容易使用,但是会造成字段语意信息丢失

那么我直接整个string不就完美了,转成json就只有值了,我管你什么语意

果断试一下

 public static void main(String[] args) throws Exception {
        Properties prop = new Properties();
        prop.put("decimal.handling.mode", "string");

        SourceFunction<String> sourceFunction = OracleSource.<String>builder()
                .hostname("...")
                .port(...)
                .database("...") // monitor XE database
                .schemaList("...") // monitor inventory schema
                .tableList("...") // monitor products table
                .username("...")
                .password("...")
                .debeziumProperties(prop)
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(sourceFunction)
                .print()
                .setParallelism(1)
        ; // use parallelism 1 for sink to keep message ordering

        env.execute();
    }

看到结果,瞬间通透了

为什么Flink-CDC读取Decimal等数值类型变成了非数值字符串

 还看了一下debezium的其他参数,非常多,还需要持续的学习

结论不重要,学习的过程很重要

后续补充:

后来又遇到money类型的转换问题,设置decimal.handling.mode=string,money类型的字段还是变成base64编码的字符串

为什么Flink-CDC读取Decimal等数值类型变成了非数值字符串

如果从flink-cdc的网站连接进入debezium,打开的文档是1.6版本的,而且从flink-cdc的源码工程看,最新的flink-cdc2.3使用的debezium还是1.6.4.Final版本的。

此时从debezium文档可以看到decimal.handling.mode设置只有decimal和number列,全网页搜索money没有这个关键字

为什么Flink-CDC读取Decimal等数值类型变成了非数值字符串
试着往之后的版本搜索,一直到1.8版本的,可以看到有money了

为什么Flink-CDC读取Decimal等数值类型变成了非数值字符串

然后排除flink-cdc中的debezium依赖,从新引入1.8版本依赖,decimal.handling.mode设置就对money有效了

postgresql示例:

<properties>
    <flink.cdc>2.2.1</flink.cdc>
	<debezium.version>1.8.0.Final</debezium.version>
</properties>

<dependencies>
    <dependency>
			<groupId>com.ververica</groupId>
			<artifactId>flink-connector-postgres-cdc</artifactId>
			<version>${flink.cdc}</version>
			<exclusions>
				<exclusion>
					<artifactId>debezium-core</artifactId>
					<groupId>io.debezium</groupId>
				</exclusion>
				<exclusion>
					<artifactId>debezium-connector-postgres</artifactId>
					<groupId>io.debezium</groupId>
				</exclusion>
			</exclusions>
		</dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-core</artifactId>
            <version>${debezium.version}</version>
        </dependency>
		<dependency>
			<groupId>io.debezium</groupId>
			<artifactId>debezium-connector-postgres</artifactId>
			<version>${debezium.version}</version>
		</dependency>
</dependencies>

终极解决方案:

如果工程只采集postgresql,通过排除flink-cdc依赖的 debezium1.6,引入debezium1.8可以解决money类型被转成base64格式问题。但是,如果工程中还同时集成了采集mysql,oracle等,这样做会引起其他兼容问题,所以不用排出依赖重新引入了。文章最开始的通过json解码器设置dicimal类型解码规则重新用起来,在结合decimal.handling.mode设置完美解决问题文章来源地址https://www.toymoban.com/news/detail-407662.html

Properties prop = new Properties();
prop.put("decimal.handling.mode", "string");

Map config = new HashMap();
config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());
JsonDebeziumDeserializationSchema jdd = new JsonDebeziumDeserializationSchema(true, config);

SourceFunction<String> sourceFunction = PostgreSQLSource.<String>builder()
                .hostname("..")
                .port(5432)
                .database("..") // monitor postgres database
                .schemaList("..")  // monitor inventory schema
                .tableList("..") // monitor products table
                .username("..")
                .password("...")
                .debeziumProperties(prop)
                .decodingPluginName("pgoutput")
                .deserializer(jdd) // converts SourceRecord to JSON String
                .build();

到了这里,关于为什么Flink-CDC读取Decimal等数值类型变成了非数值字符串的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • flink-cdc,clickhouse写入,多路输出

    kafka日志数据从kafka读取 1、关联字典表:完善日志数据 2、判断日志内容级别:多路输出 低级:入clickhouse 高级:入clickhouse的同时推送到kafka供2次数据流程处理。

    2024年02月09日
    浏览(43)
  • flink-cdc同步mysql数据到elasticsearch

    CDC是(Change Data Capture 变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据 或 数据表的插入INSERT、更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。 cdc项目地址:https://github.com/ver

    2024年02月13日
    浏览(87)
  • Flinkx/Datax/Flink-CDC 优劣势对比

    Flinkx/Datax/Flink-CDC 优劣势对比_HiBoyljw的博客-CSDN博客        FlinkX是一款基于Flink的分布式离线/实时数据同步插件,可实现多种异构数据源高效的数据同步,其由袋鼠云于2016年初步研发完成,目前有稳定的研发团队持续维护,已在Github上开源(开源地址详见文章末尾),并维

    2024年02月07日
    浏览(50)
  • 【FLINK】Kafka数据源通过Flink-cdc进行实时数据同步

    CDC是Change Data Capture的缩写,中文意思是 变更数据获取 ,flink-cdc的作用是,通过flink捕获数据源的事务变动操作记录,包括数据的增删改操作等,根据这些记录可作用于对目标端进行实时数据同步。 下图是flink-cdc最新支持的数据源类型: kafka的数据源要通过flink-cdc进行实时数

    2024年02月12日
    浏览(58)
  • 为什么flink那么受欢迎?

           我们知道,Storm已经不流行了,目前几乎没有公司用。        对于大数据开发,主流的就是Hadoop Spark和Flink,一般学习顺序也都是Hadoop——spark——Flink。        现在也有很多人说Spark已经不行了,更倾向于学习和使用Flink。那是因为一些大厂例如阿里主要是使用F

    2024年01月23日
    浏览(45)
  • flinkcdc 3.0 源码学习之任务提交脚本flink-cdc.sh

    大道至简,用简单的话来描述复杂的事,我是Antgeek,欢迎阅读. 在flink 3.0版本中,我们仅通过一个简单yaml文件就可以配置出一个复杂的数据同步任务, 然后再来一句 bash bin/flink-cdc.sh mysql-to-doris.yaml 就可以将任务提交, 本文就是来探索一下这个shell脚本,主要是研究如何通过一个shell命

    2024年02月19日
    浏览(41)
  • 【开发问题】flink-cdc不用数据库之间的,不同类型的转化

    我一开始是flink-cdc,oracle2Mysql,sql 我一开始直接用的oracle【date】类型,mysql【date】类型,sql的校验通过了,但是真正操作数据的时候报错,告诉我oracle的数据格式的日期数据,不可以直接插入到mysql格式的日期数据,说白了就是数据格式不一致导致的 我想的是既然格式不对

    2024年02月12日
    浏览(47)
  • 为什么选择 Flink 做实时处理

    优质博文:IT-BLOG-CN 【1】流数据更真实地反映了我们的生活方式(实时聊天); 【2】传统的数据架构是基于有限数据集的(Spark 是基于微批次数据处理); 【3】我们的目标:低延迟、高吞吐(分布式架构,可能会出现顺序上的混乱,比如统计1个小时内,可能在1小时的时候

    2024年03月11日
    浏览(86)
  • 为什么 Flink 抛弃了 Scala

    曾经红遍一时的Scala 想当初Spark横空出世之后,Scala简直就是语言界的一颗璀璨新星,惹得大家纷纷侧目,连Kafka这类技术框架也选择用Scala语言进行开发重构。 可如今,Flink竟然公开宣布弃用Scala 在Flink1.18的官方文档里,有一句非常严肃的话:所有的Flink Scala APIs已被标记为废

    2024年02月04日
    浏览(50)
  • SpringBoot集成Flink-CDC 采集PostgreSQL变更数据发布到Kafka

    最近做的一个项目,使用的是pg数据库,公司没有成熟的DCD组件,为了实现数据变更消息发布的功能,我使用SpringBoot集成Flink-CDC 采集PostgreSQL变更数据发布到Kafka。 监听数据变化,进行异步通知,做系统内异步任务。 架构方案(懒得写了,看图吧): -- 创建pg 高线数据同步用

    2024年02月02日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包