SpringBoot使用Hbase
一,引入依赖
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.3.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
二,配置文件添加自己的属性
hbase:
zookeeper:
quorum: 10.xxx.xx.153,10.xxx.xx.154,10.xxx.xx.155
property:
clientPort: 2181
master:
port: 9001
三,配置类注入HBASE配置
package com.hbase.config;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @ClassName: HBaseConfig
* @author: Leemon
* @Description: TODO
* @date: 2023/4/12 18:06
* @version: 1.0
*/
@Configuration
@RefreshScope
public class HBaseConfig {
@Value("${hbase.zookeeper.quorum}")
private String zookeeperQuorum;
@Value("${hbase.zookeeper.property.clientPort}")
private String clientPort;
@Value("${hbase.master.port}")
private String masterPort;
@Bean
public org.apache.hadoop.conf.Configuration hbaseConfiguration() {
org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
conf.set("hbase.zookeeper.property.clientPort", clientPort);
// 如果hbase是集群,这个必须加上
// 这个ip和端口是在hadoop/mapred-site.xml配置文件配置的
conf.set("hbase.master", zookeeperQuorum + ":" + masterPort);
conf.set("hbase.client.keyvalue.maxsize", "20971520");
conf = HBaseConfiguration.create(conf);
return conf;
}
}
四,配置Hbase连接池
这里没有使用懒加载模式,减少启动后第一次访问时访问时间过长
package com.hbase.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.Enumeration;
import java.util.Vector;
/**
* @ClassName: HbaseConnectionPool
* @author: Leemon
* @Description: TODO
* @date: 2023/4/13 9:45
* @version: 1.0
*/
@Component
@Slf4j
public class HbaseConnectionPool {
/**
* 连接池最大的大小
*/
private int nMaxConnections = 20;
/**
* 连接池自动增加的大小
*/
private int nIncrConnectionAmount = 3;
/**
* 连接池的初始大小
*/
private int nInitConnectionAmount = 3;
/**
* 存放连接池中数据库连接的向量,初始时为null
*/
private Vector vcConnections = null;
@Resource
private Configuration hbaseConfiguration;
@PostConstruct
public void init() {
try {
vcConnections = new Vector();
createConnections(nInitConnectionAmount);
} catch (Exception e) {
e.printStackTrace();
}
}
public synchronized Connection getConnection() {
// 确保连接池己被创建
if (vcConnections == null) {
// 连接池还没创建,则返回null
return null;
}
// 获得一个可用的数据库连接
Connection conn = getFreeConnection();
// 如果目前没有可以使用的连接,即所有的连接都在使用中
while (conn == null) {
// 等一会再试
try {
wait(250);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 重新再试,直到获得可用的连接,如果getFreeConnection()返回的为null,则表明创建一批连接后也不可获得可用连接
conn = getFreeConnection();
}
// 返回获得的可用的连接
return conn;
}
/**
* 本函数从连接池向量 connections 中返回一个可用的的数据库连接,如果
* 当前没有可用的数据库连接,本函数则根据 incrementalConnections 设置
* 的值创建几个数据库连接,并放入连接池中。
* 如果创建后,所有的连接仍都在使用中,则返回 null
* @return
* 返回一个可用的数据库连接
*/
private Connection getFreeConnection() {
// 从连接池中获得一个可用的数据库连接
Connection conn = findFreeConnection();
if (conn == null) {
// 如果目前连接池中没有可用的连接
// 创建一些连接
try {
createConnections(nIncrConnectionAmount);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
log.error("create new connection fail.", e);
}
// 重新从池中查找是否有可用连接
conn = findFreeConnection();
if (conn == null) {
// 如果创建连接后仍获得不到可用的连接,则返回 null
return null;
}
}
return conn;
}
/**
* 创建由 numConnections 指定数目的数据库连接 , 并把这些连接
* 放入 connections 向量中
* @param _nNumConnections 要创建的数据库连接的数目
* @throws Exception
*/
private void createConnections(int _nNumConnections) throws Exception {
// 循环创建指定数目的数据库连接
for (int x = 0; x < _nNumConnections; x++) {
// 是否连接池中的数据库连接的数量己经达到最大?最大值由类成员 maxConnections
// 指出,如果 maxConnections 为 0 或负数,表示连接数量没有限制。
// 如果连接数己经达到最大,即退出。
if (this.nMaxConnections > 0 && this.vcConnections.size() >= this.nMaxConnections) {
log.warn("已达到最大连接数,不能再增加连接");
throw new Exception("已达到最大连接数"+ nMaxConnections+",不能再增加连接");
}
// 增加一个连接到连接池中(向量 connections 中)
vcConnections.addElement(new ConnectionWrapper(newConnection()));
log.info("HBase数据库连接己创建 ...... " + x);
}
}
/**
* 查找池中所有的連接,查找一个可用的數據庫連接,
* 如果没有可用的連結,返回null
* @return
* 返回一個可用的數據庫連接
*/
private Connection findFreeConnection() {
Connection conn = null;
ConnectionWrapper connWrapper = null;
//獲得連接池向量中所有的對象
Enumeration enumerate = vcConnections.elements();
//遍歷所有的对象,看是否有可用的連接
while (enumerate.hasMoreElements()) {
connWrapper = (ConnectionWrapper) enumerate.nextElement();
if (!connWrapper.isBusy()) {
//如果此對象不忙,則獲得它的數據庫連接并把它設為忙
conn = connWrapper.getConnection();
connWrapper.setBusy(true);
// 己经找到一个可用的連接,退出
break;
}
}
// 返回找到的可用連接
return conn;
}
/**
*创建一个新的数据库连接并返回它
* @return
* 返回一个新创建的数据库连接
*/
private Connection newConnection() {
/** hbase 连接 */
Connection conn = null;
// 创建一个数据库连接
try {
conn = ConnectionFactory.createConnection(hbaseConfiguration);
} catch (IOException e) {
log.error("创建HBase数据库连接失败!");
e.printStackTrace();
}
// 返回创建的新的数据库连接
return conn;
}
public synchronized void releaseConnection(Connection conn) {
if (this.vcConnections == null) {
log.info("连接池不存在,无法返回此连接到连接池中!!");
} else {
ConnectionWrapper connWrapper = null;
Enumeration enumerate = this.vcConnections.elements();
while(enumerate.hasMoreElements()) {
connWrapper = (ConnectionWrapper) enumerate.nextElement();
if (conn == connWrapper.getConnection()) {
connWrapper.setBusy(false);
break;
}
}
}
}
class ConnectionWrapper {
/**
* 数据库连接
*/
private Connection connection = null;
/**
* 此连接是否正在使用的标志,默认没有正在使用
*/
private boolean busy = false;
/**
* 构造函数,根据一个 Connection 构告一个 PooledConnection 对象
*/
public ConnectionWrapper(Connection connection) {
this.connection = connection;
}
/**
* 返回此对象中的连接
*/
public Connection getConnection() {
return connection;
}
/**
* 设置此对象的连接
*/
public void setConnection(Connection connection) {
this.connection = connection;
}
/**
* 获得对象连接是否忙
*/
public boolean isBusy() {
return busy;
}
/**
* 设置对象的连接正在忙
*/
public void setBusy(boolean busy) {
this.busy = busy;
}
}
}
init()方法实现在初始化连接池的时候创建默认数值的连接。
五,配置操作服务类
操作类接口 HbaseService.java
package com.hbase.service;
import org.apache.hadoop.hbase.client.Scan;
import java.util.Map;
/**
* @InterfaceName: HbaseService
* @author: Leemon
* @Description: TODO
* @date: 2023/4/12 18:11
* @version: 1.0
*/
public interface HbaseService {
Map<String,Map<String,String>> getResultScanner(String tableName, String startRowKey, String stopRowKey);
Map<String,String> getRowData(String tableName, String rowKey);
Map<String,String> getFamilyValue(String tableName, String rowKey, String familyName);
String getColumnValue(String tableName, String rowKey, String familyName, String columnName);
Map<String,Map<String,String>> queryData(String tableName, Scan scan);
}
接口实现类 HbaseServiceImpl.java
package com.hbase.service.impl;
import com.hbase.config.HbaseConnectionPool;
import com.hbase.service.HbaseService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.*;
/**
* @ClassName: HbaseServiceImpl
* @author: Leemon
* @Description: TODO
* @date: 2023/4/12 18:13
* @version: 1.0
*/
@Slf4j
@Service
public class HbaseServiceImpl implements HbaseService {
@Resource
private HbaseConnectionPool pool;
@Override
public Map<String,Map<String,String>> getResultScanner(String tableName, String startRowKey, String stopRowKey){
Scan scan = new Scan();
if(StringUtils.isNotBlank(startRowKey) && StringUtils.isNotBlank(stopRowKey)){
scan.withStartRow(Bytes.toBytes(startRowKey));
scan.withStopRow(Bytes.toBytes(stopRowKey));
}
return this.queryData(tableName,scan);
}
public Map<String,Map<String,String>> getResultScannerPrefixFilter(String tableName, String prefix){
Scan scan = new Scan();
if(StringUtils.isNotBlank(prefix)){
Filter filter = new PrefixFilter(Bytes.toBytes(prefix));
scan.setFilter(filter);
}
return this.queryData(tableName,scan);
}
@Override
public Map<String,Map<String,String>> queryData(String tableName, Scan scan){
Map<String,Map<String,String>> result = new HashMap<>();
ResultScanner rs = null;
// 获取表
Table table= null;
Connection connection = null;
try {
connection = pool.getConnection();
table = getTable(connection, tableName);
rs = table.getScanner(scan);
for (Result r : rs) {
//每一行数据
Map<String,String> columnMap = new HashMap<>();
String rowKey = null;
for (Cell cell : r.listCells()) {
if(rowKey == null){
rowKey = Bytes.toString(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength());
}
columnMap.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
}
if(rowKey != null){
result.put(rowKey,columnMap);
}
}
}catch (IOException e) {
log.error(MessageFormat.format("遍历查询指定表中的所有数据失败,tableName:{0}"
,tableName),e);
}finally {
close(null, rs, table, connection);
}
return result;
}
@Override
public Map<String,String> getRowData(String tableName, String rowKey){
//返回的键值对
Map<String,String> result = new HashMap<>();
Get get = new Get(Bytes.toBytes(rowKey));
// 获取表
Table table= null;
Connection connection = null;
try {
connection = pool.getConnection();
table = getTable(connection, tableName);
Result hTableResult = table.get(get);
if (hTableResult != null && !hTableResult.isEmpty()) {
for (Cell cell : hTableResult.listCells()) {
result.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
}
// 某些应用场景需要插入到数据库的时间
if (hTableResult.listCells().size() > 0) {
result.put("Timestamp", hTableResult.listCells().get(0).getTimestamp() + "");
}
}
}catch (IOException e) {
log.error(MessageFormat.format("查询一行的数据失败,tableName:{0},rowKey:{1}"
,tableName,rowKey),e);
}finally {
close(null,null, table, connection);
}
return result;
}
@Override
public Map<String,String> getFamilyValue(String tableName, String rowKey, String familyName){
//返回的键值对
Map<String,String> result = new HashMap<>(2);
Get get = new Get(Bytes.toBytes(rowKey));
get.addFamily(Bytes.toBytes(familyName));
// 获取表
Table table= null;
Connection connection = null;
try {
connection = pool.getConnection();
table = getTable(connection, tableName);
Result getResult = table.get(get);
if (getResult != null && !getResult.isEmpty()) {
for (Cell cell : getResult.listCells()) {
result.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
}
}
} catch (IOException e) {
log.error(MessageFormat.format("查询指定单元格的数据失败,tableName:{0},rowKey:{1},familyName:{2}"
, tableName, rowKey, familyName), e);
}finally {
close(null,null, table, connection);
}
return result;
}
@Override
public String getColumnValue(String tableName, String rowKey, String familyName, String columnName){
String str = null;
Get get = new Get(Bytes.toBytes(rowKey));
// 获取表
Table table= null;
Connection connection = null;
try {
connection = pool.getConnection();
table = getTable(connection, tableName);
Result result = table.get(get);
if (result != null && !result.isEmpty()) {
Cell cell = result.getColumnLatestCell(Bytes.toBytes(familyName), Bytes.toBytes(columnName));
if(cell != null){
str = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
}
}
} catch (IOException e) {
log.error(MessageFormat.format("查询指定单元格的数据失败,tableName:{0},rowKey:{1},familyName:{2},columnName:{3}"
,tableName,rowKey,familyName,columnName),e);
}finally {
close(null,null, table, connection);
}
return str;
}
private Table getTable(Connection connection, String tableName) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
return table;
}
private void close(Admin admin, ResultScanner rs, Table table, Connection connection){
if(admin != null){
try {
admin.close();
} catch (IOException e) {
log.error("关闭Admin失败",e);
}
}
if(rs != null){
rs.close();
}
if(table != null){
try {
table.close();
} catch (IOException e) {
log.error("关闭Table失败",e);
}
}
// 释放连接
if (Objects.nonNull(connection)) {
pool.releaseConnection(connection);
}
}
}
ok,现在就可以操作使用了。
以前都是在非Spring环境下使用Hbase的,一开始会出现:当服务使用时间过久,某些会使用hbase的接口调用次数过多的时候,会报【已超过最大的连接数】,只能每一次调用接口后最后一行加上释放连接。(以前的做法每次调用都要在代码里手动获取一个连接)文章来源:https://www.toymoban.com/news/detail-579676.html
这次将释放连接都集成在操作服务类的实现方法中,避免了开发接口可能遗漏的错误,可能不会再出现这个问题。文章来源地址https://www.toymoban.com/news/detail-579676.html
到了这里,关于SpringBoot使用Hbase的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!