SpringBoot使用Hbase

这篇具有很好参考价值的文章主要介绍了SpringBoot使用Hbase。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

SpringBoot使用Hbase

一,引入依赖

		<dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.3.2</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

二,配置文件添加自己的属性

hbase:
  zookeeper:
    quorum: 10.xxx.xx.153,10.xxx.xx.154,10.xxx.xx.155
    property:
      clientPort: 2181
  master:
    port: 9001

三,配置类注入HBASE配置

package com.hbase.config;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ClassName: HBaseConfig
 * @author: Leemon
 * @Description: TODO
 * @date: 2023/4/12 18:06
 * @version: 1.0
 */
@Configuration
@RefreshScope
public class HBaseConfig {

    @Value("${hbase.zookeeper.quorum}")
    private String zookeeperQuorum;

    @Value("${hbase.zookeeper.property.clientPort}")
    private String clientPort;

    @Value("${hbase.master.port}")
    private String masterPort;

    @Bean
    public org.apache.hadoop.conf.Configuration hbaseConfiguration() {
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
        conf.set("hbase.zookeeper.property.clientPort", clientPort);
        // 如果hbase是集群,这个必须加上
        // 这个ip和端口是在hadoop/mapred-site.xml配置文件配置的
        conf.set("hbase.master", zookeeperQuorum + ":" + masterPort);

        conf.set("hbase.client.keyvalue.maxsize", "20971520");
        conf = HBaseConfiguration.create(conf);
        return conf;
    }

}

四,配置Hbase连接池

这里没有使用懒加载模式,减少启动后第一次访问时访问时间过长

package com.hbase.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.Enumeration;
import java.util.Vector;

/**
 * @ClassName: HbaseConnectionPool
 * @author: Leemon
 * @Description: TODO
 * @date: 2023/4/13 9:45
 * @version: 1.0
 */
@Component
@Slf4j
public class HbaseConnectionPool {

    /**
     * 连接池最大的大小
     */
    private int nMaxConnections = 20;

    /**
     * 连接池自动增加的大小
     */
    private int nIncrConnectionAmount = 3;

    /**
     * 连接池的初始大小
     */
    private int nInitConnectionAmount = 3;

    /**
     * 存放连接池中数据库连接的向量,初始时为null
     */
    private Vector vcConnections = null;

    @Resource
    private Configuration hbaseConfiguration;

    @PostConstruct
    public void init() {
        try {
            vcConnections = new Vector();
            createConnections(nInitConnectionAmount);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public synchronized Connection getConnection() {
        // 确保连接池己被创建
        if (vcConnections == null) {
            // 连接池还没创建,则返回null
            return null;
        }
        // 获得一个可用的数据库连接
        Connection conn = getFreeConnection();
        // 如果目前没有可以使用的连接,即所有的连接都在使用中
        while (conn == null) {
            // 等一会再试
            try {
                wait(250);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 重新再试,直到获得可用的连接,如果getFreeConnection()返回的为null,则表明创建一批连接后也不可获得可用连接
            conn = getFreeConnection();
        }

        // 返回获得的可用的连接
        return conn;

    }

    /**
     * 本函数从连接池向量 connections 中返回一个可用的的数据库连接,如果
     * 当前没有可用的数据库连接,本函数则根据 incrementalConnections 设置
     * 的值创建几个数据库连接,并放入连接池中。
     * 如果创建后,所有的连接仍都在使用中,则返回 null
     * @return
     * 		返回一个可用的数据库连接
     */
    private Connection getFreeConnection() {
        // 从连接池中获得一个可用的数据库连接
        Connection conn = findFreeConnection();
        if (conn == null) {
            // 如果目前连接池中没有可用的连接
            // 创建一些连接
            try {
                createConnections(nIncrConnectionAmount);
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
                log.error("create new connection fail.", e);
            }
            // 重新从池中查找是否有可用连接
            conn = findFreeConnection();
            if (conn == null) {
                // 如果创建连接后仍获得不到可用的连接,则返回 null
                return null;
            }
        }

        return conn;
    }

    /**
     * 创建由 numConnections 指定数目的数据库连接 , 并把这些连接
     * 放入 connections 向量中
     * @param _nNumConnections 要创建的数据库连接的数目
     * @throws Exception
     */
    private void createConnections(int _nNumConnections) throws Exception {
        // 循环创建指定数目的数据库连接
        for (int x = 0; x < _nNumConnections; x++) {
            // 是否连接池中的数据库连接的数量己经达到最大?最大值由类成员 maxConnections
            // 指出,如果 maxConnections 为 0 或负数,表示连接数量没有限制。
            // 如果连接数己经达到最大,即退出。
            if (this.nMaxConnections > 0  && this.vcConnections.size() >= this.nMaxConnections) {
                log.warn("已达到最大连接数,不能再增加连接");
                throw new Exception("已达到最大连接数"+ nMaxConnections+",不能再增加连接");
            }

            // 增加一个连接到连接池中(向量 connections 中)
            vcConnections.addElement(new ConnectionWrapper(newConnection()));

            log.info("HBase数据库连接己创建 ...... " + x);
        }
    }

    /**
     * 查找池中所有的連接,查找一个可用的數據庫連接,
     * 如果没有可用的連結,返回null
     * @return
     * 		返回一個可用的數據庫連接
     */
    private Connection findFreeConnection() {
        Connection conn = null;
        ConnectionWrapper connWrapper = null;
        //獲得連接池向量中所有的對象
        Enumeration enumerate = vcConnections.elements();
        //遍歷所有的对象,看是否有可用的連接
        while (enumerate.hasMoreElements()) {
            connWrapper = (ConnectionWrapper) enumerate.nextElement();
            if (!connWrapper.isBusy()) {
                //如果此對象不忙,則獲得它的數據庫連接并把它設為忙
                conn = connWrapper.getConnection();
                connWrapper.setBusy(true);
                // 己经找到一个可用的連接,退出
                break;
            }
        }

        // 返回找到的可用連接
        return conn;
    }

    /**
     *创建一个新的数据库连接并返回它
     * @return
     * 		返回一个新创建的数据库连接
     */
    private Connection newConnection() {
        /** hbase 连接 */
        Connection conn = null;
        // 创建一个数据库连接
        try {
            conn = ConnectionFactory.createConnection(hbaseConfiguration);
        } catch (IOException e) {
            log.error("创建HBase数据库连接失败!");
            e.printStackTrace();
        }

        // 返回创建的新的数据库连接
        return conn;
    }

    public synchronized void releaseConnection(Connection conn) {
        if (this.vcConnections == null) {
            log.info("连接池不存在,无法返回此连接到连接池中!!");
        } else {
            ConnectionWrapper connWrapper = null;
            Enumeration enumerate = this.vcConnections.elements();

            while(enumerate.hasMoreElements()) {
                connWrapper = (ConnectionWrapper) enumerate.nextElement();
                if (conn == connWrapper.getConnection()) {
                    connWrapper.setBusy(false);
                    break;
                }
            }

        }
    }

    class ConnectionWrapper {
        
        /**
         * 数据库连接
         */
        private Connection connection = null;
        
        /**
         * 此连接是否正在使用的标志,默认没有正在使用
         */
        private boolean busy = false;

        /**
         * 构造函数,根据一个 Connection 构告一个 PooledConnection 对象
         */
        public ConnectionWrapper(Connection connection) {
            this.connection = connection;
        }

        /**
         * 返回此对象中的连接
         */
        public Connection getConnection() {
            return connection;
        }

        /**
         * 设置此对象的连接
         */
        public void setConnection(Connection connection) {
            this.connection = connection;
        }

        /**
         * 获得对象连接是否忙
         */
        public boolean isBusy() {
            return busy;
        }

        /**
         * 设置对象的连接正在忙
         */
        public void setBusy(boolean busy) {
            this.busy = busy;
        }
    }

}

init()方法实现在初始化连接池的时候创建默认数值的连接。

五,配置操作服务类

操作类接口 HbaseService.java

package com.hbase.service;

import org.apache.hadoop.hbase.client.Scan;

import java.util.Map;

/**
 * @InterfaceName: HbaseService
 * @author: Leemon
 * @Description: TODO
 * @date: 2023/4/12 18:11
 * @version: 1.0
 */
public interface HbaseService {

    Map<String,Map<String,String>> getResultScanner(String tableName, String startRowKey, String stopRowKey);

    Map<String,String> getRowData(String tableName, String rowKey);

    Map<String,String> getFamilyValue(String tableName, String rowKey, String familyName);

    String getColumnValue(String tableName, String rowKey, String familyName, String columnName);

    Map<String,Map<String,String>> queryData(String tableName, Scan scan);

}

接口实现类 HbaseServiceImpl.java

package com.hbase.service.impl;

import com.hbase.config.HbaseConnectionPool;
import com.hbase.service.HbaseService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.*;

/**
 * @ClassName: HbaseServiceImpl
 * @author: Leemon
 * @Description: TODO
 * @date: 2023/4/12 18:13
 * @version: 1.0
 */
@Slf4j
@Service
public class HbaseServiceImpl implements HbaseService {

    @Resource
    private HbaseConnectionPool pool;

    @Override
    public Map<String,Map<String,String>> getResultScanner(String tableName, String startRowKey, String stopRowKey){
        Scan scan = new Scan();

        if(StringUtils.isNotBlank(startRowKey) && StringUtils.isNotBlank(stopRowKey)){
            scan.withStartRow(Bytes.toBytes(startRowKey));
            scan.withStopRow(Bytes.toBytes(stopRowKey));
        }

        return this.queryData(tableName,scan);
    }

    public Map<String,Map<String,String>> getResultScannerPrefixFilter(String tableName, String prefix){
        Scan scan = new Scan();

        if(StringUtils.isNotBlank(prefix)){
            Filter filter = new PrefixFilter(Bytes.toBytes(prefix));
            scan.setFilter(filter);
        }

        return this.queryData(tableName,scan);
    }


    @Override
    public Map<String,Map<String,String>> queryData(String tableName, Scan scan){
        
        Map<String,Map<String,String>> result = new HashMap<>();

        ResultScanner rs = null;
        // 获取表
        Table table= null;
        Connection connection = null;
        try {
            connection = pool.getConnection();
            table = getTable(connection, tableName);
            rs = table.getScanner(scan);
            for (Result r : rs) {
                //每一行数据
                Map<String,String> columnMap = new HashMap<>();
                String rowKey = null;
                for (Cell cell : r.listCells()) {
                    if(rowKey == null){
                        rowKey = Bytes.toString(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength());
                    }
                    columnMap.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                }

                if(rowKey != null){
                    result.put(rowKey,columnMap);
                }
            }
        }catch (IOException e) {
            log.error(MessageFormat.format("遍历查询指定表中的所有数据失败,tableName:{0}"
                    ,tableName),e);
        }finally {
            close(null, rs, table, connection);
        }

        return result;
    }

    @Override
    public Map<String,String> getRowData(String tableName, String rowKey){
        //返回的键值对
        Map<String,String> result = new HashMap<>();

        Get get = new Get(Bytes.toBytes(rowKey));
        // 获取表
        Table table= null;
        Connection connection = null;
        try {
            connection = pool.getConnection();
            table = getTable(connection, tableName);
            Result hTableResult = table.get(get);
            if (hTableResult != null && !hTableResult.isEmpty()) {
                for (Cell cell : hTableResult.listCells()) {
                    result.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                }
                // 某些应用场景需要插入到数据库的时间
                if (hTableResult.listCells().size() > 0) {
                    result.put("Timestamp", hTableResult.listCells().get(0).getTimestamp() + "");
                }
            }
        }catch (IOException e) {
            log.error(MessageFormat.format("查询一行的数据失败,tableName:{0},rowKey:{1}"
                    ,tableName,rowKey),e);
        }finally {
            close(null,null, table, connection);
        }

        return result;
    }

    @Override
    public Map<String,String> getFamilyValue(String tableName, String rowKey, String familyName){
        //返回的键值对
        Map<String,String> result = new HashMap<>(2);

        Get get = new Get(Bytes.toBytes(rowKey));
        get.addFamily(Bytes.toBytes(familyName));
        // 获取表
        Table table= null;
        Connection connection = null;
        try {
            connection = pool.getConnection();
            table = getTable(connection, tableName);
            Result getResult = table.get(get);
            if (getResult != null && !getResult.isEmpty()) {
                for (Cell cell : getResult.listCells()) {
                    result.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                }
            }
        } catch (IOException e) {
            log.error(MessageFormat.format("查询指定单元格的数据失败,tableName:{0},rowKey:{1},familyName:{2}"
                    , tableName, rowKey, familyName), e);
        }finally {
            close(null,null, table, connection);
        }

        return result;
    }

    @Override
    public String getColumnValue(String tableName, String rowKey, String familyName, String columnName){
        String str = null;
        Get get = new Get(Bytes.toBytes(rowKey));
        // 获取表
        Table table= null;
        Connection connection = null;
        try {
            connection = pool.getConnection();
            table = getTable(connection, tableName);
            Result result = table.get(get);
            if (result != null && !result.isEmpty()) {
                Cell cell = result.getColumnLatestCell(Bytes.toBytes(familyName), Bytes.toBytes(columnName));
                if(cell != null){
                    str = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                }
            }
        } catch (IOException e) {
            log.error(MessageFormat.format("查询指定单元格的数据失败,tableName:{0},rowKey:{1},familyName:{2},columnName:{3}"
                    ,tableName,rowKey,familyName,columnName),e);
        }finally {
            close(null,null, table, connection);
        }

        return str;
    }

    private Table getTable(Connection connection, String tableName) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        return table;
    }

    private void close(Admin admin, ResultScanner rs, Table table, Connection connection){
        if(admin != null){
            try {
                admin.close();
            } catch (IOException e) {
                log.error("关闭Admin失败",e);
            }
        }

        if(rs != null){
            rs.close();
        }

        if(table != null){
            try {
                table.close();
            } catch (IOException e) {
                log.error("关闭Table失败",e);
            }
        }

        // 释放连接
        if (Objects.nonNull(connection)) {
            pool.releaseConnection(connection);
        }
    }

}

ok,现在就可以操作使用了。

以前都是在非Spring环境下使用Hbase的,一开始会出现:当服务使用时间过久,某些会使用hbase的接口调用次数过多的时候,会报【已超过最大的连接数】,只能每一次调用接口后最后一行加上释放连接。(以前的做法每次调用都要在代码里手动获取一个连接)

这次将释放连接都集成在操作服务类的实现方法中,避免了开发接口可能遗漏的错误,可能不会再出现这个问题。文章来源地址https://www.toymoban.com/news/detail-579676.html

到了这里,关于SpringBoot使用Hbase的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Spring面试】一、SpringBoot启动优化与最大连接数

    调试: 写一个测试接口: 服务配置中的相关参数: 此时,JMeter模拟100QPS: 成功40个,刚好是 (max-connections)+(accept-count) ,而这两个参数的默认值可以在Spring-boot-autoconfigure.jar的配置元数据的json文件 spring-configuration-metadata.json 中找到:(当然也可以直接在application.yaml中按

    2024年02月09日
    浏览(38)
  • Hbase-技术文档-spring-boot整合使用hbase--简单操作增删改查--提供封装高可用的模版类

    使用spring-boot项目来整合使用hbase。 依赖声明表示将把Apache HBase客户端库的2.4.3版本添加到项目中。HBase是一个分布式、可扩展的大数据存储系统,它基于Google的Bigtable模型,并使用了Hadoop分布式文件系统作为底层存储。HBase客户端库是用于与HBase数据库进行交互的工具库,提供

    2024年02月07日
    浏览(40)
  • Spring | 基于SpringBoot的多数据源实战 - 使用seata实现多数据源的全局事务管理

    在软件开发中, 多数据源 的应用越来越普遍,特别是在 微服务架构 和 业务模块化 的场景下。多数据源能够让不同的业务模块和微服务拥有各自独立的数据存储,大大提高了系统的灵活性和可维护性。本文将深入探讨多数据源的配置和实施,以及在 Spring Boot 环境下,如何通

    2024年02月07日
    浏览(60)
  • 【Java核心知识】spring boot整合Mybatis plus + Phoenix 访问Hbase与使用注意

    为什么Phoenix能让开发者通过SQL访问Hbase而不必使用原生的方式?引用Phoenix官网上的一句话:SQL is just a way of expressing what you want to get not how you want to get it . 即SQL不是一种数据操作技术,而是一种特殊的表达方式。只是表示你需要什么而不是你如何获得。 一个集成了Phoenix的Hb

    2024年02月15日
    浏览(66)
  • 使用IDEA连接hbase数据库

     Hbase是安装在另一台LINUX服务器上的,需要本地通过JAVA连接HBase数据库进行操作。由于是第一次接触HBase,过程当中百度了很多资料,也遇到了很多的问题。耗费了不少时间才成功连接上。特记录下过程当中遇到的问题。 JAVA连接HBase代码如下: 首先通过POM将需要的JAR包导入。

    2024年02月03日
    浏览(87)
  • Springboot +spring security,实现前后端分离,使用JSON数据格式登录(将表单提交方式改成json格式登录)

    在前面的文章中,我们使用表单方式完成登录提交,但是目前基本都是前后端分离项目,很少使用表单提交的方式,基本都是json方式,使用ajax提交,那么我们怎么将表单提交方式改成json格式登录呢? 通过前面源码部分学习中,已经知道在HttpSecurity配置中,每新增一种配置,

    2024年02月06日
    浏览(46)
  • SpringBoot + Vue前后端分离项目实战 || 三:Spring Boot后端与Vue前端连接

    系列文章: SpringBoot + Vue前后端分离项目实战 || 一:Vue前端设计 SpringBoot + Vue前后端分离项目实战 || 二:Spring Boot后端与数据库连接 SpringBoot + Vue前后端分离项目实战 || 三:Spring Boot后端与Vue前端连接 SpringBoot + Vue前后端分离项目实战 || 四:用户管理功能实现 SpringBoot + Vue前后

    2024年02月12日
    浏览(66)
  • 【Spring Boot】Spring Boot 配置 Hikari 数据库连接池

    数据库连接池是一个提高程序与数据库的连接的优化,连接池它主要作用是提高性能、节省资源、控制连接数、连接管理等操作; 程序中的线程池与之同理,都是为了优化、提高性能。

    2024年02月11日
    浏览(51)
  • Spring Boot 使用 Druid 连接池详解

    Alibaba Druid 是一个 JDBC 组件库,包含数据库连接池、SQL Parser 等组件,被大量业务和技术产品使用或集成,经历过严苛的线上业务场景考验,是值得信赖的技术产品。Druid Spring Boot Starter 用于帮助你在 Spring Boot 项目中轻松集成 Druid 数据库连接池和监控。 https://github.com/alibaba/

    2023年04月08日
    浏览(47)
  • 【Spring Boot】JdbcTemplate数据连接模板 — JdbcTemplate入门

    本节从基础的部分开始介绍什么是JDBC、什么是JdbcTemplate,然后介绍Spring Boot项目如何使用JdbcTemplate操作数据库。 1.1 什么是JDBC JDBC(Java Data Base Connectivity,Java数据库连接)是Java语言中用来规范应用程序如何访问数据库的API,为多种关系数据库提供统一访问方式,诸如查询和更

    2024年02月12日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包