Flink将数据写入CSV文件后文件中没有数据

这篇具有很好参考价值的文章主要介绍了Flink将数据写入CSV文件后文件中没有数据。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink中有一个过时的sink方法:writeAsCsv,这个方法是将数据写入CSV文件中,有时候我们会发现程序启动后,打开文件查看没有任何数据,日志信息中也没有任何报错,这里我们结合源码分析一下这个原因.

这里先看一下数据处理的代码
代码中我是使用的自定义数据源生产数据的方式,为了方便测试

import lombok.*;
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

/**
 * @Author: J
 * @Version: 1.0
 * @CreateTime: 2023/6/19
 * @Description: 自定义数据源测试
 **/
public class FlinkCustomizeSource {
    public static void main(String[] args) throws Exception {
        // 创建流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度
        env.setParallelism(1); // 这里的并行度设置为几就会生成多少个csv文件
        // 添加自定义数据源
         DataStreamSource<CustomizeBean> dataStreamSource = env.addSource(new customizeSource());
        // 先将数据转换成Tuple类型,这样才能写入csv中
        SingleOutputStreamOperator<Tuple4<String, Integer, String, String>> tuple4Stream = dataStreamSource.map(
                bean -> Tuple4.of(bean.getName(), bean.getAge(), bean.getGender(), bean.getHobbit())
        ).returns(new TypeHint<Tuple4<String, Integer, String, String>>() {});
        // 选择csv类型的sink,模式使用的覆盖
        tuple4Stream.writeAsCsv("/Users/xxx/data/testData/test.csv", FileSystem.WriteMode.OVERWRITE);
        env.execute();
    }
}

// 自定义数据源需要实现SourceFunction接口,注意这个接口是单机的数据源,如果是想自定义分布式的数据源需要集成RichParallelSourceFunction类
class customizeSource implements SourceFunction<CustomizeBean> {
    int flag;
    // Job执行的线程
    @Override
    public void run(SourceContext ctx) throws Exception {
        /*这个方法里就是具体的数据逻辑,实际内容要根据业务需求编写,这里只是为了演示方便*/
        CustomizeBean customizeBean = new CustomizeBean();
        String[] genders = {"M", "W"};
        String[] hobbits = {"篮球运动爱好者", "钓鱼爱好者", "乒乓球运动爱好者", "美食爱好者", "羽毛球运动爱好者", "天文知识爱好者", "旅游爱好者", "书法爱好者", "非遗文化爱好者", "网吧战神"};
        while (flag != 100) {
            // 这里自定义的Bean作为数据源
            customizeBean.setAge(RandomUtils.nextInt(18, 80)); // 年龄
            customizeBean.setName("A-" + new Random().nextInt()); // 姓名
            customizeBean.setGender(genders[RandomUtils.nextInt(0, genders.length)]); // 性别
            customizeBean.setHobbit(hobbits[RandomUtils.nextInt(0, hobbits.length)]); // 爱好
            // 将数据收集
            ctx.collect(customizeBean);
            // 睡眠时间是为了控制数据生产的速度,演示效果更加明显
            Thread.sleep(1000);
        }
    }

    // Job取消时就会调用cancel方法
    @Override
    public void cancel() {
        // flag为100时就会停止程序
        flag = 100;
    }
}

@Getter
@Setter
@ToString
@NoArgsConstructor
@AllArgsConstructor
class CustomizeBean{
    private String name;
    private int age;
    private String gender;
    private String hobbit;
}

上面的代码中我们使用自定义数据源的方式(java bean[CustomizeBean]),通过设置Thread.sleep(1000)可以固定每秒生成一条数据.这里我们先看一下存储CSV文件的目录
flink-csv,FLink,flink,java,大数据
通过上图可以看到程序没有启动时,目录是空的,这里我们启动一下程序
日志内容如下

[2023-06-19 15:26:37,755]-[INFO] -org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader -3206 -org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader.load(StateChangelogStorageLoader.java:98).load(98) | Creating a changelog storage with name 'memory'.
[2023-06-19 15:26:37,766]-[INFO] -org.apache.flink.runtime.taskexecutor.TaskExecutor -3217 -org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:757).submitTask(757) | Received task Source: Custom Source -> Map -> Sink: Unnamed (1/1)#0 (965035c5eef2b8f28ffcfc309b92e203), deploy into slot with allocation id b691e34573507d585516decbedb36384.
[2023-06-19 15:26:37,768]-[INFO] -org.apache.flink.runtime.taskmanager.Task -3219 -org.apache.flink.runtime.taskmanager.Task.transitionState(Task.java:1080).transitionState(1080) | Source: Custom Source -> Map -> Sink: Unnamed (1/1)#0 (965035c5eef2b8f28ffcfc309b92e203) switched from CREATED to DEPLOYING.
[2023-06-19 15:26:37,769]-[INFO] -org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl -3220 -org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl.markExistingSlotActive(TaskSlotTableImpl.java:388).markExistingSlotActive(388) | Activate slot b691e34573507d585516decbedb36384.
[2023-06-19 15:26:37,773]-[INFO] -org.apache.flink.runtime.taskmanager.Task -3224 -org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:623).doRun(623) | Loading JAR files for task Source: Custom Source -> Map -> Sink: Unnamed (1/1)#0 (965035c5eef2b8f28ffcfc309b92e203) [DEPLOYING].
[2023-06-19 15:26:37,788]-[INFO] -org.apache.flink.streaming.runtime.tasks.StreamTask -3239 -org.apache.flink.runtime.state.StateBackendLoader.loadFromApplicationOrConfigOrDefaultInternal(StateBackendLoader.java:257).loadFromApplicationOrConfigOrDefaultInternal(257) | No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@4e1fcd2f
[2023-06-19 15:26:37,789]-[INFO] -org.apache.flink.runtime.state.StateBackendLoader -3240 -org.apache.flink.runtime.state.StateBackendLoader.fromApplicationOrConfigOrDefault(StateBackendLoader.java:315).fromApplicationOrConfigOrDefault(315) | State backend loader loads the state backend as HashMapStateBackend
[2023-06-19 15:26:37,789]-[INFO] -org.apache.flink.streaming.runtime.tasks.StreamTask -3240 -org.apache.flink.runtime.state.CheckpointStorageLoader.createJobManagerCheckpointStorage(CheckpointStorageLoader.java:274).createJobManagerCheckpointStorage(274) | Checkpoint storage is set to 'jobmanager'
[2023-06-19 15:26:37,793]-[INFO] -org.apache.flink.runtime.taskmanager.Task -3244 -org.apache.flink.runtime.taskmanager.Task.transitionState(Task.java:1080).transitionState(1080) | Source: Custom Source -> Map -> Sink: Unnamed (1/1)#0 (965035c5eef2b8f28ffcfc309b92e203) switched from DEPLOYING to INITIALIZING.
[2023-06-19 15:26:37,795]-[INFO] -org.apache.flink.runtime.executiongraph.ExecutionGraph -3246 -org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1416).transitionState(1416) | Source: Custom Source -> Map -> Sink: Unnamed (1/1) (965035c5eef2b8f28ffcfc309b92e203) switched from DEPLOYING to INITIALIZING.
[2023-06-19 15:26:37,836]-[INFO] -org.apache.flink.runtime.taskmanager.Task -3287 -org.apache.flink.runtime.taskmanager.Task.transitionState(Task.java:1080).transitionState(1080) | Source: Custom Source -> Map -> Sink: Unnamed (1/1)#0 (965035c5eef2b8f28ffcfc309b92e203) switched from INITIALIZING to RUNNING.
[2023-06-19 15:26:37,837]-[INFO] -org.apache.flink.runtime.executiongraph.ExecutionGraph -3288 -org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1416).transitionState(1416) | Source: Custom Source -> Map -> Sink: Unnamed (1/1) (965035c5eef2b8f28ffcfc309b92e203) switched from INITIALIZING to RUNNING.

这里的日志我截取了最后的部分,可以看到没有任何报错的,我们在看一下生成的CSV文件
flink-csv,FLink,flink,java,大数据
这里我们再将文件打开,看一下有没有数据
flink-csv,FLink,flink,java,大数据
通过图片可以看到这个文件中是没有任何数据的.
这里我先说一下原因,然后再结合源码看一下,没有数据的原因是数据在内存中还没有达到4k的缓存,没有到这个数据量就不会将数据刷新到磁盘上,代码中我们加入了睡眠时间Thread.sleep(1000)就是为了看到这个效果,接下来我们就结合源码看一下.writeAsCsv这个方法的缓存刷新是不是4k,我们先看一下.writeAsCsv的内容,点击去源码后我们先找到下面这段代码

    @Deprecated
    @PublicEvolving
    public <X extends Tuple> DataStreamSink<T> writeAsCsv(
            String path, WriteMode writeMode, String rowDelimiter, String fieldDelimiter) {
        Preconditions.checkArgument(
                getType().isTupleType(),
                "The writeAsCsv() method can only be used on data streams of tuples.");

        CsvOutputFormat<X> of = new CsvOutputFormat<>(new Path(path), rowDelimiter, fieldDelimiter);// 着重看这里,我们在看一下CsvOutputFormat里面的内容

        if (writeMode != null) {
            of.setWriteMode(writeMode);
        }

        return writeUsingOutputFormat((OutputFormat<T>) of);
    }

这里我们在点击去看CsvOutputFormat这个输出,找到如下内容

 @Override
    public void writeRecord(T element) throws IOException {
        int numFields = element.getArity();

        for (int i = 0; i < numFields; i++) {
            Object v = element.getField(i);
            if (v != null) {
                if (i != 0) {
                    this.wrt.write(this.fieldDelimiter);
                }

                if (quoteStrings) {
                    if (v instanceof String || v instanceof StringValue) {
                        this.wrt.write('"'); // 我们要注意到wrt这个变量
                        this.wrt.write(v.toString());
                        this.wrt.write('"');
                    } else {
                        this.wrt.write(v.toString());
                    }
                } else {
                    this.wrt.write(v.toString());
                }
            } else {
                if (this.allowNullValues) {
                    if (i != 0) {
                        this.wrt.write(this.fieldDelimiter);
                    }
                } else {
                    throw new RuntimeException(
                            "Cannot write tuple with <null> value at position: " + i);
                }
            }
        }

        // add the record delimiter
        this.wrt.write(this.recordDelimiter);
    }

这里我们先看一下writeRecord(T element)这个方法,实际上在我们调用writeAsCsv的时候底层就是通过writeRecord方法将数据写入csv文件,我们看上面代码的时候要注意到this.wrt这个变量,通过wrt我们就可以找到,对数据刷新到磁盘定义的数据量的大小,看一下对wrt的定义,源码内容如下

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        super.open(taskNumber, numTasks);
        this.wrt =
                this.charsetName == null
                        ? new OutputStreamWriter(new BufferedOutputStream(this.stream, 4096)) // 看一下这里
                        : new OutputStreamWriter(
                                new BufferedOutputStream(this.stream, 4096), this.charsetName); // 还有这里
    }

通过上面的源码我们可以看到BufferedOutputStream的缓冲流定义死了为4096,也就是4k大小,这个参数是写死的,我们改变不了,所以在使用writeAsCsv这个方法时,代码没有报错,并且文件中也没有数据时先不要慌,通过源码先看看具体的实现逻辑,我们就可以很快定位到问题,如果代码中我将Thread.sleep(1000)这行代码删除掉的话CSV文件中很快就会有数据的,代码中我使用的自定义数据源,并且每条数据其实很小,还有睡眠1秒的限制,所以导致很久CSV文件中都没有数据生成.
文章内容写到现在也过了很久了,数据的大小也满足4k的条件了,我们看一下文件内容
flink-csv,FLink,flink,java,大数据
可以看到文件中已经生成了数据,我们在看一下文件的大小
flink-csv,FLink,flink,java,大数据
说到这里我想大家应该都理解了,虽然说了这么多关于writeAsCsv这个方法的内容,但是不建议大家使用这个方法毕竟属于过时的方法,用起来弊端也比较大.文章来源地址https://www.toymoban.com/news/detail-566014.html

到了这里,关于Flink将数据写入CSV文件后文件中没有数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Python操作写入/读取csv文件

    网络工程师Python数据存储(第1节,CSV文件) 网络自动化运维演进的一个方向大致过程:网络工程师从关注配置制作脚本,完成后上设备刷配置,慢慢地演化为网络工程师关注和确定设备配置的某些重要控制参数,而把制作脚本任务交给Jinja2等去渲染生成,把下发脚本工作交

    2024年02月03日
    浏览(65)
  • 【已解决】MATLAB写入csv文件

    在使用MATLAB的时候,经常需要将数据以csv格式保存。接下来就看看如何将MATLAB中的数据保存到csv文件中 首先来看看csv格式。csv格式是用逗号分隔数据的一种文件。一行之间的数据用逗号分隔,行与行之间用n分隔。 用MATLAB将数据写入csv文件时,首先用fopen创建一个有写入权限

    2024年02月11日
    浏览(55)
  • Python——csv文件的写入与读取

    CSV文件是一种常见的数据格式,它以逗号分隔不同的字段,每行表示一个数据记录。在Python中,我们可以使用csv模块来读取和写入CSV文件。 在Python中,我们可以使用csv模块的writer对象来写入CSV文件。下面是一个例子: 在上面的例子中,我们首先创建了要写入的数据,它是一

    2024年02月06日
    浏览(63)
  • python读取txt文件内容,写入csv文件中去。

    txt文件中的内容大概是这样的: 2.在图3中,当开关断开时,R1、R2_______(串联/并联),当开关闭合时, 被短路。开关由断开转为闭合时,总电阻 ,总电流_______,通过R2的电流_______(变大/变小/不变)。 3.如图3,当开关闭合时,R2两端电压为3V,若R2=10Ω,则电流为_______。断开

    2023年04月08日
    浏览(70)
  • 【Python基础】一文搞懂:Python 中 csv 文件的写入与读取

    在数据处理和数据分析领域,CSV (逗号分隔值) 文件是一种常见的文件格式,用于存储表格数据。Python 通过内置的 csv 模块提供了对 CSV 文件的读写支持,使得处理这种类型的文件变得简单高效。本文将详细介绍如何在 Python 中进行 CSV 文件的读取和写入操作。 CSV 文件是一种简

    2024年04月25日
    浏览(46)
  • Flink之FileSink将数据写入parquet文件

    在使用FileSink将数据写入列式存储文件中时必须使用 forBulkFormat ,列式存储文件如 ORCFile 、 ParquetFile ,这里就以 ParquetFile 为例结合代码进行说明. 在Flink 1.15.3 中是通过构造 ParquetWriterFactory 然后调用 forBulkFormat 方法将构造好的 ParquetWriterFactory 传入,这里先讲一下构造 ParquetWriterF

    2024年02月03日
    浏览(45)
  • flink:通过table api把文件中读取的数据写入MySQL

    当写入数据到外部数据库时,Flink 会使用 DDL 中定义的主键。如果定义了主键,则连接器将以 upsert 模式工作,否则连接器将以 append 模式工作 文件info.txt

    2024年03月15日
    浏览(50)
  • python如何写入csv

    在使用python对文件操作的过程中,你肯定碰到过对csv文件的操作,下面就python对csv文件的操作进行详述。 CSV(Comma-Separated Values)逗号分隔符,也就是每条记录中的值与值之间是用分号分隔的。 打开CSV文件并写入一行数据 这里的操作是实现csv文件的打开以及写入一行数据,首

    2024年04月14日
    浏览(54)
  • Python写入CSV出现空行解决方法

    最近在用Python创建写入csv文件,也就在无形中踩到一些坑,也因此记录下来,作为纠错,也希望帮到大家。 前提:使用csv存储多维数组元素,发现写入后,使用Excel打开该csv文件会出现空行,使用文件方式读取该csv文件输出会出现“n\\\"。 解决方法:在csv文件生成时,添加参数

    2024年02月12日
    浏览(56)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包