2.2 如何使用FlinkSQL读取&写入到文件系统(HDFS\Local\Hive)

这篇具有很好参考价值的文章主要介绍了2.2 如何使用FlinkSQL读取&写入到文件系统(HDFS\Local\Hive)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

1、文件系统 SQL 连接器

2、如何指定文件系统类型

3、如何指定文件格式

4、读取文件系统

4.1 开启 目录监控 

4.2 可用的 Metadata

5、写出文件系统

5.1 创建分区表

5.2 滚动策略、文件合并、分区提交

5.3 指定 Sink Parallelism

6、示例_通过FlinkSQL读取kafka在写入hive表

6.1、创建 kafka source表用于读取kafka

6.2、创建 hdfs sink表用于写出到hdfs

6.3、insert into 写入到 hdfs_sink_table

6.4、查询 hdfs_sink_table

6.5、创建hive表,指定local


1、文件系统 SQL 连接器

文件系统连接器允许从本地分布式文件系统进行读写数据

官网链接:文件系统 SQL 连接器

flink sql 读取hdfs,# FlinkSQL 使用技巧,hdfs,大数据,服务器


2、如何指定文件系统类型

创建表时通过 'path' = '协议名称:///path' 来指定 文件系统类型

参考官网:文件系统类型

CREATE TABLE filesystem_table (
  id INT,
  name STRING,
  ds STRING
) partitioned by (ds) WITH (
  'connector' = 'filesystem',
  -- 本地文件系统
  'path' = 'file:///URI',
  -- HDFS文件系统
  'path' = 'hdfs://URI',
  -- 阿里云对象存储 
  'path' = 'oss://URI',
  'format' = 'json'
);

3、如何指定文件格式

FlinkSQL 文件系统连接器支持多种format,来读取和写入文件

比如当读取的source格式为 csv、json、Parquet... 可以在建表是指定相应的格式类型

来对数据进行解析后映射到表中的字段中

flink sql 读取hdfs,# FlinkSQL 使用技巧,hdfs,大数据,服务器

CREATE TABLE filesystem_table_file_format (
  id INT,
  name STRING,
  ds STRING
) partitioned by (ds) WITH (
  'connector' = 'filesystem',
  -- 指定文件格式类型
  'format' = 'json|csv|orc|raw'
);

4、读取文件系统

FlinkSQL可以将单个文件或整个目录的数据读取到单个表中

注意:

        1、当读取目录时,对目录中的文件进行 无序的读取

        2、默认情况下,读取文件时为批处理模式,只会扫描配置路径一遍后就会停止

             当开启目录监控(source.monitor-interval)时,才是流处理模式

4.1 开启 目录监控 

通过设置 source.monitor-interval 属性来开启目录监控,以便在新文件出现时继续扫描

注意:

        只会对指定目录内新增文件进行读取,不会读取更新后的旧文件

-- 目录监控
drop table filesystem_source_table;
CREATE TABLE filesystem_source_table (
  id INT,
  name STRING,
  `file.name` STRING NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1016',
  'format' = 'json',
  'source.monitor-interval' = '3' -- 开启目录监控,设置监控时间间隔
);

-- 持续读取
select * from filesystem_source_table;

4.2 可用的 Metadata

使用FLinkSQL读取文件系统中的数据时,支持对 metadata 进行读取

注意: 所有 metadata 都是只读的

flink sql 读取hdfs,# FlinkSQL 使用技巧,hdfs,大数据,服务器

-- 可用的Metadata
drop table filesystem_source_table_read_metadata;
CREATE TABLE filesystem_source_table_read_metadata (
  id INT,
  name STRING,
  `file.path` STRING NOT NULL METADATA,
  `file.name` STRING NOT NULL METADATA,
  `file.size` BIGINT NOT NULL METADATA,
  `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1012',
  'format' = 'json'
);

select * from filesystem_source_table_read_metadata;

运行结果:

flink sql 读取hdfs,# FlinkSQL 使用技巧,hdfs,大数据,服务器


5、写出文件系统

5.1 创建分区表

FlinkSQL支持创建分区表,并且通过 insert into(追加) insert overwrite(覆盖) 写入数据

-- 创建分区表
drop table filesystem_source_table_partition;
CREATE TABLE filesystem_source_table_partition (
  id INT,
  name STRING,
  ds STRING
) partitioned by (ds) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1012',
  'partition.default-name' = 'default_partition',
  'format' = 'json'
);

-- 动态分区写入
insert into filesystem_source_table_partition
SELECT * FROM (VALUES
  (1,'a','20231010')
, (2,'b','20231010')
, (3,'c','20231011')
, (4,'d','20231011')
, (5,'e','20231012')
, (6,'f','20231012')
) AS user1 (id,name,ds);

-- 静态分区写入
insert into filesystem_source_table_partition partition(ds = '20231010')
SELECT * FROM (VALUES
  (1,'a')
, (2,'b')
, (3,'c')
, (4,'d')
, (5,'e')
, (6,'f')
) AS user1 (id,name);

-- 查询分区表数据
select * from filesystem_source_table_partition where ds = '20231010';

5.2 滚动策略、文件合并、分区提交

可以看之前的博客:flink写入文件时分桶策略

官网链接:官网分桶策略


5.3 指定 Sink Parallelism

当使用FlinkSQL写出到文件系统时,可以通过 sink.parallelism 设置sink算子的并行度

注意:当且仅当上游的 changelog 模式为 INSERT-ONLY 时,才支持配置 sink parallelism。否则,程序将会抛出异常

CREATE TABLE hdfs_sink_table (
  `log` STRING,
  `dt` STRING,  -- 分区字段,天
  `hour` STRING  -- 分区字段,小时
) partitioned by (dt,`hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka',
  'sink.parallelism' = '2', -- 指定sink算子并行度
  'format' = 'raw'
);

6、示例_通过FlinkSQL读取kafka在写入hive表

需求:

        使用FlinkSQL将kafka数据写入到hdfs指定目录中

        根据kafka的timestamp进行分区(按小时分区)文章来源地址https://www.toymoban.com/news/detail-722282.html

6.1、创建 kafka source表用于读取kafka

-- TODO 创建读取kafka表时,同时读取kafka元数据字段
drop table kafka_source_table;
CREATE TABLE kafka_source_table(
  `log` STRING,
  `timestamp` TIMESTAMP(3) METADATA FROM 'timestamp' -- 消息的时间戳
) WITH (
  'connector' = 'kafka',
  'topic' = '20231017',
  'properties.bootstrap.servers' = 'worker01:9092',
  'properties.group.id' = 'FlinkConsumer',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'raw'
);

6.2、创建 hdfs sink表用于写出到hdfs

drop table hdfs_sink_table;
CREATE TABLE hdfs_sink_table (
  `log` STRING,
  `dt` STRING,  -- 分区字段,天
  `hour` STRING  -- 分区字段,小时
) partitioned by (dt,`hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka',
  'sink.parallelism' = '2', -- 指定sink算子并行度
  'format' = 'raw'
);

6.3、insert into 写入到 hdfs_sink_table

-- 流式 sql,插入文件系统表
insert into hdfs_sink_table
select 
	log
	,DATE_FORMAT(`timestamp`,'yyyyMMdd') as dt
	,DATE_FORMAT(`timestamp`,'HH') as `hour`
from kafka_source_table;

6.4、查询 hdfs_sink_table

-- 批式 sql,使用分区修剪进行选择
select * from hdfs_sink_table;

6.5、创建hive表,指定local

create table `kafka_to_hive` (
`log` string comment '日志数据')
 comment '埋点日志数据' PARTITIONED BY (dt string,`hour` string) 
row format delimited fields terminated by '\t' lines terminated by '\n' stored as orc
LOCATION 'hdfs://usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka';

到了这里,关于2.2 如何使用FlinkSQL读取&写入到文件系统(HDFS\Local\Hive)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • HDFS常用操作以及使用Spark读取文件系统数据

    掌握在Linux虚拟机中安装Hadoop和Spark的方法; 熟悉HDFS的基本使用方法; 掌握使用Spark访问本地文件和HDFS文件的方法。 启动Hadoop,在HDFS中创建用户目录“/user/hadoop” 在Linux系统的本地文件系统的“/home/hadoop”目录下新建一个文本文件test.txt,并在该文件中随便输入一些内容,

    2024年04月22日
    浏览(42)
  • Hadoop Distributed System (HDFS) 写入和读取流程

    一、HDFS HDFS全称是Hadoop Distributed System。HDFS是为以流的方式存取大文件而设计的。适用于几百MB,GB以及TB,并写一次读多次的场合。而对于低延时数据访问、大量小文件、同时写和任意的文件修改,则并不是十分适合。 目前HDFS支持的使用接口除了Java的还有,Thrift、C、FUSE、

    2024年02月08日
    浏览(36)
  • SAP_ABAP_编程基础_文件处理(CRUD)_R3系统_打开文件 / 关闭文件 / 删除文件 / 向文件中写入数据 / 从文件中读取数据 / 使用服务器上的文件

    SAP ABAP 顾问(开发工程师)能力模型_Terry谈企业数字化的博客-CSDN博客 文章浏览阅读490次。目标:基于对SAP abap 顾问能力模型的梳理,给一年左右经验的abaper 快速成长为三年经验提供超级燃料! https://blog.csdn.net/java_zhong1990/article/details/132469977 平时在  ‘ 工地搬砖 ’,很少关

    2024年02月22日
    浏览(46)
  • 大数据编程实验一:HDFS常用操作和Spark读取文件系统数据

    这是我们大数据专业开设的第二门课程——大数据编程,使用的参考书是《Spark编程基础》,这门课跟大数据技术基础是分开学习的,但这门课是用的我们自己在电脑上搭建的虚拟环境进行实验的,不是在那个平台上,而且搭建的还是伪分布式,这门课主要偏向于有关大数据

    2024年04月10日
    浏览(52)
  • HDFS文件创建与写入

    实验环境 Linux Ubuntu 16.04 前提条件: 1)Java 运行环境部署完成 2)Hadoop 的单点部署完成   实验内容 在上述前提条件下,学习HDFS文件创建、写入、追加与合并等操作 实验步骤 启动HDFS,在命令行窗口输入下面的命令: 运行后显示如下,根据日志显示,分别启动了NameNode、Dat

    2024年02月02日
    浏览(41)
  • Spark解析JSON文件,写入hdfs

    一、用Sparkcontext读入文件,map逐行用Gson解析,输出转成一个caseclass类,填充各字段,输出。 解析JSON这里没有什么问题。 RDD覆盖写的时候碰到了一些问题 : 1.直接saveAsTextFile没有覆盖true参数; 2.转dataframe时,还得一个一个字段显化才能转成dataframe; 3.write时,一开始打算写

    2024年01月23日
    浏览(41)
  • 一百七十三、Flume——Flume写入HDFS后的诸多小文件问题

    在用Flume采集Kafka中的数据写入HDFS后,发现写入HDFS的不是每天一个文件,而是一个文件夹,里面有很多小文件,浪费namenode的宝贵资源 在Flume任务的配置文件设置 a1.sinks.k1.hdfs.rollSize = 0   a1.sinks.k1.hdfs.rollCount = 0   而不是 a1.sinks.k1.hdfs.round=true a1.sinks.k1.hdfs.roundValue=10 a1.sinks.k1

    2024年02月09日
    浏览(42)
  • java 文件读取和写入

    1.文件名 1.InputStream(字节流)  和Reader(字符流) 2.OutputStream(字节流) 和 Writer(字符流) Java提供了File类 来表示一个文件(通过构造方法来指定路径) 绝对路径 目录与目录之间用 表示,,也可以用 / ,形如D:xxxxxx的就是绝对路径 相对路径 ..(当前路径的上一级路径) 和 . (当前路径) 表示的

    2024年02月08日
    浏览(42)
  • Java 按行读取写入文件

    目录 一、按行读取 二、按行写入 这里采用java.nio.file.Files:readAllLines方法,参见JDK8 API官方文档 Java Platform SE 8  具体实现Demo: 这里采用FileWriter与BufferWriter方法 FileWriter: 用于写入字符流。对于写入原始字节的流,可以考虑使用FileOutputStream。 BufferWriter:  将文本写入字符输出流

    2024年02月15日
    浏览(44)
  • qt xml文件写入读取

    ****************************************************************************** QT       += core gui xml ****************************************************************************** #include \\\"mainwindow.h\\\" #include QDomDocument #include QTextStream #include QFile #include QDebug MainWindow::MainWindow(QWidget *parent)     : QMainWindow(parent) {    

    2024年02月09日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包