Flink之FileSink将数据写入parquet文件

这篇具有很好参考价值的文章主要介绍了Flink之FileSink将数据写入parquet文件。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink之FileSink将数据写入parquet文件

在使用FileSink将数据写入列式存储文件中时必须使用forBulkFormat,列式存储文件如ORCFileParquetFile,这里就以ParquetFile为例结合代码进行说明.

在Flink1.15.3中是通过构造ParquetWriterFactory然后调用forBulkFormat方法将构造好的ParquetWriterFactory传入,这里先讲一下构造ParquetWriterFactory一共有三种方式

序列 API
方式一 AvroParquetWriters.forGenericRecord
方式二 AvroParquetWriters.forSpecificRecord
方式三 AvroParquetWriters.forReflectRecord

其中方式三AvroParquetWriters.forReflectRecord是我们常用的方法,使用起来也是复杂最低、代码变更时灵活度较好方法,方式二AvroParquetWriters.forSpecificRecord使用起来复杂度较高,但是代码变更的时候灵活度相对较好的方法,方式一AvroParquetWriters.forGenericRecord使用起来比较麻烦,而且代码变更时需要更改的也比较多,这里主要介绍方式二和方式三的使用方式.

要说明一点再Flink1.15.3中是通过AvroParquetWriters来构造ParquetWriterFactory,如果是早期版本的Flink可能是要通过ParquetAvroWriters来进行构造,当然在1.15.3中也可以通过这个方式进行构造,不过ParquetAvroWriters已经标注为过时并且建议使用AvroParquetWriters

源码内容如下:

/**
 * Convenience builder to create {@link ParquetWriterFactory} instances for the different Avro
 * types.
 *
 * @deprecated use {@link AvroParquetWriters} instead. // 看这部分是建议使用AvroParquetWriters
 */
@Deprecated // 这里已经标注了过时
public class ParquetAvroWriters {

    /**
     * Creates a ParquetWriterFactory for an Avro specific type. The Parquet writers will use the
     * schema of that specific type to build and write the columnar data.
     *
     * @param type The class of the type to write.
     */
    public static <T extends SpecificRecordBase> ParquetWriterFactory<T> forSpecificRecord(
            Class<T> type) {
        return AvroParquetWriters.forSpecificRecord(type);
    }
  • AvroParquetWriters.forReflectRecord(方式三)

    这里就先介绍一下AvroParquetWriters.forReflectRecord的使用方式,我们在使用FileSink时最好配合Checkpoint使用,不然文件只会出现inprogress状态,感兴趣的可以自己实验一下,我在Flink中FileSink的使用演示了加Checkpoint和不加Checkpoint的区别感兴趣的可以看一下.

    代码模板内容比较简单,直接代码演示:

    import com.jin.bean.User;
    import com.jin.schema.UserSchemaBean;
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.ParquetWriterFactory;
    import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
    
    /**
     * @Author: J
     * @Version: 1.0
     * @CreateTime: 2023/6/28
     * @Description: 测试
     **/
    public class FlinkFileSinkForParquet {
        public static void main(String[] args) throws Exception {
            // 创建流环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 设置并行度
            env.setParallelism(1);
    
            // 每30秒作为checkpoint的一个周期
            env.enableCheckpointing(30000);
            // 两次checkpoint间隔最少是20秒
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(20000);
            // 程序取消或者停止时不删除checkpoint
            env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            // checkpoint必须在60秒结束,否则将丢弃
            env.getCheckpointConfig().setCheckpointTimeout(60000);
            // 同一时间只能有一个checkpoint
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
            // 设置EXACTLY_ONCE语义,默认就是这个
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            // checkpoint存储位置
            env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");
            
            // 添加数据源(这里使用的是自定义数据源CustomizeSource,方便测试)
            DataStreamSource<CustomizeBean> sourceStream = env.addSource(new CustomizeSource());
            // 将数据流中的数据存储到bean对象中
            SingleOutputStreamOperator<User> userMapStream = sourceStream.map(bean -> new User(bean.getName(), bean.getAge(), bean.getGender(), bean.getHobbit()));
            // 构建parquetWriterFactory
            ParquetWriterFactory<User> parquetWriterFactory2 = AvroParquetWriters.forReflectRecord(User.class);
            // 构建FileSink
            FileSink<User> parquetFileSink = FileSink
                    // 使用Bulk模式,并配置路径和对应的schema
                    .forBulkFormat(new Path("/Users/xxx/data/testData/"), parquetWriterFactory2)
                    // 分桶策略,使用默认的
                    .withBucketAssigner(new DateTimeBucketAssigner<User>())
                    // 每100毫秒检查一次分桶
                    .withBucketCheckInterval(100)
                    // 滚动策略,Bulk的滚动策略只有一种,就是发生Checkpoint的时候才进行滚动(为了保证列式文件的完整性)
                    .withRollingPolicy(OnCheckpointRollingPolicy.build())
                    .build();
            // 输出到文件
            userMapStream.sinkTo(parquetFileSink);
            env.execute();
    
        }
    }
    
    @Getter
    @Setter
    @ToString
    @NoArgsConstructor
    @AllArgsConstructor
    class User {
        private String name;
        private int age;
        private String gender;
        private String hobbit;
    }
    

    代码中注释很详细了,具体使用看注释即可。这里说明一下为什么forBulkFormat的滚动策略只有OnCheckpointRollingPolicy而不是像forRowFormat那样可以通过时间和文件大小来控制文件滚动,注释中我也讲了是为了保证列式存储文件的完整性,因为列式文件中记录了很多信息,并不想行式存储文件一行一行的写就行,写到某一行直接停了也不影响文件的使用,而列式存储文件中不单单是记录了数据本身还有对应的字段类型、文件头信息、文件尾信息、切片索引等很多信息,如果在写入数据时某一刻直接停止了,而文件还没有生成完整的信息那就会导致这个列士存储文件根本不具备使用性,是无法进行解析的。

    就比如说ParquetFile,它的文件结构如下图
    flink写parquet文件,FLink,flink,大数据,java

    可以看到文件的结构信息是很复杂的,如果感兴了解一下可以看数据存储格式这篇文章了解一下,这里就不细说了,内容还是比较多的.

  • AvroParquetWriters.forSpecificRecord(方式二)

    forSpecificRecord的使用不像forReflectRecord那样自定义一个bean接收数据就行了,使用forSpecificRecord还要结合一下Apache avro的官网看一下,下面我就介绍一下如何使用forSpecificRecord.

    avro的使用有两种方式一是通过API直接调用的方式,二通过配置avsc文件然后进行编译的方式,在代码中我们使用的第二种方式,使用第一种方式同样会出现很多schema的信息在代码中写死修改起来会比较复杂的问题,而且对avroAPI也要足够熟悉,学习成本还是有的.

    1. resource目录中创建avsc文件,文件内容如下

      {
        "namespace": "com.jin.schema",
        "type": "record",
        "name": "UserSchemaBean",
        "fields": [
          {"name": "name", "type": "string"},
          {"name": "age", "type": "int"},
          {"name": "gender",  "type": "string"},
          {"name": "hobbit", "type": "string"}
        ]
      }
      

      文件中的内容就是schema信息,这里我相信大家都能看得明白."namespace": "com.jin.schema"编译后自动创建的bean的存储位置,"name": "UserSchemaBean"就是配置生成bean的名称,fields中就是配置生成bean的成员变量和对应的数据类型.

      官网演示的avsc文件内容如下:

      {"namespace": "example.avro",
       "type": "record",
       "name": "User",
       "fields": [
           {"name": "name", "type": "string"},
           {"name": "favorite_number",  "type": ["int", "null"]},
           {"name": "favorite_color", "type": ["string", "null"]}
       ]
      }
      

      编译后就会根据avsc文件中的schema信息在配置好的目录中自动创建bean.

    2. Maven中添加avsc文件编译插件

      官网内容如下:

      <plugin>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>1.11.1</version>
        <executions>
          <execution>
            <phase>generate-sources</phase>
            <goals>
              <goal>schema</goal>
            </goals>
            <configuration>
              <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
              <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>
      

      要注意<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>是已经配置完的avsc文件的位置,像是我就是在原有的resource目录下配置的就要将内容改成<sourceDirectory>${project.basedir}/src/main/resource/</sourceDirectory>否则在编译时就会报错找不到对应的目录或文件,如果想直接使用<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>那就在项目的main目录下创建一个avro目录并将目录性质改为Source root(这个如果不会可自行百度,关键字我都已经提供了).

      我的项目中实际配置如下:

                  <!-- avro插件 -->
                  <plugin>
                      <groupId>org.apache.avro</groupId>
                      <artifactId>avro-maven-plugin</artifactId>
                      <version>1.10.0</version>
                      <executions>
                          <execution>
                              <phase>generate-sources</phase>
                              <goals>
                                  <goal>schema</goal>
                              </goals>
                              <configuration>
                                  <sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
                                  <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                              </configuration>
                          </execution>
                      </executions>
                  </plugin>
      

      选择插件的版本时要注意依赖冲突问题,我们要先看一下Flink的flink-avro下的org.apache.avro:avro是什么版本,如下图:
      flink写parquet文件,FLink,flink,大数据,java

      可以看到1.15.3org.apache.avro:avro的版本是1.10.0,所以我选择的插件也是这个版本.

    3. 编译

      上面步骤都完成了就可以进行编译了,Maven->Lifecycle->compile,这里看一下编译后的结果如下图:
      flink写parquet文件,FLink,flink,大数据,java

      可以看到已经根据我们配置的avsc文件自动创建了对应的bean,这里看一下成员变量内容是否一致,如下:

        /**
         * All-args constructor.
         * @param name The new value for name
         * @param age The new value for age
         * @param gender The new value for gender
         * @param hobbit The new value for hobbit
         */
        public UserSchemaBean(java.lang.CharSequence name, java.lang.Integer age, java.lang.CharSequence gender, java.lang.CharSequence hobbit) {
          this.name = name;
          this.age = age;
          this.gender = gender;
          this.hobbit = hobbit;
        }
      

      可以看到成员变量信息也是完全一致,我这里值展示了小部分代码,编译后的bean中的代码信息很多,不过我们不用关心这个,懂与不懂都不影响使用.

    4. 代码内容

      接下来就到主题了,实际的代码内容如下:

      import com.jin.schema.UserSchemaBean;
      import org.apache.flink.connector.file.sink.FileSink;
      import org.apache.flink.core.fs.Path;
      import org.apache.flink.formats.parquet.ParquetWriterFactory;
      import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
      import org.apache.flink.streaming.api.CheckpointingMode;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
      import org.apache.flink.streaming.api.environment.CheckpointConfig;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
      import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
      
      /**
       * @Author: J
       * @Version: 1.0
       * @CreateTime: 2023/6/28
       * @Description: 测试
       **/
      public class FlinkFileSinkForParquet {
          public static void main(String[] args) throws Exception {
              // 创建流环境
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              // 设置并行度
              env.setParallelism(1);
      
              // 每30秒作为checkpoint的一个周期
              env.enableCheckpointing(30000);
              // 两次checkpoint间隔最少是20秒
              env.getCheckpointConfig().setMinPauseBetweenCheckpoints(20000);
              // 程序取消或者停止时不删除checkpoint
              env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
              // checkpoint必须在60秒结束,否则将丢弃
              env.getCheckpointConfig().setCheckpointTimeout(60000);
              // 同一时间只能有一个checkpoint
              env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
              // 设置EXACTLY_ONCE语义,默认就是这个
              env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
              // checkpoint存储位置
              env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");
      
              // 添加数据源(这里使用的是自定义数据源,方便测试)
              DataStreamSource<CustomizeBean> sourceStream = env.addSource(new CustomizeSource());
              // 将数据流中的对象转成UserSchemaBean类型
              SingleOutputStreamOperator<UserSchemaBean> mapStream = sourceStream.map(bean -> new UserSchemaBean(bean.getName(), bean.getAge(), bean.getGender(), bean.getHobbit()));
              // 构建parquetWriterFactory,这里传入的就是编译后的UserSchemaBean
              ParquetWriterFactory<UserSchemaBean> parquetWriterFactory = AvroParquetWriters.forSpecificRecord(UserSchemaBean.class);
              // 构建FileSink
              FileSink<UserSchemaBean> parquetFileSink = FileSink
                      // 使用Bulk模式,并配置路径和对应的schema
                      .forBulkFormat(new Path("/Users/xxx/data/testData/"), parquetWriterFactory)
                      // 分桶策略,使用默认的
                      .withBucketAssigner(new DateTimeBucketAssigner<UserSchemaBean>())
                      // 每100毫秒检查一次分桶
                      .withBucketCheckInterval(100)
                      // 滚动策略,Bulk的滚动策略只有一种,就是发生Checkpoint的时候才进行滚动(为了保证列式文件的完整性)
                      .withRollingPolicy(OnCheckpointRollingPolicy.build())
                      .build();
              // 输出到文件
              mapStream.sinkTo(parquetFileSink);
              env.execute();
      
          }
      }
      

      通过代码我们可以看到,内容基本就是一致的无非就是forSpecificRecord传入的bean不同而已,当然还是建议使用AvroParquetWriters.forReflectRecord这种方式,简易高效,复杂的过程并不一定能提高我们的代码能力.

      到这里这两种方式我都介绍完了,希望看完这篇文章有所收获.文章来源地址https://www.toymoban.com/news/detail-770193.html

到了这里,关于Flink之FileSink将数据写入parquet文件的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 6.2、Flink数据写入到Kafka

    目录 1、添加POM依赖 2、API使用说明 3、序列化器 3.1 使用预定义的序列化器 3.2 使用自定义的序列化器 4、容错保证级别 4.1 至少一次 的配置 4.2 精确一次 的配置 5、这是一个完整的入门案例 Apache Flink 集成了通用的 Kafka 连接器,使用时需要根据生产环境的版本引入相应的依赖

    2024年02月09日
    浏览(48)
  • Flink将数据写入MySQL(JDBC)

    在实际的生产环境中,我们经常会把Flink处理的数据写入MySQL、Doris等数据库中,下面以MySQL为例,使用JDBC的方式将Flink的数据实时数据写入MySQL。 2.1 版本说明 2.2 导入相关依赖 2.3 连接数据库,创建表 2.4 创建POJO类 2.5 自定义map函数 2.5 Flink2MySQL 2.6 启动necat、Flink,观察数据库写

    2024年02月07日
    浏览(44)
  • 【Flink-Kafka-To-Hive】使用 Flink 实现 Kafka 数据写入 Hive

    需求描述: 1、数据从 Kafka 写入 Hive。 2、相关配置存放于 Mysql 中,通过 Mysql 进行动态读取。 3、此案例中的 Kafka 是进行了 Kerberos 安全认证的,如果不需要自行修改。 4、Flink 集成 Kafka 写入 Hive 需要进行 checkpoint 才能落盘至 HDFS。 5、先在 Hive 中创建表然后动态获取 Hive 的表

    2024年02月03日
    浏览(57)
  • 【Flink-Kafka-To-ClickHouse】使用 Flink 实现 Kafka 数据写入 ClickHouse

    需求描述: 1、数据从 Kafka 写入 ClickHouse。 2、相关配置存放于 Mysql 中,通过 Mysql 进行动态读取。 3、此案例中的 Kafka 是进行了 Kerberos 安全认证的,如果不需要自行修改。 4、先在 ClickHouse 中创建表然后动态获取 ClickHouse 的表结构。 5、Kafka 数据为 Json 格式,通过 FlatMap 扁平

    2024年02月03日
    浏览(47)
  • 【Flink】【ClickHouse】写入流式数据到ClickHouse

    Flink 安装的教程就不在这里赘叙了,可以看一下以前的文章,这篇文章主要是把流式数据写入的OLAP(ClickHouse)中作查询分析 Flink 1.13.2, ClickHouse 22.1.3.7 这里直接使用docker安装,没有安装的同学可以使用homebreak来安装,执行下面的命令即可( 已经安装了docker的可以忽略 ) 四指

    2024年02月03日
    浏览(44)
  • 【Flink-Kafka-To-RocketMQ】使用 Flink 自定义 Sink 消费 Kafka 数据写入 RocketMQ

    这里的 maven 依赖比较冗余,推荐大家都加上,后面陆续优化。 注意: 1、此程序中所有的相关配置都是通过 Mysql 读取的(生产环境中没有直接写死的,都是通过配置文件动态配置),大家实际测试过程中可以将相关配置信息写死。 2、此程序中 Kafka 涉及到了 Kerberos 认证操作

    2024年02月03日
    浏览(51)
  • 15_基于Flink将pulsar数据写入到ClickHouse

    编写Flink完成数据写入到ClickHouse操作, 后续基于CK完成指标统计操作 3.8.1.ClickHouse基本介绍 ClickHouse 是俄罗斯的Yandex于2016年开源的列式存储数据库(DBMS),使用C++语言编写,主要用于在线分析处理查询(OLAP),能够使用SQL查询实时生成分析数据报告。 结论: ClickHouse像很多OL

    2024年02月14日
    浏览(82)
  • 14_基于Flink将pulsar数据写入到HBase

    3.7.1.编写Flink完成数据写入到Hbase操作, 完成数据备份, 便于后续进行即席查询和离线分析 3.7.1.1.HBase基本介绍 hbase是基于Google发布bigTable论文产生一款软件, 是一款noSQL型数据, 不支持SQL. 不支持join的操作, 没有表关系, 不支持事务(多行事务),hbase是基于 HDFS的采用java 语言编写 查

    2024年02月13日
    浏览(35)
  • 【数据湖Hudi-10-Hudi集成Flink-读取方式&限流&写入方式&写入模式&Bucket索引】

    当前表默认是快照读取,即读取最新的全量快照数据并一次性返回。通过参数 read.streaming.enabled 参数开启流读模式,通过 read.start-commit 参数指定起始消费位置,支持指定 earliest 从最早消费。 1.with参数 名称 Required 默认值 说明 read.streaming.enabled false false 设置 true 开启流读模式

    2024年02月14日
    浏览(46)
  • 【Flink-Kafka-To-Mongo】使用 Flink 实现 Kafka 数据写入 Mongo(根据对应操作类型进行增、删、改操作,写入时对时间类型字段进行单独处理)

    需求描述: 1、数据从 Kafka 写入 Mongo。 2、相关配置存放于 Mysql 中,通过 Mysql 进行动态读取。 3、此案例中的 Kafka 是进行了 Kerberos 安全认证的,如果不需要自行修改。 4、Kafka 数据为 Json 格式,获取到的数据根据操作类型字段进行增删改操作。 5、读取时使用自定义 Source,写

    2024年02月22日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包