Hbase的bulkload流程与实践

这篇具有很好参考价值的文章主要介绍了Hbase的bulkload流程与实践。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、前言

  通常 MapReduce 在写 HBase 时使用的是 HTableOutputFormat 方式,在 reduce 中直接生成 put 对象写入 HBase,该方式在大数据量写入时效率低下(HBase 会 block 写入,频繁进行 flush、split、compact 等大量 IO 操作),并对 HBase 节点的稳定性造成一定的影响(GC 时间过长,响应变慢,导致节点超时退出,并引起一系列连锁反应),而 HBase 支持 bulk load 的入库方式,它是利用 hbase 的数据信息按照特定格式存储在 hdfs 内这一原理,直接在 HDFS 中生成持久化的 HFile 数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配合 Mapreduce 完成,高效便捷,而且不占用 region 资源,增添负载,在大数据量写入时能极大的提高写入效率,并降低对 HBase 节点的写入压力。

  通过使用先生成 HFile,然后再 BulkLoadHbase 的方式来替代之前直接调用 HTableOutputFormat 的方法有如下的好处:
  (1)消除了对 HBase 集群的插入压力
  (2)提高了 Job 的运行速度,降低了 Job 的执行时间

二、Bulkload 流程与实践

1. 案例一:

  bulkload 方式需要两个 Job 配合完成:
  (1)第一个 Job 还是运行原来业务处理逻辑,处理的结果不直接调用 HTableOutputFormat 写入到 HBase,而是先写入到 HDFS 上的一个中间目录下(如 middata)
  (2)第二个 Job 以第一个 Job 的输出(middata)做为输入,然后将其格式化 HBase 的底层存储文件 HFile
  (3)调用 BulkLoad 将第二个 Job 生成的 HFile 导入到对应的 HBase 表中
下面给出相应的范例代码:

import java.io.IOException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
 
public class GeneratePutHFileAndBulkLoadToHBase {
 
	public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>
	{
 
		private Text wordText=new Text();
		private IntWritable one=new IntWritable(1);
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			String line=value.toString();
			String[] wordArray=line.split(" ");
			for(String word:wordArray)
			{
				wordText.set(word);
				context.write(wordText, one);
			}
			
		}
	}
	
	public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>
	{
 
		private IntWritable result=new IntWritable();
		protected void reduce(Text key, Iterable<IntWritable> valueList,
				Context context)
				throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			int sum=0;
			for(IntWritable value:valueList)
			{
				sum+=value.get();
			}
			result.set(sum);
			context.write(key, result);
		}
		
	}
	
	public static class ConvertWordCountOutToHFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
	{
 
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			String wordCountStr=value.toString();
			String[] wordCountArray=wordCountStr.split("\t");
			String word=wordCountArray[0];
			int count=Integer.valueOf(wordCountArray[1]);
			
			//创建HBase中的RowKey
			byte[] rowKey=Bytes.toBytes(word);
			ImmutableBytesWritable rowKeyWritable=new ImmutableBytesWritable(rowKey);
			byte[] family=Bytes.toBytes("cf");
			byte[] qualifier=Bytes.toBytes("count");
			byte[] hbaseValue=Bytes.toBytes(count);
			// Put 用于列簇下的多列提交,若只有一个列,则可以使用 KeyValue 格式
			// KeyValue keyValue = new KeyValue(rowKey, family, qualifier, hbaseValue);
			Put put=new Put(rowKey);
			put.add(family, qualifier, hbaseValue);
			context.write(rowKeyWritable, put);
			
		}
		
	}
	
	public static void main(String[] args) throws Exception {
		// TODO Auto-generated method stub
        Configuration hadoopConfiguration=new Configuration();
        String[] dfsArgs = new GenericOptionsParser(hadoopConfiguration, args).getRemainingArgs();
		
        //第一个Job就是普通MR,输出到指定的目录
        Job job=new Job(hadoopConfiguration, "wordCountJob");
        job.setJarByClass(GeneratePutHFileAndBulkLoadToHBase.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(dfsArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(dfsArgs[1]));
        //提交第一个Job
        int wordCountJobResult=job.waitForCompletion(true)?0:1;
        
        //第二个Job以第一个Job的输出做为输入,只需要编写Mapper类,在Mapper类中对一个job的输出进行分析,并转换为HBase需要的KeyValue的方式。
        Job convertWordCountJobOutputToHFileJob=new Job(hadoopConfiguration, "wordCount_bulkload");
        
        convertWordCountJobOutputToHFileJob.setJarByClass(GeneratePutHFileAndBulkLoadToHBase.class);
        convertWordCountJobOutputToHFileJob.setMapperClass(ConvertWordCountOutToHFileMapper.class);
		//ReducerClass 无需指定,框架会自行根据 MapOutputValueClass 来决定是使用 KeyValueSortReducer 还是 PutSortReducer
		//convertWordCountJobOutputToHFileJob.setReducerClass(KeyValueSortReducer.class);
        convertWordCountJobOutputToHFileJob.setMapOutputKeyClass(ImmutableBytesWritable.class);
        convertWordCountJobOutputToHFileJob.setMapOutputValueClass(Put.class);
        
        //以第一个Job的输出做为第二个Job的输入
        FileInputFormat.addInputPath(convertWordCountJobOutputToHFileJob, new Path(dfsArgs[1]));
        FileOutputFormat.setOutputPath(convertWordCountJobOutputToHFileJob, new Path(dfsArgs[2]));
        //创建HBase的配置对象
        Configuration hbaseConfiguration=HBaseConfiguration.create();
        //创建目标表对象
        HTable wordCountTable =new HTable(hbaseConfiguration, "word_count");
        HFileOutputFormat.configureIncrementalLoad(convertWordCountJobOutputToHFileJob,wordCountTable);
       
        //提交第二个job
        int convertWordCountJobOutputToHFileJobResult=convertWordCountJobOutputToHFileJob.waitForCompletion(true)?0:1;
        
        //当第二个job结束之后,调用BulkLoad方式来将MR结果批量入库
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hbaseConfiguration);
        //第一个参数为第二个Job的输出目录即保存HFile的目录,第二个参数为目标表
        loader.doBulkLoad(new Path(dfsArgs[2]), wordCountTable);
        
        //最后调用System.exit进行退出
        System.exit(convertWordCountJobOutputToHFileJobResult);
		
	}
 
}

  比如原始的输入数据的目录为:/rawdata/test/wordcount/20131212
  中间结果数据保存的目录为:/middata/test/wordcount/20131212
  最终生成的 HFile 保存的目录为:/resultdata/test/wordcount/20131212
  运行上面的 Job 的方式如下:hadoop jar test.jar /rawdata/test/wordcount/20131212 /middata/test/wordcount/20131212 /resultdata/test/wordcount/20131212

(1)说明与注意事项

  (1)HFile 方式在所有的加载方案里面是最快的,不过有个前提 —— 数据是第一次导入,表是空的。如果表中已经有了数据。HFile 再导入到 Hbase 的表中会触发 split 操作。

  (2)最终输出结果,无论是 map 还是 reduce,输出部分 key 和 value 的类型必须是: <ImmutableBytesWritable, KeyValue> 或者 <ImmutableBytesWritable, Put>。否则报这样的错误:

java.lang.IllegalArgumentException: Can't read partitions file
...
Caused by: java.io.IOException: wrong key class: org.apache.hadoop.io.*** is not class org.apache.hadoop.hbase.io.ImmutableBytesWritable

  (3)最终输出部分,Value 类型是 KeyValue 或 Put,对应的 Sorter 分别是 KeyValueSortReducerPutSortReducer,这个 SorterReducer 可以不指定,因为源码中已经做了判断。

if (KeyValue.class.equals(job.getMapOutputValueClass())) {
	job.setReducerClass(KeyValueSortReducer.class);
} else if (Put.class.equals(job.getMapOutputValueClass())) {
	job.setReducerClass(PutSortReducer.class);
} else {
	LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
}

  (4) MR 例子中 job.setOutputFormatClass(HFileOutputFormat.class); HFileOutputFormat 只适合一次对单列族组织成 HFile 文件,多列簇需要起多个 job,不过新版本的 Hbase(这句话是作者在四年前说的,现在最新的版本到达了什么程度我还没有去细究)已经解决了这个限制。

  (5) MR 例子中最后生成 HFile 存储在 HDFS 上,输出路径下的子目录是各个列族。如果对 HFile 进行入库 HBase,相当于移动 HFileHBaseRegion 中,HFile 子目录的列族内容没有了。

  (6)最后一个 Reduce 没有 setNumReduceTasks 是因为,该设置由框架根据 region 个数自动配置的。

  (7)下边配置部分,注释掉的其实写不写都无所谓,因为看源码就知道 configureIncrementalLoad 方法已经把固定的配置全配置完了,不固定的部分才需要手动配置。

public class HFileOutput {
        //job 配置
	public static Job configureJob(Configuration conf) throws IOException {
		Job job = new Job(configuration, "countUnite1");
		job.setJarByClass(HFileOutput.class);
                //job.setNumReduceTasks(2);  
		//job.setOutputKeyClass(ImmutableBytesWritable.class);
		//job.setOutputValueClass(KeyValue.class);
		//job.setOutputFormatClass(HFileOutputFormat.class);
 
		Scan scan = new Scan();
		scan.setCaching(10);
		scan.addFamily(INPUT_FAMILY);
		TableMapReduceUtil.initTableMapperJob(inputTable, scan,
				HFileOutputMapper.class, ImmutableBytesWritable.class, LongWritable.class, job);
		//这里如果不定义reducer部分,会自动识别定义成KeyValueSortReducer.class 和PutSortReducer.class
                job.setReducerClass(HFileOutputRedcuer.class);
		//job.setOutputFormatClass(HFileOutputFormat.class);
		HFileOutputFormat.configureIncrementalLoad(job, new HTable(
				configuration, outputTable));
		HFileOutputFormat.setOutputPath(job, new Path());
                //FileOutputFormat.setOutputPath(job, new Path()); //等同上句
		return job;
	}
 
	public static class HFileOutputMapper extends
			TableMapper<ImmutableBytesWritable, LongWritable> {
		public void map(ImmutableBytesWritable key, Result values,
				Context context) throws IOException, InterruptedException {
			//mapper逻辑部分
			context.write(new ImmutableBytesWritable(Bytes()), LongWritable());
		}
	}
 
	public static class HFileOutputRedcuer extends
			Reducer<ImmutableBytesWritable, LongWritable, ImmutableBytesWritable, KeyValue> {
		public void reduce(ImmutableBytesWritable key, Iterable<LongWritable> values,
				Context context) throws IOException, InterruptedException {
                        //reducer逻辑部分
			KeyValue kv = new KeyValue(row, OUTPUT_FAMILY, tmp[1].getBytes(),
					Bytes.toBytes(count));
			context.write(key, kv);
		}
	}
}

上述内容来自:HBase 写优化之 BulkLoad 实现数据快速入库

(2)自我实践
[hadoop@h71 ~]$ vi he.txt
hello world
hello hadoop
hello hive
[hadoop@h71 ~]$ hadoop fs -mkdir /rawdata
[hadoop@h71 ~]$ hadoop fs -put he.txt /rawdata
[hadoop@h71 hui]$ /usr/jdk1.7.0_25/bin/javac GeneratePutHFileAndBulkLoadToHBase.java
[hadoop@h71 hui]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar GeneratePutHFileAndBulkLoadToHBase*class
[hadoop@h71 hui]$ hadoop jar xx.jar GeneratePutHFileAndBulkLoadToHBase /rawdata /middata /resultdata

会报错:

Exception in thread "main" java.lang.IllegalArgumentException: No regions passed
        at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.writePartitions(HFileOutputFormat2.java:315)
        at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.configurePartitioner(HFileOutputFormat2.java:573)
        at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.configureIncrementalLoad(HFileOutputFormat2.java:421)
        at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.configureIncrementalLoad(HFileOutputFormat2.java:386)
        at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat.configureIncrementalLoad(HFileOutputFormat.java:90)
        at TestHFileToHBase.main(TestHFileToHBase.java:57)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:136)

原来需要先建表然后再执行之前的命令:

hbase(main):031:0> create 'word_count','cf'
[hadoop@h71 ~]$ hadoop fs -lsr /middata
-rw-r--r--   2 hadoop supergroup          0 2017-03-20 10:36 /middata/_SUCCESS
-rw-r--r--   2 hadoop supergroup         32 2017-03-20 10:36 /middata/part-r-00000
[hadoop@h71 ~]$ hadoop fs -cat /middata/part-r-00000
hadoop  1
hello   3
hive    1
world   1
[hadoop@h71 ~]$ hadoop fs -lsr /resultdata
-rw-r--r--   2 hadoop supergroup          0 2017-03-20 10:36 /resultdata/_SUCCESS
drwxr-xr-x   - hadoop supergroup          0 2017-03-20 10:36 /resultdata/cf
# 这里的 cf 是空目录,是因为 bulkload 会将指定目录下的 Hfile 格式的文件移动到 hbase 中,所以会是空目录,当用 mr 生成 HFile 文件是 cf 目录下会有 Hfile 格式的文件存在,并且无法用 hadoop fs -cat 查看,如果非要用的话会是乱码
hbase(main):012:0> scan 'word_count'
ROW                                         COLUMN+CELL                                                                                                                 
 hadoop                                     column=cf:count, timestamp=1489973703632, value=\x00\x00\x00\x01                                                            
 hello                                      column=cf:count, timestamp=1489973703632, value=\x00\x00\x00\x03                                                            
 hive                                       column=cf:count, timestamp=1489973703632, value=\x00\x00\x00\x01                                                            
 world                                      column=cf:count, timestamp=1489973703632, value=\x00\x00\x00\x01
# 发现插入的数据是字节类型的,后将代码中的 put.add(family, qualifier, hbaseValue); 改为 put.add(family, qualifier, Bytes.toBytes("5"));)

# 再执行上述指令得到:
hbase(main):032:0> scan 'word_count'
ROW                                         COLUMN+CELL                                                                                                                 
 hadoop                                     column=cf:count, timestamp=1489977438537, value=5                                                                           
 hello                                      column=cf:count, timestamp=1489977438537, value=5                                                                           
 hive                                       column=cf:count, timestamp=1489977438537, value=5                                                                           
 world                                      column=cf:count, timestamp=1489977438537, value=5
 
# 后来又将int count=Integer.valueOf(wordCountArray[1]);修改为String count=wordCountArray[1];
# 再执行上述指令得到:
hbase(main):007:0> scan 'word_count'
ROW                                         COLUMN+CELL                                                                                                                 
 hadoop                                     column=cf:count, timestamp=1489748145527, value=1                                                                           
 hello                                      column=cf:count, timestamp=1489748145527, value=3                                                                           
 hive                                       column=cf:count, timestamp=1489748145527, value=1                                                                           
 world                                      column=cf:count, timestamp=1489748145527, value=1 

注:我不明白原作者为什么非要整成 int 类型,这样导入到 hbase 中就成 \x00\x00\x00\x01 了啊,后来搜索到一篇文章,可以看一下:【hbase】——bulk load导入数据时value=\x00\x00\x00\x01问题解析

  最终输出部分,Value 类型是 KeyValuePut,对应的 Sorter 分别是 KeyValueSortReducerPutSortReducer,这个 SorterReducer 可以不指定,因为源码中已经做了判断:

  于是我想将 Put 改为 KeyValue 输出为 HFile:于是修改 ConvertWordCountOutToHFileMapper 类的代码为:

	public static class ConvertWordCountOutToHFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>
	{
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			String wordCountStr=value.toString();
			String[] wordCountArray=wordCountStr.split("\t");
			String word=wordCountArray[0];
			String count=wordCountArray[1];
			
			//创建HBase中的RowKey
			byte[] rowKey=Bytes.toBytes(word);
			ImmutableBytesWritable rowKeyWritable=new ImmutableBytesWritable(rowKey);
			byte[] family=Bytes.toBytes("cf");
			byte[] qualifier=Bytes.toBytes("count");
			byte[] hbaseValue=Bytes.toBytes(count);
			// Put 用于列簇下的多列提交,若只有一个列,则可以使用 KeyValue 格式
			 KeyValue keyValue = new KeyValue(rowKey, family, qualifier, hbaseValue);
//			Put put=new Put(rowKey);
//			put.add(family, qualifier, hbaseValue);
			context.write(rowKeyWritable, keyValue);
		}
	}

  执行上面命令后会报这个错:

Error: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.hbase.client.Put, received org.apache.hadoop.hbase.KeyValue
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1078)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
        at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
        at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
        at GeneratePutHFileAndBulkLoadToHBase$ConvertWordCountOutToHFileMapper.map(GeneratePutHFileAndBulkLoadToHBase.java:89)
        at GeneratePutHFileAndBulkLoadToHBase$ConvertWordCountOutToHFileMapper.map(GeneratePutHFileAndBulkLoadToHBase.java:67)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

  然后我将主方法中的 convertWordCountJobOutputToHFileJob.setMapOutputValueClass(Put.class); 改为 convertWordCountJobOutputToHFileJob.setMapOutputValueClass(KeyValue.class); 再执行这才好使了。原来这些后面跟的这些 class 都不是瞎写的啊,一开始我还以为是随便写的呐。。。

job.setJarByClass(GeneratePutHFileAndBulkLoadToHBase.class);    //代码中的主类
job.setMapperClass(WordCountMapper.class);  //第一个mr中的map类名
job.setReducerClass(WordCountReducer.class);   //第一个mr中的reduce类名
job.setOutputKeyClass(Text.class);  //我感觉这个是源码中的类名,并不是辖写的啊
job.setOutputValueClass(IntWritable.class);   //同上
convertWordCountJobOutputToHFileJob.setJarByClass(GeneratePutHFileAndBulkLoadToHBase.class);  //代码中的主类
convertWordCountJobOutputToHFileJob.setMapperClass(ConvertWordCountOutToHFileMapper.class);  //第二个mr中的map类名
convertWordCountJobOutputToHFileJob.setMapOutputKeyClass(ImmutableBytesWritable.class);  //源码中的类名

  对于 Hbase的ImmutableBytesWritable 类型,如果直接 Sysout 输出的是一个类似于16进制的 byte[];

  假设我们获得了 ImmutableBytesWritable aa; 我们一般先将 aa 通过 byte[] bb = aa.get() 得到 byte[] 类型;然后通过 String cc = Bytes.toString(bb) 将其解析为 String;

2. 案例二:
(1)MR生成HFile文件
[hadoop@h71 hui]$ vi TestHFileToHBase.java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TestHFileToHBase {

        public static class TestHFileToHBaseMapper extends Mapper {

                protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                        String[] values = value.toString().split(" ", 2);
                        byte[] row = Bytes.toBytes(values[0]);
                        ImmutableBytesWritable k = new ImmutableBytesWritable(row);
                        KeyValue kvProtocol = new KeyValue(row, "PROTOCOLID".getBytes(), "PROTOCOLID".getBytes(), values[1]
                                        .getBytes());
                        context.write(k, kvProtocol);
                }
        }

        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
                Configuration conf = HBaseConfiguration.create();
                Job job = new Job(conf, "TestHFileToHBase");
                job.setJarByClass(TestHFileToHBase.class);

                job.setOutputKeyClass(ImmutableBytesWritable.class);
                job.setOutputValueClass(KeyValue.class);

                job.setMapperClass(TestHFileToHBaseMapper.class);
                job.setReducerClass(KeyValueSortReducer.class);
//                job.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.HFileOutputFormat.class);
                job.setOutputFormatClass(HFileOutputFormat.class);
                // job.setNumReduceTasks(4);
                // job.setPartitionerClass(org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner.class);

//                 HBaseAdmin admin = new HBaseAdmin(conf);
                HTable table = new HTable(conf, "hua");

                 HFileOutputFormat.configureIncrementalLoad(job, table);

                FileInputFormat.addInputPath(job, new Path(args[0]));
                FileOutputFormat.setOutputPath(job, new Path(args[1]));

                System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

}
[hadoop@h71 ~]$ vi he.txt
hello world
hello hadoop
hello hive
[hadoop@h71 ~]$ hadoop fs -mkdir /rawdata
[hadoop@h71 ~]$ hadoop fs -put he.txt /rawdata
hbase(main):020:0> create 'hua','PROTOCOLID'
[hadoop@h71 hui]$ /usr/jdk1.7.0_25/bin/javac TestHFileToHBase.java 
[hadoop@h71 hui]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar TestHFileToHBase*class
[hadoop@h71 hui]$ hadoop jar xx.jar TestHFileToHBase /rawdata /middata

报错:

Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.hbase.io.ImmutableBytesWritable, received org.apache.hadoop.io.LongWritable
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1073)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
        at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
        at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
        at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

解决:

# 于是我把
public static class TestHFileToHBaseMapper extends Mapper {
# 修改为
public static class TestHFileToHBaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>{
# 就好使了。。。
[hadoop@h71 ~]$ hadoop fs -lsr /middata
drwxr-xr-x   - hadoop supergroup          0 2017-03-17 20:50 /middata/PROTOCOLID
-rw-r--r--   2 hadoop supergroup       1142 2017-03-17 20:50 /middata/PROTOCOLID/65493afaefac43528c554d0b8056f1e3
-rw-r--r--   2 hadoop supergroup          0 2017-03-17 20:50 /middata/_SUCCESS
(/middata/PROTOCOLID/65493afaefac43528c554d0b8056f1e3是个Hfile格式的文件,无法用hadoop fs -cat查看,否则会出现乱码)
(2)HFile入库到HBase

  原文代码有很多问题,修改后为:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.GenericOptionsParser;

public class TestLoadIncrementalHFileToHBase {

        public static void main(String[] args) throws Exception {
                Configuration conf = HBaseConfiguration.create();
                String[] dfsArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
                HTable table = new HTable(conf,"hua");
                LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
                loader.doBulkLoad(new Path(dfsArgs[0]), table);
        }
}
[hadoop@h71 hui]$ /usr/jdk1.7.0_25/bin/javac TestLoadIncrementalHFileToHBase.java
[hadoop@h71 hui]$ /usr/jdk1.7.0_25/bin/java TestLoadIncrementalHFileToHBase /middata/PROTOCOLID
执行后在hbase shell端查看表hua无数据。。因执行:
[hadoop@h71 hui]$ /usr/jdk1.7.0_25/bin/java TestLoadIncrementalHFileToHBase /middata
hbase(main):073:0> scan 'hua'
ROW                                         COLUMN+CELL                                                                                                                 
 hello                                      column=PROTOCOLID:PROTOCOLID, timestamp=1489758507378, value=hive
(查看hua表h只有一条数据,一开始还很困惑,我的he.txt中有三条数据啊,为何只导入了一条数据啊,后来突然明白了hbase将he.txt中三行数据的hello作为rowkey,则三行数据的rowkey都一样了啊)

上述内容来自:生成HFile以及入库到HBase

3. 案例三:用 Scala 程序通过 Spark 完成

  为避免数据都写入一个 region,造成 Hbase 的数据倾斜问题。在当前 HMaster 活跃的节点上,创建预分区表:

create ‘userprofile_labels', { NAME => "f", BLOCKCACHE => "true", BLOOMFILTER => "ROWCOL", COMPRESSION => 'snappy', IN_MEMORY => 'true' }, { NUMREGIONS => 10, SPLITALGO => 'HexStringSplit' }

  将待同步的数据写入 HFile,HFile 中的数据以 key-value 键值对方式存储,然后将 HFile 数据使用 BulkLoad 批量写入 HBase 集群中。 Scala 脚本执行如下:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.Hbase.client.ConnectionFactory
import org.apache.hadoop.Hbase.{HbaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.Hbase.io.ImmutableBytesWritable
import org.apache.hadoop.Hbase.mapreduce.{HFileOutFormat2, LoadIncremectalHFiles}
import org.apache.hadoop.Hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.spark.sql.SparkSession

object Hive2HBase {
  def main(args: Array[String]): Unit = {
    // 传入日期参数 和 当前活跃的master节点
    val data_date = arg(0)
    val node = args(1)  //当前活跃的节点ip

    val spark = SparkSession
      .builder()
      .appName("Hive2Hbase")
      .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
      .config("spark.storage.memoryFraction", "0.1")
      .config("spark.shuffle.memoryFraction", "0.7")
      .config("spark.memory.useLegacyMode", "ture")
      .enableHiveSupport()
      .getOrCreate()

    // 创建HBase的配置
    val conf = HBaseConfiguration.create()
        conf.set("HBase.zookeeper.quorum", "10.xxx.xxx.xxx, 10.xxx.xxx.xxx")
        conf.set("HBase.zookeeper.property.clientPort", "8020")

    //为了预防hfile文件数过多无法进行导入,设置参数值
    conf.setInt("HBase.hregion.max.filesize", 10737418240)
    conf.setInt("HBase.mapreduce.bulkload.max.hfiles.perRegion.perRegion.perFamily", 3200)

    val Data = spark.sql(s"select userid,userlabels from dw.userprofile_usergroup_labels_all where data_date='${data_date}‘”)
    val dataRdd = Data.rdd.flatMap(row => 
      val rowkey =  row.getAs[String]("userid".toLowerCase)
      val tagsmap = row.getAs[Map[String, Object]]("userlabels".toLowerCase)
      val sbkey = new StringBuffer()  // 对MAP结构转化 a->b 'a':'b'
      val sbvalue = new StringBuffer()
      for ((key, value) <- tagsmap) {
        sbkey.append(key + ":")
        val labelght = if (value == "") {
          "-999999"
        } else {
          value
        }
        sbvalue.append(labelght + ":")
      )
      val item = sbkey.substring(0, sbkey.length-1)
      val score = sbvalue.substring(0, sbvalue.length-1)
      Array(
        (rowkey,("f","i",item)),
        (rowkey,("f","s",score))
      )
    })
    
    // 将rdd转换成HFile需要的格式
    val rdds = dataRdd.fileter(x=>x._1 != null).sortBy(x=>(x._1,x._2._1,x._2._2)).map(x => {
      //KeyValue的实例为value
      val rowKey = Bytes.toBytes(x._1)
      val family = Bytes.toBytes(x._2._1)
      val colum = Bytes.toBytes(x._2._2)
      val value = Bytes.toBytes(x._2._3.toString)
      (new ImmutableBytesWritable(rowKey), new KeyValue(rowKey, family, colum, value))
    ))

    // 文件保存在hdfs的位置
    val locatedir = "hdfs://" + node.toString + ":8020/user/bulkload/hfile/usergroup_HBase_" + data_date

    // 在locatedir生成的Hfile文件
    rdds.saveAsNewAPIHadoopFile(locatedir,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      conf)
    //HFile导入到HBase
    val load = new LoadIncrementalHFiles(conf)

    //HBase的表名
    val tableName = "userprofile_labels"
    //创建HBase的链接,利用默认的配置文件,读取HBase的master地址
    val conn = ConnectionFactory.createConnection(conf)
    //根据表名获取表
    val table = conn.getTable(TableName.valueOf(tableName))

    try {
      //获取HBase表的region分布
      val regionLocation = conn.getregionLocation(TableName.valueOf(tableName))
      //创建一个hadoop的mapreduce的job
      val job = Job.getInstance(conf)
      //设置job名称,任意命名
      job.setJobName("Hive2HBase")
      //输出文件的内容KeyValue
      job.setMapOutputKeyClass(classOf[KeyValue])
      //设置文件输出key,outkey要用ImmutableBytesWritable
      job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
      //配置HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocation)
      //开始导入
      load.doBulkLoad(new Path(locatedir), conn.getAdmin, table, regionLocation)
    } finally {
      table.close()
      conn.close()
    )
    spark.close()
  }
}

上述内容来自书籍:《用户画像方法论与工程化解决方案》

三、补充:Region分裂(Split)

参考:hbase的split策略和预分区 和 【原创】HBase的分裂(Split)与紧缩(Compaction)

  Region分裂是HBase最核心的功能之一,是实现分布式可扩展性的基础。最初,每个Table只有一个Region,随着数据的不断写入,HBase根据一定的触发条件和一定的分裂策略将Hbase的一个region分裂成两个子region并对父region进行清除,通过HBase的balance机制,实现分裂后的region负载均衡到对应RegionServer上。若一个table没有进行预分区,那么只有一个region,初始化表时数据的读写都命中同一个regionServer,会造成热点问题,且region进行split时集群是不可用的,频繁的split也会造成大量的集群I/O,性能很低。目前常见的HBase分裂方式有三种:

  • Per-Spliting(预分区)
  • Auto-Spliting(自动分裂)
  • Force-Spliting(强制分裂)

  Per-Spliting指的是在HBase创建Table时,指定好Table的Region的个数,生成多个Region。这么做的好处是一方面可以避免热点数据的写入问题(只有一个region,写数据量大时其余RegionServer就是空闲的),另一方面减少Region的Split几率,同时减少消耗集群的系统资源(IO,网络),减少因Split暂停Region的上线造成对HBase读写数据的影响。

  HBase默认建表时只有一个Region,此时的RowKey是没有边界的,即没有StartKey和EndKey。进行预分区时,需要配置切分点,使得HBase知道在哪个RowKey点做切分。hbase提供了两种pre-split算法:HexStringSplit和UniformSplit,前者适用于十六进制字符的rowkey,后者适用于随机字节数组的rowkey。

//创建一个名为hex_test的表,有两个列簇info和desc,可存3个版本的数据,副本为2,预先指定10个region,且split算法为HexStringSplit
create 'hex_test',{NAME=>'info',VERSIONS=>3},{NAME=>'desc',VERSIONS=>3},{NUMREGIONS=>10,SPLITALGO=>'HexStringSplit',REGION_REPLICATION=>2}

//UniformSplit
create 'uniform_test',{NAME=>'info',VERSIONS=>3},{NAME=>'desc',VERSIONS=>3},{NUMREGIONS=>10,SPLITALGO=>'UniformSplit',REGION_REPLICATION=>2}

  自动分裂指的是随着不断向表中写入数据,Region也会不断增大,HBase根据触发的分裂策略自动分裂Region,当前HBase已经有6中分裂触发的策略,不同版本中配置的分裂策略不同。文章来源地址https://www.toymoban.com/news/detail-496727.html

// 强制分裂
split 'forced_table', 'b' //其中forced_table 为要split的table , ‘b’ 为split 点

// 我们还可以通过配置hbase.regionserver.region.split.policy指定自己的split策略

<!--定时切分执行类-->
<property>
    <name>hbase.regionserver.region.split.policy</name>
    <value>org.apache.hadoop.hbase.regionserver.TimingRegionSplitPolicy</value>
</property>
<!--定时切分时间-->
<property>
     <name>hbase.regionserver.region.split.startTime</name>
     <value>02:00:00</value>
</property>
<property>
    <name>hbase.regionserver.region.split.endTime</name>
    <value>04:00:00</value>
</property>

到了这里,关于Hbase的bulkload流程与实践的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 《Git入门实践教程》前言+目录

    版本控制系统(VCS)在项目开发中异常重要,但和在校大学生的交流中知道,这个重要方向并未受到重视。具备这一技能,既是项目开发能力的体现,也可为各种面试加码。在学习体验后知道,Git多样化平台、多种操作方式、丰富的资源为业内人士提供了方便的同时,也造成

    2024年02月10日
    浏览(50)
  • Hadoop/HDFS/MapReduce/Spark/HBase重要知识点整理

    本复习提纲主要参考北京大学计算机学院研究生课程《网络大数据管理与应用》课程资料以及厦门大学计算机科学系研究生课程 《大数据技术基础》相关材料整理而成,供广大网友学习参考,如有版权问题请联系作者删除:guanmeige001@pku.edu.cn Hadoop简介 Hadoop的功能和作用: 高

    2024年02月02日
    浏览(46)
  • FPGA学习实践之旅——前言及目录

    很早就有在博客中记录技术细节,分享一些自己体会的想法,拖着拖着也就到了现在。毕业至今已经半年有余,随着项目越来越深入,感觉可以慢慢进行总结工作了。趁着2024伊始,就先开个头吧,这篇博客暂时作为汇总篇,记录在这几个月以及之后从FPGA初学者到也算有一定

    2024年02月03日
    浏览(35)
  • 大数据处理技术作业——使用HBase&MongoDB&MapReduce进行数据存储和管理

    写这篇文章的目的,主要是为了记录一下这次作业历程,并且笔者了解到很多同志饱受作业折磨,遂简单分享一下个人完成作业的历程,以下内容仅为本人的一些乱七八糟的想法, 仅作参考O(∩_∩)O 1、本作业的链接 【完成本次作业用到的代码文件,列出网盘链接,https://p

    2024年02月07日
    浏览(32)
  • 基于Hadoop的MapReduce网站日志大数据分析(含预处理MapReduce程序、hdfs、flume、sqoop、hive、mysql、hbase组件、echarts)

    需要本项目的可以私信博主!!! 本项目包含:PPT,可视化代码,项目源码,配套Hadoop环境(解压可视化),shell脚本,MapReduce代码,文档以及相关说明教程,大数据集! 本文介绍了一种基于Hadoop的网站日志大数据分析方法。本项目首先将网站日志上传到HDFS分布式文件系统

    2024年02月16日
    浏览(45)
  • 大数据期资料2023 Beta版 - Hadoop、HDFS、MapReduce、Hive、ZooKeeper、Kafka、HBase详解

    了解大数据概念、Hadoop、HDFS、MapReduce、Hive、ZooKeeper、Kafka、HBase等技术,包括特点、命令操作和启动关闭方法。获取2023年大数据资料Beta版。

    2024年02月06日
    浏览(129)
  • MapReduce概述及工作流程

    mapreduce原语(独创) mapreduce工作流程(重点) MR作业提交流程(重点) YARN RM-HA搭建(熟练) 运行自带的wordcount(了解) 动手写wordcount(熟练) MapReduce原语 hadoop MapReduce框架可以让你的应用在集群中 可靠地 容错地 并行 处理TB级别的数据 1024TB=1PB  1024PB=1EB  1024EB=1ZB MapReduc

    2023年04月08日
    浏览(37)
  • MapReduce初级编程实践

    ubuntu18.04虚拟机和一个win10物理主机 编程环境 IDEA 虚拟机ip:192.168.1.108 JDK:1.8 使用Java编程一个WordCount程序,并将该程序打包成Jar包在虚拟机内执行 首先使用IDEA创建一个Maven项目 在pom.xml文件内引入依赖和打包为Jar包的插件: 编写对应的程序: MyProgramDriver类用于执行程序入口

    2023年04月26日
    浏览(29)
  • MapReduce 原理与实践

    Hadoop MapReduce 是一个 编程框架 ,它可以轻松地编写应用程序,以可靠的、容错的方式处理大量的数据(数千个节点)。 正如其名,MapReduce 的工作模式主要分为 Map 阶段和 Reduce 阶段 。 一个 MapReduce 任务(Job)通常将输入的数据集分割成独立的块,这些块被 map 任务以完全并行的

    2024年02月06日
    浏览(33)
  • 实验5:MapReduce 初级编程实践

    由于CSDN上传md文件总是会使图片失效 完整的实验文档地址如下: https://download.csdn.net/download/qq_36428822/85709497 实验内容与完成情况: (一)编程实现文件合并和去重操作 对于两个输入文件,即文件 A 和文件 B,请编写 MapReduce 程序,对两个文件进行合并, 并剔除其中重复的内

    2024年02月07日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包