Spark - 获取一定时间内的 Hdfs 全部文件并读取

这篇具有很好参考价值的文章主要介绍了Spark - 获取一定时间内的 Hdfs 全部文件并读取。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一.引言

二.获取 Interval 内文件

1.获取 FileSystem

2.获取全部 File

3.读取 Hdfs File


一.引言

有一个需求要求定时获取距离目前时间 Interval 范围之内的文件并读取,例如现在是 7:00,interval 为 30 min,则我们需要读取 6:30 - 7:00 的全部文件并读取。这里思路是通过 FileSystem 获取文件的 modofiyTime 然后计算其与当前时间的 interval,满足则保留文件名。

二.获取 Interval 内文件

1.获取 FileSystem

    val conf = new SparkConf().setAppName("Init Spark")
    val spark = SparkSession
      .builder
      .config(conf)
      .getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("error")    

    // 获取 FileSystem
    val fileSystem = new Path("viewfs:/user/...").getFileSystem(sc.hadoopConfiguration)
    

这里输入自己 Hdfs 任意对应目录即可获取当前 FileSystem。

2.获取全部 File

    // 保存全部满足的 File
    val satisfiedFiles = new ArrayBuffer[String]()

    // 待遍历的地址
    val basePath = new Path(s"/$target/$year/$month/$day/$hour")

    // 获取当前时间戳
    val curTime = System.currentTimeMillis()

    // 保留 interval=30 min 内文件
    val interval = 30 
    fs.listStatus(basePath).filter(dir => {
      val updateTime = dir.getModificationTime
      val delay = getInterval(curTime, updateTime)
      delay < interval
    }).foreach(file => {
      val path = basePath + File.separator + file.getPath.getName
      satisfiedFiles.append(path)
    })

file.getPath.getName 可以获取对应 File 的单独路径,而非完整路径,所以要添加 BasePath。 

getInterval 函数:

  // 获取两个时间戳 Min 级别间隔
  def getInterval(now: Long, fileTime: Long): Long = {
    val delayMin = (now - fileTime) / 1000 / 60
    delayMin
  }

3.读取 Hdfs File

    val allFile = satisfiedFiles.mkString(",")
    sc.textFile(allFile)

多个文件可以通过 ',' 分隔的形式供 Spark 读取。

Tips:

如果为了防止没有满足间隔的 File 导致 allFile 为空,可以在提交 spark 的脚本里增加忽略空文件的配置,避免任务异常:文章来源地址https://www.toymoban.com/news/detail-461331.html

--conf spark.files.ignoreMissingFiles=true \

到了这里,关于Spark - 获取一定时间内的 Hdfs 全部文件并读取的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • shell脚本之003获取固定时间段(分钟)内的日志,并将其定时通过sftp上传至服务器中

    #!/bin/bash export PATH=/home/ccbt/software/lftp-4.9.2/usr/local/bin/:$PATH # 获取当前系统时间 now=$(date +\\\"%Y/%m/%d %H:%M:%S\\\") echo \\\"当前日期时间:$now\\\" # 当前时间戳 now_time=$(date +%s) echo \\\"当前时间戳:$now_time\\\" #获取指定开始时间的时间戳 10分钟 time_befo=$(($now_time-600)) echo \\\"指定开始时间戳: $time_befo\\\"

    2024年02月01日
    浏览(47)
  • Spark解析JSON文件,写入hdfs

    一、用Sparkcontext读入文件,map逐行用Gson解析,输出转成一个caseclass类,填充各字段,输出。 解析JSON这里没有什么问题。 RDD覆盖写的时候碰到了一些问题 : 1.直接saveAsTextFile没有覆盖true参数; 2.转dataframe时,还得一个一个字段显化才能转成dataframe; 3.write时,一开始打算写

    2024年01月23日
    浏览(31)
  • HDFS的文件写入和文件读取流程

    Pipeline管道:  Pipeline,即管道。这是 HDFS 在上传⽂件写数据过程时采⽤的⼀种数据传输⽅式。客户端将数据块写⼊第⼀个数据节点,第⼀个数据节点保存数据之后再将块复制到第⼆个数据节点,后者保存后将其复制到第三个数据节点。通俗描述 pipeline 的过程就是:Client——

    2024年02月06日
    浏览(29)
  • python hdfs远程连接以及上传文件,读取文件内容,删除文件

    目录 一、python连接操作hdfs 1 往hdfs上传文件 2 处理并存储到hdfs 3 读取hdfs上的txt文件 这里使用的是 pip 安装,很方便:      

    2024年02月11日
    浏览(40)
  • 实验二、熟悉常用的HDFS操作(HDFS JavaAPI之读取/上传/删除文件)

    理解HDFS在Hadoop体系结构中的角色 熟练使用HDFS操作常用的shell命令 熟悉HDFS操作常用的Java API 操作系统:CentOS 8 Hadoop版本:3.3.1 jdk版本:1.8 Java IDE:Eclipse 1. 使用Hadoop命令操作分布式文件系统。 新建目录 在本地和hadoop中分别创建文件夹: 在本地创建目录: Hadoop创建目录: 上

    2023年04月08日
    浏览(47)
  • Java:读取excel文件中的内容(简单、详细、明确、有全部代码)

    注意: jxl 仅支持读取 .xls 文件,读 .xlsx 会报错! 在 pom.xml 中引入 jxl 的依赖 支持读取 xlsx 文件的 poi 依赖说明、使用方法及更多详细内容可以参考: Java读取excel的方式,一篇文章看懂(详细)

    2024年02月12日
    浏览(30)
  • 记录Java读取hdfs上的文件全过程

    文章目录 前言 一、项目大体流程 二、详细步骤 1.在idea里面创建空项目(小白也能看懂) 2.导入所需的jar包 2.输入代码后就可以实现了 总结         跟着白哥学Java,今天就来分享一下Java如何上传文件到hdfs上面, 提示:以下是一点见解         我们想要上传到hdfs,首先就

    2024年02月10日
    浏览(43)
  • Spark流式读取文件数据

    流式读取文件数据 from pyspark.sql import SparkSession ss = SparkSession.builder.getOrCreate() df_csv = ss.readStream.csv(‘hdfs://node1:8020/目录’) df_json = ss.readStream.json(‘hdfs://node1:8020/目录’) options2 ={ ‘host’:‘192.168.88.100’, ‘port’:9999 } options={ # 每个批次读取1个文件 ‘maxFilesPerTrigger’:1, ‘lat

    2024年01月21日
    浏览(50)
  • Spark读取Excel文件

    2023年12月31日
    浏览(41)
  • hdfs常用端口号、常用配置文件,集群时间同步

    hadoop3.x HDFS NameNode 内部通常端口:8020/9000/9820 HDFS NameNode 对用户的查询端口:9870 历史服务器:19888 hadoop2.x HDFS NameNode 内部通常端口: 8020/9000/9820 HDFS NameNode 对用户的查询端口:50070 Yarn查看任务运行情况:8088 历史服务器:19888 3.x版本 core-xite.xml hdfs-site.xml yarn-site.xml mapred-sit

    2024年02月20日
    浏览(22)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包