目录
1. Zookeeper Kerberos配置
2. HBase配置Kerberos
3. HBase启动及访问验证
4. HBase Shell操作HBase
5. Java API操作HBase
技术连载系列,前面内容请参考前面连载10内容:Kerberos安全认证-连载10-Hive Kerberos 安全配置及访问_IT贫道的博客-CSDN博客
大数据组件HBase也可以通过Kerberos进行安全认证,由于HBase中需要zookeeper进行元数据管理、主节点选举、故障恢复,所以这里对HBase进行Kerberos安全认证时,建议也对Zookeeper进行安全认证。
1. Zookeeper Kerberos配置
zookeeper集群节点分布如下:
节点IP |
节点名称 |
Zookeeper |
192.168.179.4 |
node1 |
|
192.168.179.5 |
node2 |
|
192.168.179.6 |
node3 |
★ |
192.168.179.7 |
node4 |
★ |
192.168.179.8 |
node5 |
★ |
目前启动zookeeper集群后,可以通过任意Zookeeper客户端操作Zookeeper,可以通过Kerberos对Zookeeper进行认证避免非认证用户操作Zookeeper。按照如下步骤进行zookeeper kerberos认证。
1) 创建zookeeper Princial服务主体
在node1节点执行如下命令创建zookeeper Princial服务主体。
[root@node1 ~]# kadmin.local -q "addprinc -pw 123456 zookeeper/node3"
[root@node1 ~]# kadmin.local -q "addprinc -pw 123456 zookeeper/node4"
[root@node1 ~]# kadmin.local -q "addprinc -pw 123456 zookeeper/node5"
2) 将zookeeper服务主体写入到keytab文件
在node1节点执行如下命令,将zookeeper服务主体写入到keytab文件:
[root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/zookeeper.service.keytab zookeeper/node3@EXAMPLE.COM"
[root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/zookeeper.service.keytab zookeeper/node4@EXAMPLE.COM"
[root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/zookeeper.service.keytab zookeeper/node5@EXAMPLE.COM"
以上命令执行完成后,可以在node1节点/home/keytabs目录下生成zookeeper.service.keytab文件。
3) 将keytab文件分发到其他节点并修改权限
这里将node1节点生成的zookeeper.service.keytab文件分发到node2~node5节点,这里也可以直接分发到zookeeper服务所在节点,为了保持集群各个节点keytab文件一致,这里将该keytab文件分发到集群各个节点中。
#在node1节点中执行如下命令进行分发
[root@node1 ~]# scp /home/keytabs/zookeeper.service.keytab node2:/home/keytabs/
[root@node1 ~]# scp /home/keytabs/zookeeper.service.keytab node3:/home/keytabs/
[root@node1 ~]# scp /home/keytabs/zookeeper.service.keytab node4:/home/keytabs/
[root@node1 ~]# scp /home/keytabs/zookeeper.service.keytab node5:/home/keytabs/
分发到node2~node5各个节点后,执行如下命令对zookeeper.service.keytab修改权限。
#node1~node5所有节点执行如下命令
chown root:hadoop /home/keytabs/zookeeper.service.keytab
chmod 770 /home/keytabs/zookeeper.service.keytab
4) 配置zoo.cfg文件
在node3~node5 zookeeper节点上配置zoo.cfg配置文件,每个节点在该文件最后追加如下配置:
kerberos.removeHostFromPrincipal=true
kerberos.removeRealmFromPrincipal=true
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew=3600000
sessionRequireClientSASLAuth=false
skipACL=yes
以上配置项的解释如下:
- kerberos.removeHostFromPrincipal
kerberos认证时是否从Principal中移除主机名部分。
- kerberos.removeRealmFromPrincipal
kerberos认证时是否从Principal中移除领域(Realm)部分。
- authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
指定ZooKeeper对客户端进行身份验证时使用的身份验证类。
- jaasLoginRenew
Jaas登录(Java Authentication and Authorization Service)的续约时间,这里是每隔1小时需要续约Jaas登录。
- sessionRequireClientSASLAuth
是否要求客户端进行SASL认证,SASL(Simple Authentication and Security Layer)认证是一种安全机制,用于在客户端和服务器之间进行身份验证和加密通信。当设置为要求客户端进行SASL认证时,客户端必须提供有效的凭证(如用户名和密码)来证明其身份,并与ZooKeeper服务器进行安全的通信。
这里需要将该值设置为false,否则Hadoop HA 不能正常启动。
- skipACL=yes
跳过Zookeeper 访问控制列表(ACL)验证,允许连接zookeper后进行读取和写入。这里建议跳过,否则配置HBase 启动后不能向Zookeeper中写入数据。
这里在node3节点进行zoo.cfg文件的配置,配置完成后,将zoo.cfg文件分发到node4、node5 zookeeper节点上。
[root@node3 ~]# scp /software/apache-zookeeper-3.6.3-bin/conf/zoo.cfg node4:/software/apache-zookeeper-3.6.3-bin/conf/
[root@node3 ~]# scp /software/apache-zookeeper-3.6.3-bin/conf/zoo.cfg node5:/software/apache-zookeeper-3.6.3-bin/conf/
5) 配置jaas.conf文件
在node3~node5各个zookeeper节点中ZOOKEEPER_HOME/conf/目录下创建jaas.conf文件,该文件用于zookeeper服务端与客户端进行认证。配置内容如下:
Server {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/keytabs/zookeeper.service.keytab"
storeKey=true
useTicketCache=false
principal="zookeeper/node3@EXAMPLE.COM";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/keytabs/zookeeper.service.keytab"
storeKey=true
useTicketCache=false
principal="zookeeper/node3@EXAMPLE.COM";
};
以上配置文件是在node3 zookeeper节点配置,将以上文件分发到node4、node5节点后,需要将principal修改成对应节点的信息。
node4 zookeeper节点jaas.conf配置如下:
Server {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/keytabs/zookeeper.service.keytab"
storeKey=true
useTicketCache=false
principal="zookeeper/node4@EXAMPLE.COM";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/keytabs/zookeeper.service.keytab"
storeKey=true
useTicketCache=false
principal="zookeeper/node4@EXAMPLE.COM";
};
node5 zookeeper节点jaas.conf配置如下:
Server {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/keytabs/zookeeper.service.keytab"
storeKey=true
useTicketCache=false
principal="zookeeper/node5@EXAMPLE.COM";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/keytabs/zookeeper.service.keytab"
storeKey=true
useTicketCache=false
principal="zookeeper/node5@EXAMPLE.COM";
};
6) 配置java.env文件
在zookeeper各个节点ZOOKEEPER_HOME/conf目录下创建java.env文件,写入如下内容,这里在node3~node5各个节点都需配置。
#指定jaas.conf文件路径
export JVMFLAGS="-Djava.security.auth.login.config=/software/apache-zookeeper-3.6.3-bin/conf/jaas.conf"
以上可以先在node3节点进行配置,然后分发到node4~node5节点上。如下:
[root@node3 ~]# scp /software/apache-zookeeper-3.6.3-bin/conf/java.env node4:/software/apache-zookeeper-3.6.3-bin/conf/
[root@node3 ~]# scp /software/apache-zookeeper-3.6.3-bin/conf/java.env node5:/software/apache-zookeeper-3.6.3-bin/conf/
7) 启动zookeeper
在node3~node5各个节点执行如下命令,启动zookeeper。
#启动zookeeper
zkServer.sh start
#检查zookeeper状态
zkServer.sh status
2. HBase配置Kerberos
HBase也支持Kerberos安全认证,经过测试,HBase2.5.x版本在经过Kerberos认证后与Hadoop通信认证有异常,具体报错如下:
hdfs.DataStreamer: Exception in createBlockOutputStream java.io.IOException: Invalid token in javax.security.sasl.qop: DI
这里建议使用HBase2.5.x以下版本与kerberos进行整合,但HBase版本的选择也需要参考Hadoop的版本,可以在Hbase官网找到Hadoop版本匹配的HBase版本,地址为:https://hbase.apache.org/book.html#hadoop
我们这里使用的Hadoop版本为3.3.4,HBase版本选择2.2.6版本。HBase Kerberos安全认证配置步骤如下:
1) 创建HBase服务用户并两两节点进行免密
在Hadoop体系中操作HBase时通常使用hbase用户,这里在集群各个节点创建hbase用户并设置在HBase用户下,节点之间的免密。
在node1~node5节点创建hbase用户,所属hadoop组。
#这里设置用户密码为123456
useradd hbase -g hadoop
passwd hbase
各个节点切换到hbase用户,并设置各个节点两两免密。
#所有节点切换成hbase用户
su hbase
cd ~
#node1~node5所有节点生成SSH密钥
ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
#node1~node5所有节点公钥复制到node1节点上,这个过程需要输入yes和密码
ssh-copy-id node1
#将node1 authorized_keys文件分发到node1~node5各节点,这个过程需要输入密码
[hbase@node1 ~]$ cd ~/.ssh/
[hbase@node1 .ssh]$ scp authorized_keys node2:`pwd`
[hbase@node1 .ssh]$ scp authorized_keys node3:`pwd`
[hbase@node1 .ssh]$ scp authorized_keys node4:`pwd`
[hbase@node1 .ssh]$ scp authorized_keys node5:`pwd`
#两两节点进行ssh测试,这一步骤必须做,然后node1~node5节点退出当前hbase用户
exit
2) 创建HBase服务Princial主体并写入到keytab文件
在kerberos服务端node1节点执行如下命令创建HBase服务主体:
[root@node1 ~]# kadmin.local -q "addprinc -pw 123456 hbase/node3"
[root@node1 ~]# kadmin.local -q "addprinc -pw 123456 hbase/node4"
[root@node1 ~]# kadmin.local -q "addprinc -pw 123456 hbase/node5"
在kerberos服务端node1节点执行如下命令将hbase主体写入到keytab文件:
#node1节点执行命令,将主体写入到keytab
[root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/hbase.service.keytab hbase/node3@EXAMPLE.COM"
[root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/hbase.service.keytab hbase/node4@EXAMPLE.COM"
[root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/hbase.service.keytab hbase/node5@EXAMPLE.COM"
以上命令执行完成后,在node1节点/home/keytabs目录下生成hbase.service.keytab文件,将该文件分发到各个节点并赋权,这里可以只发送到node3~node5 Hbase所在节点,为了保证各个大数据集群节点的keytabs一致,这里分发到所有节点。
[root@node1 ~]# scp /home/keytabs/hbase.service.keytab node2:/home/keytabs/
[root@node1 ~]# scp /home/keytabs/hbase.service.keytab node3:/home/keytabs/
[root@node1 ~]# scp /home/keytabs/hbase.service.keytab node4:/home/keytabs/
[root@node1 ~]# scp /home/keytabs/hbase.service.keytab node5:/home/keytabs/
分发完成后,在集群各个节点上执行如下命令,修改hbase.service.keytab密钥文件所属用户和组信息:
chown root:hadoop /home/keytabs/hbase.service.keytab
chmod 770 /home/keytabs/hbase.service.keytab
3) 修改hbase-site.xml配置文件
在所有hbase节点中修改HBASE_HOME/conf/hbase-site.xml文件内容,配置支持kerberos,向hbase-site.xml文件中追加如下内容:
<!-- hbase开启安全认证 -->
<property>
<name>hbase.security.authorization</name>
<value>true</value>
</property>
<!-- hbase配置kerberos安全认证 -->
<property>
<name>hbase.security.authentication</name>
<value>kerberos</value>
</property>
<!-- HMaster配置kerberos安全凭据认证 -->
<property>
<name>hbase.master.kerberos.principal</name>
<value>hbase/_HOST@EXAMPLE.COM</value>
</property>
<!-- HMaster配置kerberos安全证书keytab文件位置 -->
<property>
<name>hbase.master.keytab.file</name>
<value>/home/keytabs/hbase.service.keytab</value>
</property>
<!-- Regionserver配置kerberos安全凭据认证 -->
<property>
<name>hbase.regionserver.kerberos.principal</name>
<value>hbase/_HOST@EXAMPLE.COM</value>
</property>
<!-- Regionserver配置kerberos安全证书keytab文件位置 -->
<property>
<name>hbase.regionserver.keytab.file</name>
<value>/home/keytabs/hbase.service.keytab</value>
</property>
这里可以先在node3节点进行配置,配置完成后分发到node4、node5节点:
[root@node3 ~]# scp /software/hbase-2.2.6/conf/hbase-site.xml node4:/software/hbase-2.2.6/conf/
[root@node3 ~]# scp /software/hbase-2.2.6/conf/hbase-site.xml node5:/software/hbase-2.2.6/conf/
4) 准备hdfs-site.xml和core-site.xml文件
HBase底层存储使用HDFS,这里需要将Hadoop中hdfs-site.xml和core-site.xml文件发送到hbase各个节点HBASE_HOME/conf/目录中。在node3~node5各个HBase节点中执行如下命令:
#node3~node5各个节点执行
cp /software/hadoop-3.3.4/etc/hadoop/hdfs-site.xml /software/hbase-2.2.6/conf/
cp /software/hadoop-3.3.4/etc/hadoop/core-site.xml /software/hbase-2.2.6/conf/
5) 准备zk-jaas.conf文件
HBase中会使用zookeeper管理元数据和选主,这里由于Zookeeper中已经开启了Kerberos认证,所以需要准备zk-jaas.conf文件连接zookeeper时进行认证。在HBase各个节点HBASE_HOME/conf目录下创建zk-jaas.conf文件,写入如下内容,不同的节点设置的principal是对应HBase主机节点。
node3节点HBASE_HOME/conf/zk-jaas.conf文件内容:
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/keytabs/hbase.service.keytab"
useTicketCache=false
principal="hbase/node3@EXAMPLE.COM";
};
node4节点HBASE_HOME/conf/zk-jaas.conf文件内容:
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/keytabs/hbase.service.keytab"
useTicketCache=false
principal="hbase/node4@EXAMPLE.COM";
};
node5节点HBASE_HOME/conf/zk-jaas.conf文件内容:
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/keytabs/hbase.service.keytab"
useTicketCache=false
principal="hbase/node5@EXAMPLE.COM";
};
6) 修改hbase-env.sh配置
在各个hbase节点上配置HBASE_HOME/conf/hbase-env.sh,将HBASE_OPTS选项修改为如下:
#node3~node5各个节点修改hbase-env.sh中的HBASE_OPTS配置
export HBASE_OPTS="$HBASE_OPTS -XX:+UseConcMarkSweepGC -Djava.security.auth.login.config=/software/hbase-2.2.6/conf/zk-jaas.conf"
7) 修改HBase安装目录权限
在HBase各个节点创建HBASE_HOME/logs目录,如果该目录不存在需要提前创建,存在即可忽略,该目录为HBase日志目录,需要将该目录权限修改为root:hadoop、访问权限为770,否则hbase用户在启动Hbase集群时没有向该目录写日志权限。
#在node3~node5 HBase节点执行命令创建HBASE_HOME/logs
mkdir -p /software/hbase-2.2.6/logs
将各个节点的hbase安装目录权限修改为root:hadoop,logs目录访问权限为770:
#在node3~node5 HBase节点执行命令
chown -R root:hadoop /software/hbase-2.2.6
chmod -R 770 /software/hbase-2.2.6/logs
3. HBase启动及访问验证
在启动HBase集群之前,需要将Zookeeper和HDFS集群重启,Zookeeper和HDFS启动命令如下:
#停止hadoop集群
[root@node1 ~]# stop-all.sh
#node3~node5节点,重启zookeeper
[root@node3 ~]# zkServer.sh restart
[root@node3 ~]# zkServer.sh restart
[root@node3 ~]# zkServer.sh restart
#启动HDFS集群
[root@node1 ~]# start-all.sh
如果没有配置Kerberos认证时HBase之前启动过,在HDFS中将会有/hbase 目录,需要使用hdfs超级管理员用户将该目录的用户和所属组修改成hbase:hadoop。
#在node5节点执行
[root@node5 ~]# su hdfs
[hdfs@node5 ~]$ kinit hdfs
Password for hdfs@EXAMPLE.COM: 123456
[hdfs@node5 logs]$ hdfs dfs -ls /
Found 6 items
drwxr-xr-x - root supergroup 0 2023-05-23 10:50 /hbase
#修改用户和所属组
[hdfs@node5 logs]$ hadoop dfs -chown -R hbase:hadoop /hbase
在HBase主节点node4上执行如下命令启动HBase集群:
#无需切换hbase用户,可以直接在node4节点执行如下命令启动hbase集群
[root@node4 ~]# sudo -i -u hbase start-hbase.sh
注意:以上启动Hbase集群命令中没有切换到hbase用户,sudo为使用超级用户权限执行命令,-i表示交互模式执行命令,-u指定切换的用户。
HBase启动后,可以通过http://node4:16010进行访问:
4. HBase Shell操作HBase
在node3~node5任意节点登录hbase shell,可以看到如果节点没有进行用户认证,读取HBase数据时没有权限:
#假设在node4登录hbase客户端
[root@node4 ~]# klist
klist: No credentials cache found (filename: /tmp/krb5cc_0)
[root@node4 ~]# hbase shell
hbase(main):001:0> list
TABLE
ERROR: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
如果想要正常读取HBase中的数据需要进行客户端认证,这里以zhangsan用户为例,进行Kerberos主体认证,认证后操作HBase,操作如下:
#进行用户认证
[root@node4 ~]# kinit zhangsan
Password for zhangsan@EXAMPLE.COM: 123456
[root@node4 ~]# klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: zhangsan@EXAMPLE.COM
#登录hbase shell
[root@node4 ~]# hbase shell
hbase(main):001:0> create 'person','cf1','cf2'
Created table person
hbase(main):002:0> list
person
hbase(main):003:0> put 'person','row1','cf1:id','1'
hbase(main):004:0> put 'person','row1','cf1:name','zs'
hbase(main):005:0> put 'person','row1','cf1:age',18
hbase(main):006:0> scan 'person'
ROW COLUMN+CELL
row1 column=cf1:age, timestamp=1684317908851, value=18
row1 column=cf1:id, timestamp=1684317879179, value=1
row1 column=cf1:name, timestamp=1684317894358, value=zs
5. Java API操作HBase
java API操作经过Kerberos安全认证的HBase按照如下步骤实现即可。
1) 准备krb5.conf文件
将node1 kerberos服务端/etc/krb5.conf文件存放在IDEA项目中的resources资源目录中或者本地Window固定的某个目录中,用于编写代码时指定访问Kerberos的Realm。
2) 生成用户keytab文件
在kerberos服务端node1节点上,执行如下命令,对zhangsan用户主体生成keytab密钥文件。
#在node1 kerberos服务端执行
[root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /root/zhangsan.keytab zhangsan@EXAMPLE.COM"
以上命令执行之后,在/root目录下会生成zhangsan.keytab文件,将该文件复制到IDEA项目中的resources资源目录中或者本地window固定的某个目录中,该文件用于编写代码时认证kerberos。
3) 准备访问HDFS需要的资源文件
将HDFS中的core-site.xml 、hdfs-site.xml 、yarn-site.xml、hbase-site.xml文件上传到项目resources资源目录中。
4) 编写代码操作HBase
项目代码中引入如下maven依赖:
<!-- hbase依赖包 -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>2.2.6</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.2.6</version>
</dependency>
编写Java代码操作Kerberos认证的HBase代码如下:文章来源:https://www.toymoban.com/news/detail-725174.html
/**
* Java API 操作Kerberos 认证的HBase
*/
public class OperateAuthHBase {
private static Configuration conf;
private static Connection connection;
private static Admin admin;
/**
* 初始化:进行kerberos认证并设置hbase configuration
*/
static {
try {
String principal = "zhangsan@EXAMPLE.COM";
String keytabPath = "D:\\idea_space\\KerberosAuth\\KerberosAuthHBase\\src\\main\\resources\\zhangsan.keytab";
conf = HBaseConfiguration.create();
// 设置Kerberos 认证
System.setProperty("java.security.krb5.conf", "D:\\idea_space\\KerberosAuth\\KerberosAuthHBase\\src\\main\\resources\\krb5.conf");
conf.set("hadoop.security.authentication", "kerberos");
UserGroupInformation.setConfiguration(conf);
UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
//设置zookeeper集群地址及端口号
conf.set("hbase.zookeeper.quorum","node3,node4,node5");
conf.set("hbase.zookeeper.property.clientPort","2181");
//创建连接
connection = ConnectionFactory.createConnection(conf);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) throws IOException {
try{
//获取Admin对象
admin = connection.getAdmin();
//创建表
createHBaseTable("tbl1", "cf1");
System.out.println("===========================");
//查看Hbase中所有表
listHBaseTables();
System.out.println("===========================");
//向表中插入数据
putTableData("tbl1","row1", "cf1", "id", "100");
putTableData("tbl1","row1", "cf1", "name", "zs");
putTableData("tbl1","row1", "cf1", "age", "18");
putTableData("tbl1","row2", "cf1", "id", "200");
putTableData("tbl1","row2", "cf1", "name", "ls");
putTableData("tbl1","row2", "cf1", "age", "19");
System.out.println("===========================");
//查询表中的数据
scanTableData("tbl1");
System.out.println("===========================");
//查询指定row数据
getRowData("tbl1", "row1","cf1","name");
System.out.println("===========================");
//删除指定row数据
deleteRowData("tbl1", "row1","cf1","name");
System.out.println("===========================");
//查询表中的数据
scanTableData("tbl1");
System.out.println("===========================");
//删除表
deleteHBaseTable("tbl1");
//查看Hbase中所有表
listHBaseTables();
System.out.println("===========================");
}catch (Exception e) {
e.printStackTrace();
}finally {
if(admin != null) {
admin.close();
}
if(connection != null) {
connection.close();
}
}
}
//判断表是否存在
private static boolean isTableExist(String tableName) {
try {
return admin.tableExists(TableName.valueOf(tableName));
} catch (IOException e) {
e.printStackTrace();
}
return false;
}
private static void createHBaseTable(String tableName, String... cfs) {
if(isTableExist(tableName)) {
System.out.println("表已经存在!");
return;
}
if(cfs == null || cfs.length == 0){
System.out.println("列族不能为空!");
return;
}
TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
List<ColumnFamilyDescriptor> cFDBList = new ArrayList<>();
for (String cf : cfs) {
cFDBList.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf)).build());
}
tableDescriptorBuilder.setColumnFamilies(cFDBList);
try {
admin.createTable(tableDescriptorBuilder.build());
System.out.println("创建表" + tableName + "成功!");
} catch (IOException e) {
System.out.println("创建失败!");
e.printStackTrace();
}
}
//查看Hbase中所有表
private static void listHBaseTables() {
try {
TableName[] tableNames = admin.listTableNames();
System.out.println("打印所有命名空间表名:");
for (TableName tableName : tableNames) {
System.out.println(tableName);
}
} catch (IOException e) {
e.printStackTrace();
}
}
//向表中插入数据
private static void putTableData(String tableName, String rowKey, String cf, String column, String value) {
Table table = null;
try {
table = connection.getTable(TableName.valueOf(tableName));
Put p = new Put(Bytes.toBytes(rowKey));
p.addColumn(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
table.put(p);
System.out.println("插入数据成功!");
} catch (IOException e) {
e.printStackTrace();
} finally {
closeTable(table);
}
}
//查询表中的数据
private static void scanTableData(String tableName) {
if(tableName == null || tableName.length() == 0) {
System.out.println("请正确输入表名!");
return;
}
Table table = null;
try {
table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
ResultScanner resultScanner = table.getScanner(scan);
Iterator<Result> iterator = resultScanner.iterator();
while(iterator.hasNext()) {
Result result = iterator.next();
Cell[] cells = result.rawCells();
for(Cell cell : cells) {
//得到rowkey
System.out.println("rowkey: " + Bytes.toString(CellUtil.cloneRow(cell)));
//得到列族
System.out.println("列族: " + Bytes.toString(CellUtil.cloneFamily(cell)));
System.out.println("列: " + Bytes.toString(CellUtil.cloneQualifier(cell)));
System.out.println("值: " + Bytes.toString(CellUtil.cloneValue(cell)));
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
closeTable(table);
}
}
//查询指定row数据
private static void getRowData(String tableName, String rowkey,String colFamily, String col) {
Table table = null;
try {
table = connection.getTable(TableName.valueOf(tableName));
Get g = new Get(Bytes.toBytes(rowkey));
// 获取指定列族数据
if(col == null && colFamily != null) {
g.addFamily(Bytes.toBytes(colFamily));
} else if(col != null && colFamily != null) {
// 获取指定列数据
g.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col));
}
Result result = table.get(g);
System.out.println("查询指定rowkey 数据如下");
for(Cell cell : result.rawCells()) {
//得到rowkey
System.out.println("rowkey: " + Bytes.toString(CellUtil.cloneRow(cell)));
//得到列族
System.out.println("列族: " + Bytes.toString(CellUtil.cloneFamily(cell)));
System.out.println("列: " + Bytes.toString(CellUtil.cloneQualifier(cell)));
System.out.println("值: " + Bytes.toString(CellUtil.cloneValue(cell)));
}
} catch (IOException e) {
e.printStackTrace();
} finally {
closeTable(table);
}
}
//删除指定row数据
private static void deleteRowData(String tableName , String rowKey, String colFamily, String ... cols) {
if(tableName == null || tableName.length() == 0) {
System.out.println("请正确输入表名!");
return;
}
Table table = null;
try {
table = connection.getTable(TableName.valueOf(tableName));
Delete del = new Delete(Bytes.toBytes(rowKey));
// 删除指定列族
del.addFamily(Bytes.toBytes(colFamily));
// 删除指定列
for (String col : cols) {
del.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col));
}
table.delete(del);
System.out.println("删除数据成功!");
} catch (IOException e) {
e.printStackTrace();
} finally {
closeTable(table);
}
}
//删除表
private static void deleteHBaseTable(String tableName) {
try {
if(!isTableExist(tableName)) {
System.out.println("要删除的表不存在!");
return;
}
admin.disableTable(TableName.valueOf(tableName));
admin.deleteTable(TableName.valueOf(tableName));
} catch (IOException e) {
e.printStackTrace();
}
}
//关闭表
private static void closeTable(Table table) {
if(table != null) {
try {
table.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
欢迎点赞、评论、收藏,关注IT贫道,获取IT技术知识!文章来源地址https://www.toymoban.com/news/detail-725174.html
到了这里,关于Kerberos安全认证-连载11-HBase Kerberos安全配置及访问的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!