Spark学习——DataFrame清洗HDFS日志并存入Hive中

这篇具有很好参考价值的文章主要介绍了Spark学习——DataFrame清洗HDFS日志并存入Hive中。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

1.开启Hadoop集群和Hive元数据、Hive远程连接

2.配置

3.读取日志文件并清洗

4.单独处理第四列的数据——方法一:

5.单独处理第四列的数据——方法二: 

6.单独处理第四列的数据——方法三: 

7.数据清洗结果展示

8.存入Hive中

9.DataGrip中的代码


HDFS日志文件内容:

2023-02-20 15:19:46 INFO org.apache.hadoop.hdfs.server.namenode.TransferFsImage: Downloaded file edits_tmp_0000000000000030396-0000000000000033312_0000000000025236168 size 0 bytes.
2023-02-20 15:19:46 INFO org.apache.hadoop.hdfs.server.namenode.Checkpointer: Checkpointer about to load edits from 1 stream(s).
2023-02-20 15:19:46 INFO org.apache.hadoop.hdfs.server.namenode.FSImage: Reading /opt/soft/hadoop313/data/dfs/namesecondary/current/edits_0000000000000030396-0000000000000033312 expecting start txid #30396
2023-02-20 15:19:46 INFO org.apache.hadoop.hdfs.server.namenode.FSImage: Start loading edits file /opt/soft/hadoop313/data/dfs/namesecondary/current/edits_0000000000000030396-0000000000000033312

我们要将上面的日志,使用DataFrame API清洗成表格并存入Hive中,清洗后的表格如下:

Spark学习——DataFrame清洗HDFS日志并存入Hive中

1.开启Hadoop集群和Hive元数据、Hive远程连接

Spark学习——DataFrame清洗HDFS日志并存入Hive中

2.配置

 val spark: SparkSession = SparkSession.builder().appName("demo01")
      .master("local[*]")
      .config("hive.metastore.uris", "thrift://lxm147:9083")
      .enableHiveSupport()
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext

    import spark.implicits._
    import org.apache.spark.sql.functions._

3.读取日志文件并清洗

// TODO 读取文件清洗
    val df1: DataFrame = sc.textFile("in/hadoophistory.log")
      .map(_.split(" "))
      .filter(_.length >= 8)
      .map(x => {
        val tuple: (String, String, String, String, String) = (x(0), x(1), x(2), x(3), x(4))
        tuple
      }).toDF()
    df1.show(4,false)
    /*
    +----------+--------+----+-------------------------------------------------------+------------+
    |_1        |_2      |_3  |_4                                                     |_5          |
    +----------+--------+----+-------------------------------------------------------+------------+
    |2023-02-20|15:19:46|INFO|org.apache.hadoop.hdfs.server.namenode.TransferFsImage:|Downloaded  |
    |2023-02-20|15:19:46|INFO|org.apache.hadoop.hdfs.server.namenode.Checkpointer:   |Checkpointer|
    |2023-02-20|15:19:46|INFO|org.apache.hadoop.hdfs.server.namenode.FSImage:        |Reading     |
    |2023-02-20|15:19:46|INFO|org.apache.hadoop.hdfs.server.namenode.FSImage:        |Start       |
    +----------+--------+----+-------------------------------------------------------+------------+
    */

4.单独处理第四列的数据——方法一:

  // TODO 单独处理第四列的数据
    val df2: DataFrame =
      df1.withColumn("test", split(col("_4"), "\\."))
        .select(
          $"_1".as("t1"),
          $"_2".as("t2"),
          $"_3".as("t3"),
          col("test").getItem(0).as("a0"),
          col("test").getItem(1).as("a1"),
          col("test").getItem(2).as("a2"),
          col("test").getItem(3).as("a3"),
          col("test").getItem(4).as("a4"),
          col("test").getItem(5).as("a5"),
          col("test").getItem(6).as("a6"),
          $"_5".as("t5")
        )

5.单独处理第四列的数据——方法二: 

val df2: DataFrame = 
      df1.rdd.map(
      line => {
        val strings: Array[String] = line.toString().split(",")
        val value: Array[String] = strings(3).split("\\.")
        (strings(0).replaceAll("\\[", ""), strings(1), strings(2),
          value(0), value(1), value(2), value(3), value(4), value(5), value(6),
          strings(4).replaceAll("]", "")
        )
      }
    ).toDF("t1", "t2", "t3", "a1", "a2", "a3", "a4", "a5", "a6", "a7", "t5")

6.单独处理第四列的数据——方法三: 

方法三比较麻烦,但是可以对数据类型做单独处理,可以参考我的另一篇博文《Spark高级操作之json复杂和嵌套数据结构的操作》

另一篇博文中读取的日志数据更换了

7.数据清洗结果展示

df2.show(4, truncate = false)

+----------+--------+----+---+------+------+----+------+--------+----------------+------------+
|t1        |t2      |t3  |a1 |a2    |a3    |a4  |a5    |a6      |a7              |t5          |
+----------+--------+----+---+------+------+----+------+--------+----------------+------------+
|2023-02-20|15:19:46|INFO|org|apache|hadoop|hdfs|server|namenode|TransferFsImage:|Downloaded  |
|2023-02-20|15:19:46|INFO|org|apache|hadoop|hdfs|server|namenode|Checkpointer:   |Checkpointer|
|2023-02-20|15:19:46|INFO|org|apache|hadoop|hdfs|server|namenode|FSImage:        |Reading     |
|2023-02-20|15:19:46|INFO|org|apache|hadoop|hdfs|server|namenode|FSImage:        |Start       |
+----------+--------+----+---+------+------+----+------+--------+----------------+------------+

8.存入Hive中

println("正在存储......")
df2.write.mode(SaveMode.Overwrite).saveAsTable("shopping.dataframe")

spark.close()
sc.stop()
println("存储完毕......")

9.DataGrip中的代码

SET hive.exec.compress.output=true;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET mapred.output.compression.type=BLOCK;

use shopping;
show tables;
select * from dataframe;

Spark学习——DataFrame清洗HDFS日志并存入Hive中

参考文章《将Spark数据帧保存到Hive:表不可读,因为“ parquet not SequenceFile”》文章来源地址https://www.toymoban.com/news/detail-411262.html

到了这里,关于Spark学习——DataFrame清洗HDFS日志并存入Hive中的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • [AIGC ~大数据] 深入理解Hadoop、HDFS、Hive和Spark:Java大师的大数据研究之旅

    作为一位Java大师,我始终追求着技术的边界,最近我将目光聚焦在大数据领域。在这个充满机遇和挑战的领域中,我深入研究了Hadoop、HDFS、Hive和Spark等关键技术。本篇博客将从\\\"是什么\\\"、\\\"为什么\\\"和\\\"怎么办\\\"三个角度,系统地介绍这些技术。 Hadoop Hadoop是一个开源的分布式计算

    2024年02月03日
    浏览(40)
  • 云计算与大数据之间的羁绊(期末不挂科版):云计算 | 大数据 | Hadoop | HDFS | MapReduce | Hive | Spark

    前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。 大数据是需求,云计算是手段。没有大数据,就不需要云计算;没有云计算,就无法处理大数据。 所有的计算能力、存储能力、和各种各样功能的应用都通过网络

    2024年02月04日
    浏览(64)
  • 二百一十一、Flume——Flume实时采集Linux中的Hive日志写入到HDFS中(亲测、附截图)

    为了实现用Flume实时采集Hive的操作日志到HDFS中,于是进行了一场实验 [root@hurys23 conf]# find / -name hive.log /home/log/hive312/hive.log [root@hurys23 conf]# vi  flume-file-hdfs.conf # Name the components on this agent a2.sources = r2 a2.sinks = k2 a2.channels = c2 # Describe/configure the source a2.sources.r2.type = exec a2.sources

    2024年02月04日
    浏览(65)
  • Doris-05-集成Spark、Flink、Datax,以及数据湖分析(JDBC、ODBC、ES、Hive、多源数据目录Catalog)

    准备表和数据: Spark 读写 Doris Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据,也支持通过Spark写入数据到Doris。 代码库地址:https://github.com/apache/incubator-doris-spark-connector 支持从 Doris 中读取数据 支持 Spark DataFrame 批量/流式 写入 Doris 可以将 Doris 表映射为 DataFra

    2024年02月06日
    浏览(62)
  • 基于Hadoop的MapReduce网站日志大数据分析(含预处理MapReduce程序、hdfs、flume、sqoop、hive、mysql、hbase组件、echarts)

    需要本项目的可以私信博主!!! 本项目包含:PPT,可视化代码,项目源码,配套Hadoop环境(解压可视化),shell脚本,MapReduce代码,文档以及相关说明教程,大数据集! 本文介绍了一种基于Hadoop的网站日志大数据分析方法。本项目首先将网站日志上传到HDFS分布式文件系统

    2024年02月16日
    浏览(65)
  • 【Spark+Hadoop+Hive+MySQL+Presto+SpringBoot+Echarts】基于大数据技术的用户日志数据分析及可视化平台搭建项目

    点我获取项目数据集及代码 随着我国科学技术水平的不断发展,计算机网络技术的广泛应用,我国已经步入了大数据时代。在大数据背景下,各种繁杂的数据层出不穷,一时难以掌握其基本特征及一般规律,这也给企业的运营数据分析工作增添了不小的难度。在大数据的背景

    2024年02月10日
    浏览(62)
  • 将linux的top命令内容存入日志中

    使用top命令并结合grep筛选,将top结果保存到日志中中,方便分析进程对系统资源的占用。 该命令的含义是: top参数: -b 批处理模式 -d 更新间隔(秒) -n top执行次数,若不设置,则一直执行,直到手动kill

    2024年03月13日
    浏览(50)
  • 基于Spark的数据清洗与转换

    未经许可,禁止以任何形式转载,若要引用,请标注链接地址 全文共计7326字,阅读大概需要3分钟 掌握数据整合、数据清洗和数据转换方法。 1、整合来自不同数据源的数据。   2、对数据进行清洗。   3、对数据进行转换。 数据质量一直是业界普遍存在的问题。不正确

    2024年02月09日
    浏览(32)
  • Springboot配置Log4j日志系统,并将日志存入数据库

    Log4j是apache公司开发的一款日志管理系统,可以高效的管理系统中出现的BUG或者各种信息,并且可以已文本的方式或者数据库存入的方式来记录数据 在pom.xml中导入Log4j依赖 在Resources文件夹下创建一个log4j.properties文件 编写配置文件 这是个测试类 可以看见,控制台和数据库表

    2024年02月08日
    浏览(94)
  • [spark] DataFrame 的 checkpoint

    在 Apache Spark 中,DataFrame 的 checkpoint 方法用于强制执行一个物理计划并将结果缓存到分布式文件系统,以防止在计算过程中临时数据丢失。这对于长时间运行的计算过程或复杂的转换操作是有用的。 具体来说, checkpoint 方法执行以下操作: 将 DataFrame 的物理计划执行,并将结

    2024年02月03日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包