Flink中FileSink的使用

这篇具有很好参考价值的文章主要介绍了Flink中FileSink的使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

在Flink中提供了StreamingFileSink用以将数据流输出到文件系统.
这里结合代码介绍如何使用FileSink.
首先FileSink有两种模式forRowFormatforBulkFormat

    public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(
            final Path basePath, final Encoder<IN> encoder) {
        return new DefaultRowFormatBuilder<>(basePath, encoder, new DateTimeBucketAssigner<>());
    }

    public static <IN> DefaultBulkFormatBuilder<IN> forBulkFormat(
            final Path basePath, final BulkWriter.Factory<IN> bulkWriterFactory) {
        return new DefaultBulkFormatBuilder<>(
                basePath, bulkWriterFactory, new DateTimeBucketAssigner<>());
    }

二者的区别是forRowFormat是一行一行的处理数据,而forBulkFormat则是可以一次处理多条数据,而多条处理的好处就是可以帮助生成列式存储的文件如ParquetFileORCFile,而forRowFormat则做不到这点,关于列式存储和行式存储的区别可通过数据存储格式这篇文章简单做一个了解.

下面以forRowFormat作为示例演示一下代码

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

/**
 * @Author: J
 * @Version: 1.0
 * @CreateTime: 2023/6/27
 * @Description: 测试
 **/
public class FlinkFileSink {
    public static void main(String[] args) throws Exception {
        // 构建流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度为1
        env.setParallelism(1);
        // 这里是生成数据流,CustomizeSource这个类是自定义数据源(为了方便测试)
        DataStreamSource<CustomizeBean> dataStreamSource = env.addSource(new CustomizeSource());
        // 现将数据转换成字符串形式
        SingleOutputStreamOperator<String> map = dataStreamSource.map(bean -> bean.toString());
        // 构造FileSink对象,这里使用的RowFormat,即行处理类型的
        FileSink<String> fileSink = FileSink
                // 配置文件输出路径及编码格式
                .forRowFormat(new Path("/Users/xxx/data/testData/"), new SimpleStringEncoder<String>("UTF-8"))
                // 设置文件滚动策略(文件切换)
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofSeconds(180)) // 设置间隔时长180秒进行文件切换
                                .withInactivityInterval(Duration.ofSeconds(20)) // 文件20秒没有数据写入进行文件切换
                                .withMaxPartSize(MemorySize.ofMebiBytes(1)) // 设置文件大小1MB进行文件切换
                                .build()
                )
                // 分桶策略(划分子文件夹)
                .withBucketAssigner(new DateTimeBucketAssigner<String>()) // 按照yyyy-mm-dd--h进行分桶
                //设置分桶检查时间间隔为100毫秒
                .withBucketCheckInterval(100)
                // 输出文件文件名相关配置
                .withOutputFileConfig(
                        OutputFileConfig.builder()
                                .withPartPrefix("test_") // 文件前缀
                                .withPartSuffix(".txt") // 文件后缀
                                .build()
                )
                .build();
        // 输出到文件
        map.print();
        map.sinkTo(fileSink);
        env.execute();
    }
}

代码内容这里就不详细说明了,注释已经写得很清楚了.有一点要注意使用FileSink的时候我们要加上对应的pom依赖.我这里使用Flink版本是1.15.3

        <!-- File connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>

这里我们先看一下生成的结果文件

-rw-r--r--  1 xxx  staff   1.0M  6 27 14:43 .test_-eb905337-488d-46f1-8177-86fbb46f778f-0.txt.inprogress.91e49c89-cc79-44f5-940d-ded2770b61a1
-rw-r--r--  1 xxx  staff   1.0M  6 27 14:44 .test_-eb905337-488d-46f1-8177-86fbb46f778f-1.txt.inprogress.c548bd30-8583-48d5-91d2-2e11a7dca2cd
-rw-r--r--  1 xxx  staff   1.0M  6 27 14:45 .test_-eb905337-488d-46f1-8177-86fbb46f778f-2.txt.inprogress.a041dba1-8f37-4307-82da-682c48b0796b
-rw-r--r--  1 xxx  staff   280K  6 27 14:45 .test_-eb905337-488d-46f1-8177-86fbb46f778f-3.txt.inprogress.e05d1759-0a38-4a25-bcd0-1216ce6dda59

这里有必要说明一下由于我使用的是Mac在生成文件的时候会出现一个小问题,上面的那种文件会隐藏起来,直接点开文件夹是看不到的可以通过command + shift + .来显示隐藏文件,或者像我这种直接通过终端ll -a来查看,windows没有发现这个问题.
可以看到除了最后一个文件,其他的文件大小基本都是1MB,最后一个是因为写入的数据大小还没有满足1MB,并且写入时间也没有满足滚动条件,所以还在持续写入中.
而且通过文件名我们可以看到所有文件中都带有inprogress这个状态,这是因为我们没有开启checkpoint,这里先说一下FileSink写入文件时的三个文件状态,官网原图如下:
forbulkformat,FLink,flink,大数据
这三种状态分别是inprogresspendingfinished,对应的就是处理中、挂起和完成,官网中同时也说明了FileSink必须和checkpoint配合使用,不然文件的状态只会出现inprogresspending,原文内容如下:
forbulkformat,FLink,flink,大数据
下面我们在看一下加入checkpoint的代码和结果文件
代码如下

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

/**
 * @Author: J
 * @Version: 1.0
 * @CreateTime: 2023/6/27
 * @Description: 测试
 **/
public class FlinkFileSink {
    public static void main(String[] args) throws Exception {
        // 构建流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度为1
        env.setParallelism(1);
        // 这里是生成数据流,CustomizeSource这个类是自定义数据源(为了方便测试)
        DataStreamSource<CustomizeBean> dataStreamSource = env.addSource(new CustomizeSource());
        // 现将数据转换成字符串形式
        SingleOutputStreamOperator<String> map = dataStreamSource.map(bean -> bean.toString());

        // 每20秒作为checkpoint的一个周期
        env.enableCheckpointing(20000);
        // 两次checkpoint间隔最少是10秒
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
        // 程序取消或者停止时不删除checkpoint
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // checkpoint必须在60秒结束,否则将丢弃
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 同一时间只能有一个checkpoint
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 设置EXACTLY_ONCE语义,默认就是这个
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // checkpoint存储位置
        env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");
        // 设置执行模型为Streaming方式
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        
        // 构造FileSink对象,这里使用的RowFormat,即行处理类型的
        FileSink<String> fileSink = FileSink
                // 配置文件输出路径及编码格式
                .forRowFormat(new Path("/Users/xxx/data/testData/"), new SimpleStringEncoder<String>("UTF-8"))
                // 设置文件滚动策略(文件切换)
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofSeconds(180)) // 设置间隔时长180秒进行文件切换
                                .withInactivityInterval(Duration.ofSeconds(20)) // 文件20秒没有数据写入进行文件切换
                                .withMaxPartSize(MemorySize.ofMebiBytes(1)) // 设置文件大小1MB进行文件切换
                                .build()
                )
                // 分桶策略(划分子文件夹)
                .withBucketAssigner(new DateTimeBucketAssigner<String>()) // 按照yyyy-mm-dd--h进行分桶
                //设置分桶检查时间间隔为100毫秒
                .withBucketCheckInterval(100)
                // 输出文件文件名相关配置
                .withOutputFileConfig(
                        OutputFileConfig.builder()
                                .withPartPrefix("test_") // 文件前缀
                                .withPartSuffix(".txt") // 文件后缀
                                .build()
                )
                .build();
        // 输出到文件
        map.print();
        map.sinkTo(fileSink);
        env.execute();
    }
}

看一下结果文件:

-rw-r--r--  1 xxx  staff   761K  6 27 15:13 .test_-96ccd42e-716d-4ee0-835e-342618914e7d-2.txt.inprogress.aa5fccaa-f99f-4059-93e7-6d3c548a66b3
-rw-r--r--  1 xxx  staff   1.0M  6 27 15:11 test_-96ccd42e-716d-4ee0-835e-342618914e7d-0.txt
-rw-r--r--  1 xxx  staff   1.0M  6 27 15:12 test_-96ccd42e-716d-4ee0-835e-342618914e7d-1.txt

可以看到已经完成的文件状态中已经没有inprogress和其他的后缀了,而正在写入的文件则是处于inprogress状态.文章来源地址https://www.toymoban.com/news/detail-698420.html

到了这里,关于Flink中FileSink的使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 使用Flink完成流数据统计

    所有流计算统计的流程都是: 1、接入数据源 2、进行多次数据转换操作(过滤、拆分、聚合计算等) 3、计算结果的存储 其中数据源可以是多个、数据转换的节点处理完数据可以发送到一个和多个下一个节点继续处理数据 Flink程序构建的基本单元是stream和transformation(DataSet实质

    2024年02月05日
    浏览(40)
  • Flink系列之:使用Flink CDC从数据库采集数据,设置checkpoint支持数据采集中断恢复,保证数据不丢失

    博主相关技术博客: Flink系列之:Debezium采集Mysql数据库表数据到Kafka Topic,同步kafka topic数据到StarRocks数据库 Flink系列之:使用Flink Mysql CDC基于Flink SQL同步mysql数据到StarRocks数据库

    2024年02月11日
    浏览(73)
  • 使用Flink处理Kafka中的数据

    目录         使用Flink处理Kafka中的数据 前提:  一, 使用Flink消费Kafka中ProduceRecord主题的数据 具体代码为(scala) 执行结果 二, 使用Flink消费Kafka中ChangeRecord主题的数据           具体代码(scala)                 具体执行代码①                 重要逻

    2024年01月23日
    浏览(51)
  • 【Flink-Kafka-To-RocketMQ】使用 Flink 自定义 Sink 消费 Kafka 数据写入 RocketMQ

    这里的 maven 依赖比较冗余,推荐大家都加上,后面陆续优化。 注意: 1、此程序中所有的相关配置都是通过 Mysql 读取的(生产环境中没有直接写死的,都是通过配置文件动态配置),大家实际测试过程中可以将相关配置信息写死。 2、此程序中 Kafka 涉及到了 Kerberos 认证操作

    2024年02月03日
    浏览(50)
  • 【大数据】Flink CDC 的概览和使用

    CDC ( Change Data Capture , 数据变更抓取 )是一种用于跟踪数据库中数据更改的技术。它用于监视数据库中的变化,并捕获这些变化,以便实时或定期将变化的数据同步到其他系统、数据仓库或分析平台。CDC 技术通常用于数据复制、数据仓库更新、实时报告和数据同步等场景。

    2024年01月24日
    浏览(49)
  • 【flink】Checkpoint expired before completing. 使用flink同步数据出现错误Checkpoint expired before completing.

    任务超时了: 重新把任务配置参数,配置如下: 或者修改 flink的 配置文件flink-conf.yaml 

    2024年02月12日
    浏览(46)
  • Flink使用 KafkaSource消费 Kafka中的数据

    目前,很多 flink相关的书籍和网上的文章讲解如何对接 kafka时都是使用的 FlinkKafkaConsumer,如下: 新版的 flink,比如 1.14.3已经将 FlinkKafkaConsumer标记为 deprecated(不推荐),如下: 新版本的 flink应该使用 KafkaSource来消费 kafka中的数据,详细代码如下: 开发者在工作中应该尽量避

    2024年02月15日
    浏览(37)
  • 使用flink sqlserver cdc 同步数据到StarRocks

    前沿: flink cdc功能越发强大,支持的数据源也越多,本篇介绍使用flink cdc实现: sqlserver-》(using flink cdc)-〉flink -》(using flink starrocks connector)-〉starrocks整个流程 1.sqlserver 环境准备(得使用sqlserver 16以下版本,flink cdc当前只支持16以下sqlserver版本) 我这个使用的是docker环

    2024年02月10日
    浏览(52)
  • 掌握实时数据流:使用Apache Flink消费Kafka数据

            导读:使用Flink实时消费Kafka数据的案例是探索实时数据处理领域的绝佳方式。不仅非常实用,而且对于理解现代数据架构和流处理技术具有重要意义。         Apache Flink  是一个在 有界 数据流和 无界 数据流上进行有状态计算分布式处理引擎和框架。Flink 设计旨

    2024年02月03日
    浏览(78)
  • 使用flink实现《实时数据分析》的案例 java版

    本文档介绍了使用Java和Flink实现实时数据分析的案例。该案例使用Flink的流处理功能,从Kafka主题中读取数据,进行实时处理和分析,并将结果输出到Elasticsearch中。 Java 8 Flink 1.13.2 Kafka 2.8.0 Elasticsearch 7.13.4 本案例使用Kafka作为数据源,从一个名为 user_behavior 的主题中读取数据。

    2024年02月08日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包