java实现hbase数据导出

这篇具有很好参考价值的文章主要介绍了java实现hbase数据导出。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1. HBase-client方式实现

1.1 依赖

 <!--HBase依赖坐标-->
 <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.2.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.2.6</version>
        <exclusions><!--排除依赖:不加入这句会报错-->
            <exclusion>
                <groupId>*</groupId>
                <artifactId>*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

1.2 配置及代码

1.2.1 get方式
public class HBaseService {
    private static final Logger logger = LoggerFactory.getLogger(HBaseService.class);

    /**
     * 配置文件读取的配置信息
     */
    static Configuration configuration = HBaseConfiguration.create();

    /**
     * 链接信息
     */
    private static Connection conn = null;

    static {
        try {
            conn = ConnectionFactory.createConnection(configuration);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * 进行数据的查询以及写入到文件中(通过get方式查询获得数据并写入文件)
     * @param rowKey rowKey信息
     * @param tableName 表名
     * @param dirName 文件目录
     * @param fileExist 文件是否存在的标志
     */
    public static void addInfoToFile(String rowKey, String tableName, String dirName, boolean fileExist){
        Table table = null;
        ResultScanner result = null;
        try {
            Connection connection = ConnectionFactory.createConnection(configuration);
            table = connection.getTable(TableName.valueOf(tableName));
            List<Get> gets = new ArrayList<>();
            Get get = new Get(Bytes.toBytes(rowKey));
            gets.add(get);
            // result的集合
            Result[] resultArr = table.get(gets);
            Map<String, Map<String,String>> dataMap = new HashMap<>();
            for (Result r : resultArr) {
                String rowKey1 = Bytes.toString(r.getRow());
                Map<String, String> columnDataMap;
                if (dataMap.containsKey(rowKey1)){
                    columnDataMap = dataMap.get(rowKey1);
                }else {
                    columnDataMap = new HashMap<>();
                }
                for (Cell kv : r.rawCells()) {
                    String qualifire = Bytes.toString(CellUtil.cloneQualifier(kv));
                    String value = Base64Encoder.encode(CellUtil.cloneValue(kv));
                    columnDataMap.put(qualifire, value);
                    dataMap.put(rowKey1, columnDataMap);
                }
            }

             if (MapUtil.isNotEmpty(dataMap)){
                 for (String r : dataMap.keySet()) {
                     Map<String, String> columnMap = dataMap.get(r);
                     StrBuilder lineStr = new StrBuilder();
                     lineStr.append(r + "||");
                     for (String s : columnMap.keySet()) {
                         lineStr.append(s + ":" + columnMap.get(s) + "\t");
                     }
                     String fileName = dirName + File.separator + "data.txt";
                     File f = new File(fileName);
                     if (!f.exists()){
                         try {
                             f.createNewFile();
                         }catch (IOException e){
                             logger.error("创建文件失败,异常信息:{}", e.getMessage());
                         }
                     }
                     BufferedWriter writer = new BufferedWriter(
                             new FileWriter(fileName, true));

                     writer.write(lineStr.toString()  + "\n");
                     logger.info("写入rowkey:{}的波形数据到:{}", r, fileName);
                     writer.close();
                 }
            }
        }catch (Exception e){
            logger.error("写入rowkey:{}的波形数据到:{}失败,错误的信息:{}", rowKey, dirName, e.getMessage());
        }
    }
}
1.3.1 Scan方式
   /**
     * 通过scan的方式进行数据获取
     * @param rowKey rowkey
     * @param startKey 开始的rowKey
     * @param stopKey 结束的rowKey
     * @param regexStr rowKey的正则匹配表达式
     */
    public static void findRowKey(String rowKey, String startKey, String stopKey, String regexStr){
        Table table = null;
        ResultScanner result = null;
        try {
            TableName[] tbs = conn.getAdmin().listTableNames();
            FilterList filters = new FilterList();
            table = conn.getTable(TableName.valueOf("Vibration_WaveData"));
            Scan scan = new Scan();
            // 通过正则匹配的方式+rowkey进行数据过滤
            RegexStringComparator regexComparator = new RegexStringComparator(regexStr);
            RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, regexComparator);
            // 设置start和stop Rowkey 可以提供检索效率
            scan.setStartRow(startKey.getBytes());
            scan.setStopRow(stopKey.getBytes());
            scan.setFilter(rowFilter);
            // 每次从服务器端获取的行数
            scan.setCaching(100000);
            ResultScanner result1 = table.getScanner(scan);
            for (Result r : result1) {
                for (KeyValue kv : r.raw()) {
                    System.out.println(String.format("row:%s, family:%s, qualifier:%s, qualifiervalue:%s, timestamp:%s.",
                            Bytes.toString(kv.getRow()),
                            Bytes.toString(kv.getFamily()),
                            Bytes.toString(kv.getQualifier()),
                            Bytes.toString(kv.getValue()),
                            kv.getTimestamp()));
                }
            }
            result1.close();
            conn.close();
        }catch (Exception e){
            System.out.println(e.getMessage());
        }
    }

2. mapReduce实现

2.1 依赖

 <!--hadoop依赖坐标-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>2.7.6</version>
        </dependency>
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.6</version>
        </dependency>

2.2 配置文件

hbase-site.xml:文章来源地址https://www.toymoban.com/news/detail-734761.html

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <!-- 指定 hbase 是分布式的 -->
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <property>
        <!-- 指定 zk 的地址,多个用“,”分割 -->
        <name>hbase.zookeeper.quorum</name>
        <value>192.168.1.100:2181,192.168.1.102:2181</value>
    </property>

    <!-- 开启 uber 模式,默认关闭 -->
    <property>
        <name>mapreduce.job.ubertask.enable</name>
        <value>true</value>
    </property>
    <!-- uber 模式中最大的 mapTask 数量,可向下修改 -->
    <property>
        <name>mapreduce.job.ubertask.maxmaps</name>
        <value>9</value>
    </property>
    <!-- uber 模式中最大的 reduce 数量,可向下修改 -->
    <property>
        <name>mapreduce.job.ubertask.maxreduces</name>
        <value>1</value>
    </property>
    <!-- uber 模式中最大的输入数据量,默认使用 dfs.blocksize 的值,可向下修改 -->
    <property>
        <name>mapreduce.job.ubertask.maxbytes</name>
        <value></value>
    </property>
</configuration>

2.3 导出的代码

public class ReadHbaseDataByMRToHDFS {
static Configuration configuration = HBaseConfiguration.create();
    /**
     * 进行hbase数据导出的操作
     * @param tableName 表名
     * @param dirName   文件夹名称
     * @param startRow  开始的row key
     * @param stopRow   结束的row key
     * @param regexStr  进行匹配的字符
     */
    public void exportHbaseData(String tableName, String dirName, String startRow, String stopRow, String regexStr) {
    
        logger.info("开始进行HBase数据导出,tableName:{}, dirName:{}, startRow:{}, stopRow:{}, regexStr:{}", tableName, dirName, startRow, stopRow, regexStr);
        System.setProperty("HADOOP_USER_NAME", "root");
        // 一次rpc请求的超时时间,如果某次RPC请求超过该值,客户端就会主动管理Socket
        configuration.set("hbase.rpc.timeout", "600000");
        // ,该参数是表示HBase客户端发起一次scan操作的rpc调用至得到响应之间总的超时时间
        configuration.set("hbase.client.scanner.timeout.period", "600000");
        configuration.set("mapreduce.job.ubertask.maxmaps", "10");
        configuration.set("mapreduce.job.ubertask.maxreduces", "1");
        configuration.set("mapreduce.task.io.sort.mb", "1024");
        configuration.set("mapred.map.tasks", "10");
        try {
            Job job = Job.getInstance(configuration);
            job.setJarByClass(ReadHbaseDataByMRToHDFS.class);
            //设置reduce个数
            job.setNumReduceTasks(0);
            //设置map
            Scan scan = new Scan();
// 设置start和stop rowkey以及regex提高检索效率
            RegexStringComparator regexComparator = new RegexStringComparator(regexStr);
            scan.setStartRow(startRow.getBytes()).setStopRow(stopRow.getBytes());
            RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, regexComparator);
            scan.setFilter(rowFilter);
            // 每次从服务器端获取的行数
            scan.setCaching(900000);
            //参数false,关于添加依赖jar
            TableMapReduceUtil.initTableMapperJob(tableName,
                    scan,
                    ReadHBaseDataByMRToHDFSMapper.class,
                    Text.class,
                    NullWritable.class,
                    job,
                    false);

            //输出目录
            FileOutputFormat.setOutputPath(job, new Path(dirName));
            //提交
            boolean isDone = job.waitForCompletion(true);
            if (isDone){
                Thread.sleep(3000);
                logger.info("进行HBase数据导出成功,tableName:{}, dirName:{}, startRow:{}, stopRow:{}, regexStr:{},状态:{}", tableName, dirName, startRow, stopRow, regexStr, isDone);
            }
          } catch (Exception e) {
            logger.error("进行HBase数据导出时出现异常,tableName:{}, dirName:{}, startRow:{}, stopRow:{}, regexStr:{},异常信息:{}",
                    tableName, dirName, startRow, stopRow, regexStr, e.getMessage());
        }
    }

    /**
     * 参数
     * ImmutableBytesWritable
     * Result :HBase中的数据每次取出来是一个Result:就是一个rowkey做一个result
     * <p>
     * keyOut:
     * valueOut:
     */
    static class ReadHBaseDataByMRToHDFSMapper extends TableMapper<Text, NullWritable> {
        Text outKey = new Text();
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            List<Cell> cells = value.listCells();
            Map<String, Map<String, String>> cellMap = new HashMap<>();
            //一个cell一条数据 包含一个column
            for (Cell cell : cells) {
                String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
                Map<String, String> columnMap = new HashMap<>();
                if (cellMap.containsKey(rowkey)){
                    columnMap = cellMap.get(rowkey);
                }
                // String family = Bytes.toString(CellUtil.cloneFamily(cell));
                String column = Bytes.toString(CellUtil.cloneQualifier(cell));
                String columnValue = Base64Encoder.encode(CellUtil.cloneValue(cell));
                columnMap.put(column, columnValue);
                cellMap.put(rowkey, columnMap);
                // long timeStamp = cell.getTimestamp();
               // outKey.set(rowkey + "\t\t" + column + "\t\t" + columnValue + "\n");
            }
            if (CollUtil.isNotEmpty(cellMap)){
                String lineStr = "";
                for (String s : cellMap.keySet()) {
                    Map<String, String> columnMap = cellMap.get(s);
                    lineStr = s + "||";
                    for (String c : columnMap.keySet()) {
                        lineStr += c + ":" + columnMap.get(c) + "\t";
                    }
                }
                outKey.set(lineStr);
                context.write(outKey, NullWritable.get());
                outKey.clear();
            }
        }
    }
}

到了这里,关于java实现hbase数据导出的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • java实现excel的导入导出(带参数校验:非空校验、数据格式校验)

    本次封装引入阿里开源框架EasyExcel,EasyExcel是一个基于Java的简单、省内存的读写Excel的开源项目。在尽可能节约内存的情况下支持读写百M的Excel。 github地址:GitHub - alibaba/easyexcel: 快速、简洁、解决大文件内存溢出的java处理Excel工具 。 64M内存20秒读取75M(46W行25列)的Excel(3.0.2

    2024年02月01日
    浏览(69)
  • HBase导出建表语句

    HBase导出建表语句 HBase是一个面向大数据的分布式列存数据库,它以Hadoop作为底层存储和计算平台。在HBase中,数据以表的形式存储,每个表由行和列组成。本文将介绍如何使用HBase导出建表语句,并提供相应的代码示例。 在HBase中,建表语句用于创建表和指定表的列族。表由

    2024年02月21日
    浏览(31)
  • java: 从HBase中读取数据

    一、添加依赖: 二、使用Scanner读取数据示例:

    2024年01月24日
    浏览(39)
  • Hbase数据库完全分布式搭建以及java中操作Hbase

    基础的环境准备不在赘述,包括jdk安装,防火墙关闭,网络配置,环境变量的配置,各个节点之间进行免密等操作等。使用的版本2.0.5. 参考官方文档 分布式的部署,都是在单节点服务的基础配置好配置,直接分发到其他节点即可。 jdk路径的配置,以及不适用内部自带的zk. 配

    2024年02月03日
    浏览(49)
  • 大数据HBase学习圣经:一本书实现HBase学习自由

    本文是《大数据HBase学习圣经》 V1版本,是 《尼恩 大数据 面试宝典》姊妹篇。 这里特别说明一下:《尼恩 大数据 面试宝典》5个专题 PDF 自首次发布以来, 已经汇集了 好几百题,大量的大厂面试 干货、正货 。 《尼恩 大数据 面试宝典》面试题集合, 将变成大数据学习和面

    2024年02月10日
    浏览(38)
  • <Java导出Excel> 1.0 Java实现Excel动态模板导出

    思路: 1,先创建动态模板(必须要在数据库建一张表,可随时修改模板) 例如: 建表语句: 模板中的字段脚本: 2,编写一个查询接口:返回一个List map 注意:order by id 根据表中字段:id排序的作用是控制导出的EXCEL表中字段列的顺序; mapper.xml层: mapper接口层: serviceIm

    2024年02月12日
    浏览(49)
  • HBase Java API 开发:批量操作 第2关:批量删除数据

    删除单行数据 删除一行数据很简单,我们来看个示例: 这段代码就可以删除行键为 row1 的行。 删除多行数据 如何删除多行数据呢? 相信你已经猜到了,既然 get() 方法有重载方法,那应该 delete() 方法也有,确实: 这样就可以删除多行数据啦。 编程要求 还等啥,亲自试一试

    2024年02月05日
    浏览(56)
  • Java导出Excel模板,导出数据到指定模板,通过模板导入数据(一)

    本文章主要是介绍阿里巴巴的easyexcel的使用 1. 首先需要我们导入easyexcel的依赖包 2. 前期工作准备 编写相关导出模板和导入模板。在项目的resources下创建文件夹,命名为excel 导出模板(此处仅做示例,字段根据自己项目来):  导入模板(导入时需要哪些字段根据自己项目业

    2024年02月03日
    浏览(46)
  • 如何使用Java 实现excel模板导出---多sheet导出?

    效果展示: maven依赖 相关工具类 **此处省略异常处理类 ExcelReportUtil 类 excel 接口 实现类 excel填充数据处理类 excel填充处理类 excel模板处理 实现关键代码展示 通过模板实现导出功能 ExcelReportCreator 中的代码 导入数据案例展示 excel模板批注案例 每个sheet页都需要写批注,通过批

    2024年02月15日
    浏览(44)
  • Java实现PDF导出

    需求:使用easyPOI方式导出合同word文档 Word模板和Excel模板用法基本一致,支持的标签也是一致的,仅仅支持07版本的word也是只能生成后缀是docx的文档,poi对doc支持不好所以easyPOI中就没有支持doc,我们就拿docx做导出 这里得好好说说模板中标签的用法: 下面列举下EasyPoi支持的

    2024年02月03日
    浏览(53)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包