Spark 数据读取保存

这篇具有很好参考价值的文章主要介绍了Spark 数据读取保存。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Spark 的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统:

  • 文件格式:Text 文件、Json 文件、csv 文件、Sequence 文件以及 Object 文件
  • 文件系统:本地文件系统、HDFS、Hbase 以及数据库

1. 读写 text/hdfs 文件

text/hdfs 类型的文件读都可以用 textFile(path),保存使用 saveAsTextFile(path)

// 读取本地文件,必须保证每个节点都有该文件
val rdd = sc.textFile("./xx.txt")

// 保存到 hdfs
rdd.saveAsTextFile(hdfs://hadoop1:9000/test/info.json)

2. 读取 json 文件

json 文件主要是需要解析其 json 格式,一般采用:SparkSQL,也可以使用 fastjson、scala.util.parsing.json.JSON

scala> val rdd = sc.textFile("hdfs://hadoop1:9000/test/info.json")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://hadoop1:9000/test/info.json MapPartitionsRDD[45] at textFile at <console>:24

scala> rdd.collect()
res39: Array[String] = Array({"age": 0, "name": "rose0"}, {"age": 1, "name": "rose1"}, {"age": 2, "name": "rose2"}, {"age": 3, "name": "rose3"}, {"age": 4, "name": "rose4"}, {"age": 5, "name": "rose5"}, {"age": 6, "name": "rose6"}, {"age": 7, "name": "rose7"}, {"age": 8, "name": "rose8"}, {"age": 9, "name": "rose9"}, {"age": 10, "name": "rose10"}, {"age": 11, "name": "rose11"}, {"age": 12, "name": "rose12"}, {"age": 13, "name": "rose13"}, {"age": 14, "name": "rose14"}, {"age": 15, "name": "rose15"}, {"age": 16, "name": "rose16"}, {"age": 17, "name": "rose17"}, {"age": 18, "name": "rose18"}, {"age": 19, "name": "rose19"}, {"age": 20, "name": "rose20"}, {"age": 21, "name": "rose21"}, {"age": 22, "name": "rose22"}, {"age": 23, "name": "rose23"}, {"age": 24, "name": "rose24"}, {"age": 25, "...
scala> import scala.util.parsing.json.JSON
import scala.util.parsing.json.JSON

// 解析到的结果其实就是 Option 组成的数组, Option 存储的就是 Map 对象
scala> val rdd2 = rdd.map(JSON.parseFull).collect()
rdd2: Array[Option[Any]] = Array(Some(Map(age -> 0.0, name -> rose0)), Some(Map(age -> 1.0, name -> rose1)), Some(Map(age -> 2.0, name -> rose2)), Some(Map(age -> 3.0, name -> rose3)), Some(Map(age -> 4.0, name -> rose4)), Some(Map(age -> 5.0, name -> rose5)), Some(Map(age -> 6.0, name -> rose6)), Some(Map(age -> 7.0, name -> rose7)), Some(Map(age -> 8.0, name -> rose8)), Some(Map(age -> 9.0, name -> rose9)), Some(Map(age -> 10.0, name -> rose10)), Some(Map(age -> 11.0, name -> rose11)), Some(Map(age -> 12.0, name -> rose12)), Some(Map(age -> 13.0, name -> rose13)), Some(Map(age -> 14.0, name -> rose14)), Some(Map(age -> 15.0, name -> rose15)), Some(Map(age -> 16.0, name -> rose16)), Some(Map(age -> 17.0, name -> rose17)), Some(Map(age -> 18.0, name -> rose18)), Some(Map(age -> 19.0, na...

3. 读取 SequenceFile 文件

SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对而设计的一种平面文件(Flat File)

val rdd1 = sc.parallelize(Array(("a", 1),("b", 2),("c", 3)))

//    // 保存 SequenceFile
//    rdd1.saveAsSequenceFile("test_sequence")

// 读取时需要指定读取数据的数据类型 [String, Int]
val rdd2 = sc.sequenceFile[String, Int]("test_sequence")
rdd2.collect().foreach(println)

运行结果:

(a,1)
(b,2)
(c,3)

4. 读取 ObjectFile 文件

对象文件是将对象序列化后保存的文件,采用 Java 的序列化机制,可以通过 objectFile[k,v](path) 函数接收一个路径,读取对象文件,返回对应的 RDD,也可以通过调用saveAsObjectFile() 实现对对象文件的输出

 // 保存
val rdd1 = sc.parallelize(Array(("a", 1),("b", 2),("c", 3)))
rdd1.saveAsObjectFile("test_object_file")

// 读取
val rdd2 = sc.objectFile[(String, Int)]("test_object_file")

rdd2.collect().foreach(println)

5. 从 HDFS 读写文件

Spark 的整个生态系统与 Hadoop 完全兼容的,所以对于 Hadoop 所支持的文件类型或者数据库类型,Spark 也同样支持。

Hadoop 有新旧两套 API 接口,为了能够兼容 Spark 也有两套,分别为:HadoopRDD 、newHadoopRDD,两个接口函数的参数分别为:

  • 输入格式(InputFormat):输入数据类型,新旧接口引用的版本分别为:org.apache.hadoop.mapred.InputFormat、org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)
  • 键类型:[K, V] 中的 K
  • 值类型:[K, V] 中的 V
  • 分区值:指定由外部存储生成的 RDDpartition 数据的最小值,若没有指定,系统使用默认值 defaultMinSplits

Hadoop 中以压缩形式存储的数据,不需要指定解压方式就能够进行读取,因为Hadoop本身有一个解压器会根据压缩文件的后缀推断解压算法进行解压

如果用SparkHadoop中读取某种类型的数据不知道怎么读取的时候,上网查找一个使用map-reduce的时候是怎么读取这种这种数据的,然后再将对应的读取方式改写成上面的hadoopRDDnewAPIHadoopRDD两个类就行了

6. HBase 读写

Spark 读取 HBase

  • 输入类型:org.apache.hadoop.hbase.mapreduce.TableInputFormat
  • 输出类型:结果为键值对,键的类型为 org.apache.hadoop.hbase.io.ImmutableBytesWritable,值的类型为 org.apache.hadoop.hbase.client.Result

连接集群

spark 应用需要连接到zookeeper集群,然后借助zookeeper访问hbase。一般可以通过两种方式连接到zookeeper

  • 第一种是将 hbase-site.xml 文件加入 classpath
  • 第二种是在 HBaseConfiguration 实例中设置

6.1 Maven 依赖

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.3.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.mortbay.jetty</groupId>
            <artifactId>servlet-api-2.5</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
        </exclusion>
    </exclusions>

</dependency>

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.3.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.mortbay.jetty</groupId>
            <artifactId>servlet-api-2.5</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>

6.2 读取 HBase 数据

hbase 读取数据转化成 RDD

package top.midworld.spark1016.hbase_access

import java.util
import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession
import org.json4s.jackson.Serialization

import scala.collection.mutable

object HbaseRead {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder.appName("create_rdd").master("local[2]").getOrCreate()
    val sc = session.sparkContext

    // 连接 HBase 的配置
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "t2")

    // 读取数据
    val rdd1 = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )

    val resultRDD = rdd1.map {
      // it 封装了 rowkey、item 里面才是数据
      case (it, results) => {
        // 定义一个可变 map
        val resultMap = mutable.Map[String, Any]()

        // rowkey 添加到 resultMap 中
        resultMap += "rowKey" -> Bytes.toString(it.get())

        // 将其他数据添加到 resultMap 中
        val cells: util.List[Cell] = results.listCells()

        import scala.collection.JavaConversions._
        for (cell <- cells) {
          // 列名 - 列值
          val key = Bytes.toString(CellUtil.cloneQualifier(cell))
          val value = Bytes.toString(CellUtil.cloneValue(cell))

          resultMap += key -> value
        }
        // 把map转成json  json4s(json4scala)
        implicit val df = org.json4s.DefaultFormats
        Serialization.write(resultMap)
      }
    }

    resultRDD.collect().foreach(println)
    sc.stop()
  }
}

运行结果:

{"alias2":"jun2","rowKey":"10004"}
{"alias4":"jun4","rowKey":"10011"}
{"alias5":"jun5","rowKey":"10016"}

其他应用:

val count = rdd1.count()
println("rdd1 RDD Count:" + count)
rdd1.cache()    // 缓存,避免 rdd 重新计算

rdd1.foreach({
    case (_, results) => {
    val rowKey = Bytes.toString(results.getRow) 
    // info 为列族、alias2 为列名
    val name = Bytes.toString(results.getValue("info".getBytes, "alias2".getBytes))

    println(rowKey, name)
    }
})

6.2.1 sbt 打包编译 spark 程序

上面使用 IDEAWindows 上测试,在 Linux 运行,需要将 spark 程序打包为 jar 包,常用的方法有:maven、sbt,这里采用 sbt

1、环境准备

  • 开启 Hadoop、zookeeper、spark、hbase 集群
  • hbase/lib 中的一些 jar 包拷贝到 spark/jars/hbase
// 在 spark 安装目录 jars 中新建 hbase/ 目录
cd /home/hadoop/apps/spark-2.2.0/jars/
mkdir hbase
cd hbase

// 拷贝以下 jar 包到 spark/jars/hbase 中
[hadoop@hadoop1 hbase]$ cp /home/hadoop/apps/hbase-1.2.6/lib/hbase*.jar .
[hadoop@hadoop1 hbase]$ cp /home/hadoop/apps/hbase-1.2.6/lib/guava-12.0.1.jar .
[hadoop@hadoop1 hbase]$ cp /home/hadoop/apps/hbase-1.2.6/lib/htrace-core-3.1.0-incubating.jar .
[hadoop@hadoop1 hbase]$ cp /home/hadoop/apps/hbase-1.2.6/lib/protobuf-java-2.5.0.jar .
[hadoop@hadoop1 hbase]$ cp /home/hadoop/apps/hbase-1.2.6/lib/metrics-core-2.2.0.jar .

注意:缺少 metrics-core-2.2.0.jar 会报 Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.core.Gauge,可以参考:https://blog.csdn.net/u010842515/article/details/51451883

2、编写 spark 程序:

cd /home/hadoop/apps/spark-2.2.0/mycode/
mkdir hbase
cd hbase

mkdir -p src/main/scala/
vim SparkOperateHBase.scala

spark 程序内容:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

import java.util
import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.json4s.jackson.Serialization

import scala.collection.mutable

object SparkOperateHBase {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)


    // 连接 HBase 的配置
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "t2")

    // 读取数据
    val rdd1 = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )

    val resultRDD = rdd1.map {
      // it 封装了 rowkey、item 里面才是数据
      case (it, results) => {
        // 定义一个可变 map
        val resultMap = mutable.Map[String, Any]()

        // rowkey 添加到 resultMap 中
        resultMap += "rowKey" -> Bytes.toString(it.get())

        // 将其他数据添加到 resultMap 中
        val cells: util.List[Cell] = results.listCells()

        import scala.collection.JavaConversions._
        for (cell <- cells) {
          // 列名 - 列值
          val key = Bytes.toString(CellUtil.cloneQualifier(cell))
          val value = Bytes.toString(CellUtil.cloneValue(cell))

          resultMap += key -> value
        }
        // 把map转成json  json4s(json4scala)
        implicit val df = org.json4s.DefaultFormats
        Serialization.write(resultMap)
      }
    }

    resultRDD.collect().foreach(println)
    sc.stop()
  }
}

3、编写 sbt 程序:

vim simple.sbt

// libraryDependencies 为 spark 程序中用到的依赖包

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"

4、编译打包:

// jar 包位置 /home/hadoop/apps/spark-2.2.0/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar

cd /home/hadoop/apps/spark-2.2.0/mycode/hbase

// 编译打包
/home/hadoop/apps/sbt/run.sh package

5、提交 spark 任务:文章来源地址https://www.toymoban.com/news/detail-483783.html

cd /home/hadoop/apps/spark-2.2.0/mycode/hbase

/home/hadoop/apps/spark-2.2.0/bin/spark-submit --driver-class-path /home/hadoop/apps/spark-2.2.0/jars/hbase/*:/home/hadoop/apps/hbase-1.2.6/conf/ --class "SparkOperateHBase" /home/hadoop/apps/spark-2.2.0/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar

6.3 往 Hbase 写入数据

package top.midworld.spark1016.hbase_access

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableReduce
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SparkSession

object HBaseWrite {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder.appName("create_rdd").master("local[2]").getOrCreate()
    val sc = session.sparkContext

    // 连接 HBase 的配置
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3")
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "stu")

    // 往 hbase 写入数据
    val job = Job.getInstance(hbaseConf)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])

    val dataRDD = sc.parallelize(
      List(
        ("10017", "scala", "1"),
        ("10018", "spark", "2"),
        ("10019", "java", "3")
      )
    )

    //    将 rdd 封装为 TableReduce 格式
    val hbaseRDD = dataRDD.map {
      case (rowKey, name, age) =>
        // 设置 rowKey
        val rk = new ImmutableBytesWritable()
        rk.set(Bytes.toBytes(rowKey))

        // 添加数据
        val put = new Put(Bytes.toBytes(rowKey))
        // 分别为列族:info、列名:name、值:name/age
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age))

        // return 返回
        (rk, put)
    }

    // 写入
    hbaseRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}

到了这里,关于Spark 数据读取保存的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark连接Hive读取数据

            Ubuntu 16.04 LTS         ubuntu-16.04.6-desktop-i386.iso          spark-3.0.0-bin-without-hadoop.tgz           hadoop-3.1.3.tar.gz         apache-hive-3.1.2-bin.tar.gz         spark-hive_2.12-3.2.2.jar         openjdk 1.8.0_292         mysql-connector-java-5.1.40.tar.gz         

    2024年02月01日
    浏览(31)
  • spark读取数据写入hive数据表

    目录 spark 读取数据 spark从某hive表选取数据写入另一个表的一个模板 概述: create_tabel建表函数,定义日期分区 删除原有分区drop_partition函数 generate_data 数据处理函数,将相关数据写入定义的表中  注: 关于 insert overwrite/into 中partition时容易出的分区报错问题:  添加分区函数

    2024年01月19日
    浏览(42)
  • pb如何在数据库保存和读取图片

    字段类型Image(不同数据库不同,如果没有再查找blob等类型),然后使用如下编程套路: 读取:    这样的字段不能放在数据窗口的Detail节中,通常用户点击某行数据,获取该行的主键信息,以该信息为条件检索图片信息。比如,主键为id,图片保存在zp字段中:    在dw_1的

    2024年02月09日
    浏览(29)
  • 读取csv数据到词云图并保存图片

    要用的第三方库 pyecharts snap_selenium pandas 用snap_selenium中的snapshot,但是snapshot,只支持Chrome,和Safari 而我用的edge,但是我有msedgedriver,以前写selenium爬虫的时候,驱动edge。 所以,我决定偷梁换柱,把snapshot的源码中的Chrome给改了。 😆😆😆😆 看源码 偷梁换柱 可以驱动,小操

    2024年02月06日
    浏览(58)
  • 一文学会用Python读取Excel数据并保存

    文章目录 一、使用的软件 二、教程介绍 1、读取CSV数据 2、读取xlsx数据 3、输出为csv文件 4、 输出为excel文件

    2024年02月12日
    浏览(41)
  • Python 读取数据并保存为txt文件的方式

     首先是读取文件,这里使用了pandas库 方式一:将数据框的数据存入txt文件 - - - 使用pandas库 sep=\\\'t\\\'表示用Tab键分隔不同字段,index=False表示不带有行号的输出 方式二:将字符串保存到txt文件 - - - .write() 方式三:使用numpy库

    2024年02月08日
    浏览(38)
  • Vivado如何保存和读取FPGA的ILA数据

    Vivado如何保存和读取FPGA的ILA数据 在FPGA开发中,为了调试程序并更好地理解硬件运行的状态,我们通常需要使用逻辑分析仪(ILA)来观测FPGA内部的信号。但是,ILA数据的保存和读取对于初学者来说可能会有些棘手。因此,本文将介绍如何在Vivado中保存和读取FPGA的ILA数据。 一

    2024年02月08日
    浏览(37)
  • c#winform窗体如何实现数据的保存和读取

    学生类代码内容如下: 在c#winform中我们在写程序时,经常需要进行数据处理,那么数据如何保存和读取(下面我们通过序列化和反序列化的方式来实现) 首先我们建立一个外部实体类(Student类) 学生类代码内容如下: 第二步构建winform窗体  第三步:从图上按钮可以发现现在

    2024年02月02日
    浏览(28)
  • Spark读取kafka(流式和批数据)

    2024年01月21日
    浏览(48)
  • Python处理xlsx文件(读取、转为列表、新建、写入数据、保存)

    xlsxwriter**库对于xslx表的列数不做限制, xlrd 库不能写入超过65535行,256列的数据。 由于需要处理的数据行列数较多,遇到报错才发现库的限制问题,记录一下。

    2024年02月12日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包