Flink 数据类型 & TypeInformation信息

这篇具有很好参考价值的文章主要介绍了Flink 数据类型 & TypeInformation信息。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink流应用程序处理的是以数据对象表示的事件流。所以在Flink内部,我么需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送它们;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点,Flink需要明确知道应用程序所处理的数据类型。并为每个数据类型生成特定的序列化器、反序列化器和比较器。Flink支持非常完善的数据类型,数据类型描述信息都是由TypeInformation定义,比较常用的TypeInformationBasicTypeInfoTupleTypeInfoCaseClassTypeInfoPojoTypeInfo类等。TypeInformation主要作用是为了在 Flink系统内有效地对数据结构类型进行管理,能够在分布式计算过程中对数据的类型进行管理和推断。同时基于对数据的类型信息管理,Flink内部对数据存储也进行了相应的性能优化。Flink能够支持任意的JavaScala的数据类型,不用像Hadoop中的org.apache.hadoop.io.Writable而实现特定的序列化和反序列化接口,从而让用户能够更加容易使用已有的数据结构类型。另外使用TypeInformation管理数据类型信息,能够在数据处理之前将数据类型推断出来,而不是真正在触发计算后才识别出,这样能够及时有效地避免用户在使用Flink编写应用的过程中的数据类型问题。

原生数据类型

Flink通过实现BasicTypeInfo数据类型,能够支持任意Java 原生基本类型(装箱)或String类型,例如IntegerStringDouble等,如以下代码所示,通过从给定的元素集中创建DataStream数据集。

//创建 Int 类型的数据集
DataStreamSource<Integer> integerDataStreamSource = env.fromElements(1, 2, 3, 4, 5);
//创建 String 的类型的数据集
DataStreamSource<String> stringDataStreamSource = env.fromElements("Java", "Scala");

Flink实现另外一种TypeInfomationBasicArrayTypeInfo,对应的是Java基本类型数组(装箱)或String对象的数组,如下代码通过使用 Array数组和List集合创建DataStream数据集。

List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5);
//通过 List 集合创建数据集
DataStreamSource<Integer> integerDataStreamSource1 = env.fromCollection(integers);

Java Tuples类型

通过定义TupleTypeInfo来描述Tuple类型数据,FlinkJava接口中定义了元祖类Tuple供用户使用。Flink Tuples是固定长度固定类型的Java Tuple实现,不支持空值存储。目前支持任意的Flink Java Tuple类型字段数量上限为25,如果字段数量超过上限,可以通过继承Tuple类的方式进行拓展。如下代码所示,创建Tuple数据类型数据集。

//通过实例化 Tuple2 创建具有两个元素的数据集
DataStreamSource<Tuple2<String, Integer>> tuple2DataStreamSource = env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2));
//通过实例化 Tuple3 创建具有三个元素的数据集
DataStreamSource<Tuple3<String, Integer, Long>> tuple3DataStreamSource = env.fromElements(new Tuple3<>("a", 1, 3L), new Tuple3<>("b", 2, 3L));

Scala Case Class类型

Flink通过实现CaseClassTypeInfo支持任意的Scala Case Class,包括Scala tuples类型,支持的字段数量上限为22,支持通过字段名称和位置索引获取指标,不支持存储空值。如下代码实例所示,定义WordCount Case Class数据类型,然后通过fromElements方法创建input数据集,调用keyBy()方法对数据集根据 word字段重新分区。

//定义 WordCount Case Class 数据结构
case class WordCount(word: Sring, count: Int)
//通过 fromElements 方法创建数据集
val input = env.fromElements(WordCount("hello", 1),WordCount("word",2))
val keyStream1 = input.keyBy("word")//根据word字段为分区字段,
val keyStream2 = input.keyBy(0)//也可以通过制定position分区

通过使用Scala Tuple创建DataStream数据集,其他的使用方式和Case Class相似。需要注意的是,如果根据名称获取字段,可以使用 Tuple中的默认字段名称。

//通过实例化Scala Tuple2 创建具有两个元素的数据集
val tupleStream: DataStream[Tuple2[String,Int]] = env.fromElements(("a",1),("b",2));
//使用默认名字段获取字段,表示第一个 tuple字段,相当于下标0
tuple2DataStreamSource.keyBy("_1");

POJOs 类型

POJOs类可以完成复杂数据结构的定义,Flink通过实现PojoTypeInfo来描述任意的POJOs,包括JavaScala类。在Flink中使用POJOs类可以通过字段名称获取字段,例如dataStream.join(otherStream).where("name").equalTo("personName"),对于用户做数据处理则非常透明和简单,如代码所示。如果在Flink中使用POJOs数据类型,需要遵循以下要求:
【1】POJOs类必须是Public修饰且必须独立定义,不能是内部类;
【2】POJOs类中必须含有默认空构造器;
【3】POJOs类中所有的 Fields必须是Public或者具有Public修饰的gettersetter方法;
【4】POJOs类中的字段类型必须是Flink支持的。

//类和属性具有 public 修饰
public class Persion{
    public String name;
    public Integer age;
    //具有默认的空构造器
    public Persion(){}
    public Persion(String name,Integer age){
        this.name = name;
        this.age = age;
    };
}

定义好POJOs Class后,就可以在 Flink环境中使用了,如下代码所示,使用fromElements接口构建Person类的数据集。POJOs类仅支持字段名称指定字段,如代码中通过Person name来指定Keyby字段。

DataStreamSource<Persion> persionDataStreamSource = env.fromElements(new Persion("zzx", 18), new Persion("fj", 16));
persionData.keyBy("name").sum("age");

Flink Value类型

Value数据类型实现了org.apache.flink.types.Value,其中包括read()write()两个方法完成序列化和反序列化操作,相对于通用的序列化工具会有着比较高效的性能。目前Flink提供了內建的Value类型有IntValue、DoubleValue以及StringValue等,用户可以结合原生数据类型和Value类型使用。

特殊数据类型

Flink中也支持一些比较特殊的数据数据类型,例如Scala中的ListMapEitherOptionTry数据类型,以及Java中Either数据类型,还有HadoopWritable数据类型。如下代码所示,创建MapList类型数据集。这种数据类型使用场景不是特别广泛,主要原因是数据中的操作相对不像POJOs类那样方便和透明,用户无法根据字段位置或者名称获取字段信息,同时要借助Types Hint帮助Flink推断数据类型信息,关于Tyeps Hmt介绍可以参考下一小节。

//创建 map 类型数据集
Map map = new HashMap<>();
map.put("name","zzx");
map.put("age",12);
env.fromElements(map);
//创建 List 类型数据集
env.fromElements(Arrays.asList(1,2,3,4,5),Arrays.asList(3,4,5));

TypeInformation信息获取: 通常情况下Flink都能正常进行数据类型推断,并选择合适的serializers以及comparators。但在某些情况下却无法直接做到,例如定义函数时如果使用到了泛型,JVM就会出现类型擦除的问题,使得Flink并不能很容易地获取到数据集中的数据类型信息。同时在Scala APIJava API中,Flink分别使用了不同的方式重构了数据类型信息。

Scala API类型信息

Scala API通过使用Manifest和类标签,在编译器运行时获取类型信息,即使是在函数定义中使用了泛型,也不会像Java API出现类型擦除的问题,这使得Scala API具有非常精密的类型管理机制。同时在Flink中使用到Scala Macros框架,在编译代码的过程中推断函数输入参数和返回值的类型信息,同时在Flink中注册成TypeInformation以支持上层计算算子使用。
当使用Scala API开发 Flink应用,如果使用到Flink已经通过TypeInformation定义的数据类型,TypeInformation类不会自动创建,而是使用隐式参数的方式引入,代码不会直接抛出编码异常,但是当启动Flink应用程序时就会报could not find implicit value for evidence parameter of type TypeInformation的错误。这时需要将TypeInformation类隐式参数引入到当前程序环境中,代码实例如下:

import org.apache.flink.api.scala._

Java API类型信息

由于Java的泛型会出现类型擦除问题,Flink通过Java反射机制尽可能重构类型信息,例如使用函数签名以及子类的信息等。同时类型推断在当输出类型依赖于输入参数类型时相对比较容易做到,但是如果函数的输出类型不依赖于输入参数的类型信息,这个时候就需要借助于类型提示Ctype Himts来告诉系统函数中传入的参数类型信息和输出参数信息。如代码清单通过在returns方法中传入TypeHint实例指定输出参数类型,帮助Flink系统对输出类型进行数据类型参数的推断和收集。

//定义泛型函数,输入参数 T,O 输出参数为 O
class MyMapFucntion<T,O> implements MapFunction<T,O>{
    @Override
    public O map(T t) throws Exception {
        //定义计算逻辑
        return null;
    }
}

//通过 List 集合创建数据集
DataStreamSource<Integer> input = env.fromCollection(integers);
input.flatMap(new MyMapFucntion<String,Integer>()).returns(new TypeHint<Integer>() {//通过returns方法指定返回参数类型
})

在使用Java API定义POJOs类型数据时,PojoTypeInformationPOJOs类中的所有字段创建序列化器,对于标准的类型,例如IntegerStringLong等类型是通过Flink自带的序列化器进行数据序列化,对于其他类型数据都是直接调用Kryo序列化工具来进行序列化。通常情况下,如果Kryo序列化工具无法对POJOs类序列化时,可以使用AvroPOJOs类进行序列化,如下代码通过在ExecutionConfig中调用 enableForceAvro()来开启Avro序列化。

//获取运行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//开启 avro 序列化
env.getConfig().enableForceAvro();

如果用户想使用Kryo序列化工具来序列化POJOs所有字段,则在ExecutionConfig中调用enableForceKryo()来开启Kryo序列化。

//获取运行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//开启 Kryo 序列化
env.getConfig().enableForceKryo();

如果默认的Kryo序列化类不能序列化POJOs对象,通过调用ExecutionConfigaddDefaultKryoSerializer()方法向Kryo中添加自定义的序列化器。

public void addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)

自定义TypeInformation

除了使用已有的TypeInformation所定义的数据格式类型之外,用户也可以自定义实现TypeInformation,来满足的不同的数据类型定义需求。Flink提供了可插拔的 Type Information Factory让用户将自定义的TypeInformation注册到Flink类型系统中。如下代码所示只需要通过实现org.apache.flink.api.common.typeinfo.TypeInfoFactory接口,返回相应的类型信息。通过@TypeInfo注解创建数据类型,定义CustomTuple数据类型。

@TypeInfo(CustomTypeInfoFactory.class)
public class CustomTuple<T0,T1>{
    public T0 field0;
    public T1 field1;
}

然后定义CustomTypeInfoFactory类继承于TypeInfoFactory,参数类型指定CustomTuple。最后重写createTypeInfo方法,创建的CustomTupleTypeInfo就是CustomTuple数据类型TypeInformation文章来源地址https://www.toymoban.com/news/detail-762527.html

public class CustomTypeInfoFactory extends TypeInfoFactory<CustomTuple>{
    @Override
    public TypeInfomation<CustomTuple> createTypeInfo(Type t, Map<String,TypeInfoFactory<?>> genericParameters){
        return new CustomTupleTypeInfo(genericParameters.get("T0"),genericParameters.get("T1");
    }
}

到了这里,关于Flink 数据类型 & TypeInformation信息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【flink番外篇】19、Datastream数据类型到Table schema映射示例

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年01月21日
    浏览(51)
  • 怎么使用 Flink 向 Apache Doris 表中写 Bitmap 类型的数据

    Bitmap是一种经典的数据结构,用于高效地对大量的二进制数据进行压缩存储和快速查询。Doris支持bitmap数据类型,在Flink计算场景中,可以结合Flink doris Connector对bitmap数据做计算。 社区里很多小伙伴在是Doris Flink Connector的时候,不知道怎么写Bitmap类型的数据,本文将介绍如何

    2024年02月07日
    浏览(56)
  • 【Flink-Kafka-To-Mongo】使用 Flink 实现 Kafka 数据写入 Mongo(根据对应操作类型进行增、删、改操作,写入时对时间类型字段进行单独处理)

    需求描述: 1、数据从 Kafka 写入 Mongo。 2、相关配置存放于 Mysql 中,通过 Mysql 进行动态读取。 3、此案例中的 Kafka 是进行了 Kerberos 安全认证的,如果不需要自行修改。 4、Kafka 数据为 Json 格式,获取到的数据根据操作类型字段进行增删改操作。 5、读取时使用自定义 Source,写

    2024年02月22日
    浏览(51)
  • 【大数据面试题】008 谈一谈 Flink Slot 与 并行度

    一步一个脚印,一天一道面试题 该文章有较多引用文章 https://zhuanlan.zhihu.com/p/572170629?utm_id=0 并行度 Parallelism 概念作用 并行度是作用于 算子 的单位。Flink 的每个算子都可以单独设置并行度。一般来说,并行度越大,处理能力越大,处理的就越快。 Slot 概念作用 Slot 是 Flink

    2024年02月19日
    浏览(39)
  • 【Flink】FlinkCDC获取mysql数据时间类型差8小时时区解决方案

    1、背景: 在我们使用FlinkCDC采集mysql数据的时候,日期类型是我们很常见的类型,但是FlinkCDC读取出来会和数据库的日期时间不一致,情况如下 FlinkCDC获取的数据中create_time字段1694597238000转换为时间戳2023-09-13 17:27:18  而数据库中原始数据如下,并没有到下午5点,这就导致了

    2024年02月07日
    浏览(51)
  • 【开发问题】flink-cdc不用数据库之间的,不同类型的转化

    我一开始是flink-cdc,oracle2Mysql,sql 我一开始直接用的oracle【date】类型,mysql【date】类型,sql的校验通过了,但是真正操作数据的时候报错,告诉我oracle的数据格式的日期数据,不可以直接插入到mysql格式的日期数据,说白了就是数据格式不一致导致的 我想的是既然格式不对

    2024年02月12日
    浏览(43)
  • 大数据_面试_ETL组件常见问题_spark&flink

    问题列表 回答 spark与flink的主要区别 flink cdc如何确保幂等与一致性 Flink SQL CDC 实践以及一致性分析-阿里云开发者社区 spark 3.0 AQE动态优化 hbase memorystore blockcache sparksql如何调优 通过webui定位那个表以及jobid,jobid找对应的执行计划 hdfs的常见的压缩算法 hbase的数据倾斜 spark数据处

    2024年02月16日
    浏览(44)
  • Flink读取mysql数据库(java)

    代码如下: 运行结果如下:

    2024年02月12日
    浏览(41)
  • 24、Flink 的table api与sql之Catalogs(介绍、类型、java api和sql实现ddl、java api和sql操作catalog)-1

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月06日
    浏览(55)
  • Flink复习3-2-4-6-1(v1.17.0): 应用开发 - DataStream API - 状态和容错 - 数据类型&序列化 - 概述

    Apache Flink handles data types and serialization in a unique way, containing its own type descriptors, generic type extraction, and type serialization framework. This document describes the concepts and the rationale behind them. Apache Flink以独特的方式处理数据类型和序列化,包含自己的类型描述符、泛型类型提取和类型序列化

    2024年02月12日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包