使用spark进行hbase的bulkload

这篇具有很好参考价值的文章主要介绍了使用spark进行hbase的bulkload。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

使用spark进行hbase的bulkload

一、 背景

HBase 是一个面向列,schemaless,高吞吐,高可靠可水平扩展的 NoSQL 数据库,用户可以通过 HBase client 提供的 put get 等 api 实现在数据的实时读写。在过去的几年里,HBase 有了长足的发展,它在越来越多的公司里扮演者越来越重要的角色。
HBase 擅长于海量数据的实时读取,原生 HBase 没有二级索引,复杂查询场景支持的不好。同时因为 split,磁盘,网络抖动,Java GC 等多方面的因素会影响其 RT 表现,所以通常我们在使用HBase的同时也会使用其他的存储中间件,比如 ES,Reids,Mysql 等等。避免 HBase 成为信息孤岛,我们需要数据导入导出的工具在这些中间件之间做数据迁移,而最常用的莫过于阿里开源的 DataX。Datax从 其他数据源迁移数据到 HBase 实际上是走的 HBase 原生 api 接口,在少量数据的情况下没有问题,但当我们需要从 Hive 里,或者其他异构存储里批量导入几亿,几十亿的数据,那么用 DataX 这里就显得不那么适合,因为走原生接口为了避免影响生产集群的稳定性一定要做好限流,那么海量数据的迁移就很很慢,同时数据的持续写入会因为 flush,compaction 等机制占用较多的系统资源。为了解决批量导入的场景,Bulkload 应运而生。

二、HBase Bulkload
在大量数据需要写入HBase时,通常有 put方式和bulkLoad 两种方式。

1、put方式为单条插入,在put数据时会先将数据的更新操作信息和数据信息 写入WAL ,
在写入到WAL后, 数据就会被放到MemStore中 ,当MemStore满后数据就会被 flush到磁盘
(即形成HFile文件) ,在这种写操作过程会涉及到flush、split、compaction等操作,容易造
成节点不稳定,数据导入慢,耗费资源等问题,在海量数据的导入过程极大的消耗了系统
性能,避免这些问题最好的方法就是使用BulkLoad的方式来加载数据到HBase中。

使用spark进行hbase的bulkload,数据库,spark,hbase


2、BulkLoader利用HBase数据按照HFile格式存储在HDFS的原理,使用MapReduce直接批量
生成HFile格式文件后,RegionServers再将HFile文件移动到相应的Region目录下。

  • Extract,异构数据源数据导入到 HDFS 之上。
  • Transform,通过用户代码,可以是 MR 或者 Spark 任务将数据转化为 HFile。
  • Load,HFile 通过 loadIncrementalHFiles 调用将 HFile 放置到 Region 对应的 HDFS 目录上,该过程可能涉及到文件切分。

使用spark进行hbase的bulkload,数据库,spark,hbase

 三、实践

hive表

使用spark进行hbase的bulkload,数据库,spark,hbase


 

 hbase表

使用spark进行hbase的bulkload,数据库,spark,hbase

 依赖

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

<properties>

        <maven.compiler.source>1.8</maven.compiler.source>

        <maven.compiler.target>1.8</maven.compiler.target>

        <encoding>UTF-8</encoding>

        <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>

        <log4j.version>1.7.30</log4j.version>

        <zk.version>3.4.5-cdh5.16.2</zk.version>

        <scala.version>2.12.10</scala.version>

        <scala.tools.version>2.12</scala.tools.version>

        <spark.version>3.2.0</spark.version>

        <hbase.version>1.2.0-cdh5.16.2</hbase.version>

        <config.version>1.4.0</config.version>

    </properties>

     

    <repositories>

        <repository>

            <id>nexus-aliyun</id>

            <url>http://maven.aliyun.com/nexus/content/groups/public</url>

        </repository>

        <repository>

            <id>cloudera</id>

            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>

        </repository>

    </repositories>

    <dependencies>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-client</artifactId>

            <version>${hadoop.version}</version>

        </dependency>

        <dependency>

            <groupId>org.slf4j</groupId>

            <artifactId>slf4j-log4j12</artifactId>

            <version>${log4j.version}</version>

        </dependency>

        <dependency>

            <groupId>org.apache.zookeeper</groupId>

            <artifactId>zookeeper</artifactId>

            <version>${zk.version}</version>

        </dependency>

        <dependency>

            <groupId>org.apache.spark</groupId>

            <artifactId>spark-core_${scala.tools.version}</artifactId>

            <version>${spark.version}</version>

        </dependency>

        <dependency>

            <groupId>org.apache.hbase</groupId>

            <artifactId>hbase-client</artifactId>

            <version>${hbase.version}</version>

        </dependency>

        <dependency>

            <groupId>org.apache.hbase</groupId>

            <artifactId>hbase-server</artifactId>

            <version>${hbase.version}</version>

        </dependency>

        <dependency>

            <groupId>org.apache.spark</groupId>

            <artifactId>spark-sql_${scala.tools.version}</artifactId>

            <version>${spark.version}</version>

        </dependency>

         

    </dependencies>

spark 代码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

package com.jojo

import org.apache.hadoop.conf.Configuration

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, KeyValue, TableName}

import org.apache.hadoop.hbase.client.{ConnectionFactory, Result}

import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}

import org.apache.hadoop.hbase.util.Bytes

import org.apache.hadoop.mapreduce.Job

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

/**

 * Description:Hbase批量加载   同一列族多列

 */

object HbaseBulkLoadApp {

  val zookeeperQuorum = "cdh01,cdh02,cdh03"//zookeeper信息

  val dataSourcePath = "hdfs://cdh03:8020/user/hive/warehouse/sample_07" //源文件

  val hFilePath = "hdfs://cdh03:8020/tmp/result"//hfile的存储路径

  val hdfsRootPath = "hdfs://cdh03:8020/"//根路径

  val tableName = "sample_07"//表名

  val familyName = "basic"//列族

  val arr = Array("code","description""total_emp","salary")//列的名字集合

  def main(args: Array[String]): Unit = {

    //获取content

    val sparkConf = new SparkConf()

      .setAppName(s"${this.getClass.getSimpleName}")

      .setMaster("local")

      //指定序列化格式,默认是java序列化

      .set("spark.serializer""org.apache.spark.serializer.KryoSerializer")

      //告知哪些类型需要序列化

      .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))

    val sc = new SparkContext(sparkConf)

    //hadoop配置

    val hadoopConf = new Configuration()

    hadoopConf.set("fs.defaultFS", hdfsRootPath)

    //获取输出路径

    val fileSystem = FileSystem.get(hadoopConf)

    //获取hbase配置

    val hconf = HBaseConfiguration.create()

    //设置zookeeper集群

    hconf.set("hbase.zookeeper.quorum", zookeeperQuorum)

    //设置端口

    hconf.set("hbase.zookeeper.property.clientPort""2181");

    //设置hfile最大个数

    hconf.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily","3200")

    //设置hfile的大小

    hconf.set("hbase.hregion.max.filesize","10737418240")

    hconf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

    //获取hbase连接

    val hbaseConn = ConnectionFactory.createConnection(hconf)

    val admin = hbaseConn.getAdmin

    /**

     * 保存生成的HFile文件

     * 注:bulk load  生成的HFile文件需要落地

     * 然后再通过LoadIncrementalHFiles类load进Hbase

     * 此处关于  sortBy 操作详解:

     * 0. Hbase查询是根据rowkey进行查询的,并且rowkey是有序,

     * 某种程度上来说rowkey就是一个索引,这是Hbase查询高效的一个原因,

     * 这就要求我们在插入数据的时候,要插在rowkey该在的位置。

     * 1. Put方式插入数据,会有WAL,同时在插入Hbase的时候会根据RowKey的值选择合适的位置,此方式本身就可以保证RowKey有序

     * 2. bulk load 方式没有WAL,它更像是hive通过load方式直接将底层文件HFile移动到制定的Hbase路径下,所以,在不东HFile的情况下,要保证本身有序才行

     * 之前写的时候只要rowkey有序即可,但是2.0.2版本的时候发现clounm也要有序,所以会有sortBy(x => (x._1, x._2.getKeyString), true)

     *

     * @param hfileRDD

     */

    // 0. 准备程序运行的环境

    // 如果 HBase 表不存在,就创建一个新表

    if (!admin.tableExists(TableName.valueOf(tableName))) {

      val desc = new HTableDescriptor(TableName.valueOf(tableName))

      val hcd = new HColumnDescriptor(familyName)

      desc.addFamily(hcd)

      admin.createTable(desc)

      print("创建了一个新表")

    }

    // 如果存放 HFile文件的路径已经存在,就删除掉

    if(fileSystem.exists(new Path(hFilePath))) {

      fileSystem.delete(new Path(hFilePath), true)

      print("删除hdfs上存在的路径")

    }

    // 1. 清洗需要存放到 HFile 中的数据,rowKey 一定要排序,否则会报错:

    // java.io.IOException: Added a key not lexically larger than previous.

    val data: RDD[(ImmutableBytesWritable, Seq[KeyValue])] = sc.textFile(dataSourcePath)

      .map(row => {

        // 处理数据的逻辑

        val arrs = row.split("\t")

        var kvlist: Seq[KeyValue] = List()//存储多个列

        var rowkey: Array[Byte] = null

        var cn: Array[Byte] = null

        var v: Array[Byte] = null

        var kv: KeyValue = null

        val cf = familyName.getBytes //列族

        rowkey = Bytes.toBytes(arrs(0)) //key

        for (i <- 1 to (arrs.length - 1)) {

          cn = arr(i).getBytes() //列的名称

          v = Bytes.toBytes(arrs(i)) //列的值

          //将rdd转换成HFile需要的格式,上面定义了Hfile的key是ImmutableBytesWritable,那么我们定义的RDD也是要以ImmutableBytesWritable的实例为key

          kv = new KeyValue(rowkey, cf, cn, v) //封装一下 rowkey, cf, clounmVale, value

          kvlist = kvlist :+ kv //将新的kv加在kvlist后面(不能反 需要整体有序)

        }

        (new ImmutableBytesWritable(rowkey), kvlist)

      })

    val hfileRDD: RDD[(ImmutableBytesWritable, KeyValue)] = data

      .flatMapValues(_.iterator)

    // 2. Save Hfiles on HDFS

    val table = hbaseConn.getTable(TableName.valueOf(tableName))

    val job = Job.getInstance(hconf)

    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])

    job.setMapOutputValueClass(classOf[KeyValue])

    HFileOutputFormat2.configureIncrementalLoadMap(job, table)

    hfileRDD

      .sortBy(x => (x._1, x._2.getKeyString), true//要保持 整体有序

      .saveAsNewAPIHadoopFile(hFilePath,

        classOf[ImmutableBytesWritable],

        classOf[KeyValue],

        classOf[HFileOutputFormat2],

        hconf)

    print("成功生成HFILE")

    val bulkLoader = new LoadIncrementalHFiles(hconf)

    val regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName))

    bulkLoader.doBulkLoad(new Path(hFilePath), admin, table, regionLocator)

    hbaseConn.close()

    sc.stop()

  }

}

 其中可能遇到的问题:

1

EndOfStreamException: Unable to read additional data from server sessionid 0x17f44ca01833e45, likely server has closed socket

 解决:

  主要是zk的版本不匹配,在依赖选择匹配的zk版本。

输出结果

使用spark进行hbase的bulkload,数据库,spark,hbase

https://www.cnblogs.com/huangguoming/articles/12967868.html文章来源地址https://www.toymoban.com/news/detail-604613.html

到了这里,关于使用spark进行hbase的bulkload的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据之使用Spark增量抽取MySQL的数据到Hive数据库(1)

    前言 题目: 一、读题分析 二、处理过程 1.采用SparkSQL使用max函数找到最大的日期然后转换成时间类型在变成字符串 2.这里提供除了SQL方法外的另一种过滤不满足条件的方法 三、重难点分析 总结  本题来源于全国职业技能大赛之大数据技术赛项 电商 赛题-离线数据处理-抽取

    2024年02月08日
    浏览(52)
  • 如何使用PyQt进行数据库操作?

    首先,我们要知道,PyQt是一个非常强大的图形用户界面(GUI)开发库,它允许我们使用Python语言创建美观且高度交互的桌面应用程序。然而,对于数据库操作,PyQt并不直接提供此类功能。这需要我们使用其他的数据库库,例如SQLite、MySQL或PostgreSQL等。 对于新手来说,我建议

    2024年02月11日
    浏览(65)
  • 【MySQL】使用DBeaver数据库管理工具进行MySQL数据库连接

    一、数据库连接信息填写 1、服务器地址:填写服务器部署的地址,以及端口号 2、数据库:sys 3、用户名:root 4、密码:服务器上面设置的具体密码 以上信息填写错误的报错提示 :Access denied for user ‘XXX’@’%’ to database ‘10.42.67.22’ 二、数据库说明 1、数据库连接时选择的

    2024年02月09日
    浏览(91)
  • 【大数据】分布式数据库HBase

    目录 1.概述 1.1.前言 1.2.数据模型 1.3.列式存储的优势 2.实现原理 2.1.region 2.2.LSM树 2.3.完整读写过程 2.4.master的作用 本文式作者大数据系列专栏中的一篇文章,按照专栏来阅读,循序渐进能更好的理解,专栏地址: https://blog.csdn.net/joker_zjn/category_12631789.html?spm=1001.2014.3001.5482 当

    2024年04月27日
    浏览(48)
  • Hbase的bulkload流程与实践

    一、前言   通常 MapReduce 在写 HBase 时使用的是 HTableOutputFormat 方式,在 reduce 中直接生成 put 对象写入 HBase ,该方式在大数据量写入时效率低下(HBase 会 block 写入,频繁进行 flush、split、compact 等大量 IO 操作),并对 HBase 节点的稳定性造成一定的影响(GC 时间过长,响应变

    2024年02月10日
    浏览(38)
  • Python读取hbase数据库

    1. hbase连接 首先用hbase shell 命令来进入到hbase数据库,然后用list命令来查看hbase下所有表,以其中表“DB_level0”为例,可以看到库名“baotouyiqi”是拼接的,python代码访问时先连接: 备注:完整代码在最后,想运行的直接滑倒最后复制即可 2. 按条件读取hbase数据 然后按照条件

    2024年04月09日
    浏览(52)
  • 分布式数据库HBase

    HBase是一个高可靠、高性能、 面向列 、可伸缩的分布式数据库,是谷歌BigTable的开源实现,主要用来存储非结构化和把结构化的松散数据。 HBase的目标是处理非常庞大的表,可以通过水平扩展的方式,利用 廉价计算机集群 处理由超过10亿行数据和数百万列元素组成的数据表。

    2024年02月09日
    浏览(57)
  • 大数据NoSQL数据库HBase集群部署

    目录 1.  简介 2.  安装 1. HBase依赖Zookeeper、JDK、Hadoop(HDFS),请确保已经完成前面 2. 【node1执行】下载HBase安装包 3. 【node1执行】,修改配置文件,修改conf/hbase-env.sh文件 4. 【node1执行】,修改配置文件,修改conf/hbase-site.xml文件 5. 【node1执行】,修改配置文件,修改conf/regi

    2024年02月08日
    浏览(54)
  • 如何使用 PHP 进行数据库连接池优化?

    连接池是一个存放数据库连接的地方,就像一个水池,你在这里可以得到数据库连接。这比每次都新建和关闭连接要快得多,因为连接池中的连接是可以重复使用的。 下面是一个简单的例子,展示如何使用PHP和PDO(PHP Data Objects)来创建一个连接池。 这个类有一个连接池,其

    2024年02月15日
    浏览(88)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包