4.2、Flink任务怎样读取文件中的数据

这篇具有很好参考价值的文章主要介绍了4.2、Flink任务怎样读取文件中的数据。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

1、前言

2、readTextFile(已过时,不推荐使用)

3、readFile(已过时,不推荐使用)

4、fromSource(FileSource) 推荐使用


1、前言

思考: 读取文件时可以设置哪些规则呢?

         1. 文件的格式(txt、csv、二进制...)        

         2. 文件的分隔符(按\n 分割)

         3. 是否需要监控文件变化(一次读取、持续读取)

基于以上规则,Flink为我们提供了非常灵活的 读取文件的方法


2、readTextFile(已过时,不推荐使用)

语法说明:

定义:
    def readTextFile(filePath: String): DataStream[String]
    def readTextFile(filePath: String, charsetName: String)

功能:
    1.读取文本格式的文件
    2.按行读取(\n为分隔符),每行数据被封装为 DataStream 的一个元素
    3.可以指定字符集(默认为UDF-8)
    4.文件只会读取一次

源码分析:
    public DataStreamSource<String> readTextFile(String filePath, String charsetName) {

        // 初始化 TextInputFormat对象
        TextInputFormat format = new TextInputFormat(new Path(filePath));  
        // 指定路径过滤器(使用默认过滤器)
        format.setFilesFilter(FilePathFilter.createDefaultFilter());  
        // 指定Flink中的数据类型    
        TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO; 
        // 指定字符集
        format.setCharsetName(charsetName);     
                                   
        // 调用 readFile 方法
        return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo); 
    }

代码示例:

    public static void readTextFile() throws Exception {
        /*
         * TODO 功能说明
         *   readTextFile(path) - 读取文本文件(一次读取),例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。
         * */
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.将文本文件作为数据源
        env.readTextFile("data/1.txt").setParallelism(4).print();

        // 3.触发程序执行
        env.execute();
    }

3、readFile(已过时,不推荐使用)

语法说明:

定义:
    def readFile[T: TypeInformation](
        inputFormat: FileInputFormat[T],
        filePath: String,
        watchType: FileProcessingMode,
        interval: Long): DataStream[T] = {
      val typeInfo = implicitly[TypeInformation[T]] // 隐私转换(将java 数据类型 转换为 Flink数据类型)
      asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, typeInfo))
    }

参数:
    inputFormat : 指定 FileInputFormat 实现类(根据文件类型 选择相适应的实例)
    filePath    : 指定 文件路径
    watchType   : 指定 读取模式(提供了2个枚举值)
                       PROCESS_ONCE :只读取一次
                       PROCESS_CONTINUOUSLY :按照指定周期扫描文件
    interval    : 指定 扫描文件的周期(单位为毫秒)

功能:
    按照 指定的 文件格式 和 读取方式 读取数据
4.2、Flink任务怎样读取文件中的数据,# Flink API 使用技巧,flink,python,前端
FileInputFormat 的实现类

代码示例:文章来源地址https://www.toymoban.com/news/detail-635748.html

    public static void readFile() throws Exception {
        /*
         * TODO 功能说明
         *    readFile(fileInputFormat, path) - 按照指定的文件输入格式读取(一次)文件。
         *    readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
         *       按照指定的文件输入格式读取(持续的读取)文件
         * */

        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.将文本文件作为数据源
        String filePath = "data/1.txt";

        TextInputFormat textInputFormat = new TextInputFormat(new Path(filePath));
        textInputFormat.setFilesFilter(FilePathFilter.createDefaultFilter()); // 指定过滤器
        textInputFormat.setCharsetName("UTF-8"); // 指定编码格式

        /*
         * readFile(inputFormat: FileInputFormat[OUT], filePath: String, watchType: FileProcessingMode, interval: Long)
         * 参数说明:
         *      @inputFormat : 指定文件输入格式
         *      @filePath    : 指定文件路径
         *      @watchType   : 指定监控类型,提供了两种读取策略
         *            PROCESS_ONCE : 只读取一次
         *            PROCESS_CONTINUOUSLY :持续读取,监控新增数据
         *      @interval : 指定连续扫描文件的周期(毫秒)
         * 重点提示:
         *      1.如果watchType设置为PROCESS_CONTINUOUSLY时,当一个文件被修改时,将会导致重新读取该
         *           文件的全部内容,这将会打破`精确一次`的语义
         * */
        env.readFile(
                textInputFormat
                , filePath
                , FileProcessingMode.PROCESS_CONTINUOUSLY
                , 1000
        ).print();

        // 3.触发程序执行
        env.execute();
    }

4、fromSource(FileSource) 推荐使用

    public static void FileSource() throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.将文本文件作为数据源
        FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat()
                , new Path("data/1.txt")).build();

        env.fromSource(fileSource
                , WatermarkStrategy.noWatermarks()
                , "read fileSource"
        ).print();

        // 3.触发程序执行
        env.execute();
    }

到了这里,关于4.2、Flink任务怎样读取文件中的数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 读取JSON文件 如何在Unity中读取Json文件中的数据

    Josn是一种轻量级的数据交换格式,JSON能够描述四种简单的类型(字符串、数字、布尔值及null)和两种结构化类型(对象及数组),在Unity里经常用Json来处理大量的字符串,容易解析,效率非常快。 基本结构 1、语法 数据存在键值对中 数据由逗号分隔 花括号保存对象 方括号保存

    2024年02月15日
    浏览(32)
  • Flink的API分层、架构与组件原理、并行度、任务执行计划

            Apache Flink的API分为四个层次,每个层次都提供不同的抽象和功能,以满足不同场景下的数据处理需求。下面是这四个层次的具体介绍: CEP API: Flink API 最底层的抽象为有状态实时流处理。其抽象实现是Process Function,并且Process Function被  框架集成到了DataStream API中

    2024年02月05日
    浏览(34)
  • python实现读取文件中的视频数据并实时展示

    要实现读取文件中的视频数据并实时展示,可以使用OpenCV库。以下是一个简单的示例代码: 在这个示例中,我们首先使用`cv2.VideoCapture()`函数打开视频文件。然后,我们使用一个无限循环来逐帧读取视频,并在窗口中显示当前帧。最后,我们释放资源并关闭窗口。注意,在循

    2024年02月12日
    浏览(36)
  • 用Python的pandas读取excel文件中的数据

    hello呀!各位铁子们大家好呀,今天呢来和大家聊一聊用Python的pandas读取excel文件中的数据。 使用pandas的 read_excel() 方法,可通过文件路径直接读取。注意到,在一个excel文件中有多个sheet,因此,对excel文件的读取实际上是读取指定文件、并同时指定sheet下的数据。可以一次读

    2024年02月02日
    浏览(33)
  • 如何使用pandas读取csv文件中的某一列数据

    使用pandas读取csv文件中的某一列数据,可以这样做: 先导入pandas模块: import pandas as pd 使用 pd.read_csv 函数读取csv文件: df = pd.read_csv(\\\"文件名.csv\\\") 使用 df[\\\"列名\\\"] 读取某一列数据: column = df[\\\"列名\\\"] 例如,如果你有一个csv文件叫做 example.csv ,并且有一列叫做 age ,你可以这样

    2024年02月13日
    浏览(31)
  • html5提供的FileReader是一种异步文件读取文件中的数据

    前言:FileReader是一种异步文件读取机制,结合input:file可以很方便的读取本地文件。 input:file 在介绍FileReader之前,先简单介绍input的file类型。 input type=\\\"file\\\" id=\\\"file\\\" input的file类型会渲染为一个按钮和一段文字。点击按钮可打开文件选择窗口,文字表示对文件的描述(大部分情

    2024年02月11日
    浏览(34)
  • 用bat 命令 修改sql文件中的数据库名字 新的名字通过读取配置文件中的字段获取

    在批处理脚本中,如果新数据库名存储在配置文件(比如config.ini)中的某个字段内,可以按照以下步骤进行: 假设你的配置文件内容如下: 要读取这个值并用于替换.sql文件中的旧数据库名,请使用以下批处理脚本: 在这个脚本中,首先通过 findstr 和 for /f 命令组合读取配置

    2024年02月02日
    浏览(53)
  • POI:从Excel文件中读取数据,向Excel文件中写入数据,将Excel表格中的数据插入数据库,将数据库中的数据添加到Excel表

    POI是Apache软件基金会用Java编写的免费开源的跨平台的 Java API,Apache POI提供API给Java程序对Microsoft Office格式档案读和写的功能。POI为“Poor Obfuscation Implementation”的首字母缩写,意为“可怜的模糊实现”。 所以POI的主要功能是可以用Java操作Microsoft Office的相关文件,但是一般我

    2024年02月10日
    浏览(39)
  • Flink(三)flink重要概念(api分层、角色、执行流程、执行图和编程模型)及dataset、datastream详细示例入门和提交任务至on yarn运行

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月16日
    浏览(37)
  • 【Flink】 Flink实时读取mysql数据

    准备 你需要将这两个依赖添加到 pom.xml 中 mysql mysql-connector-java 8.0.0 读取 kafka 数据 这里我依旧用的以前的 student 类,自己本地起了 kafka 然后造一些测试数据,这里我们测试发送一条数据则 sleep 10s,意味着往 kafka 中一分钟发 6 条数据。 package com.zhisheng.connectors.mysql.utils; impo

    2024年02月03日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包