Kerberos安全认证-连载11-HBase Kerberos安全配置及访问

这篇具有很好参考价值的文章主要介绍了Kerberos安全认证-连载11-HBase Kerberos安全配置及访问。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

1. Zookeeper Kerberos配置

2. HBase配置Kerberos

3. HBase启动及访问验证

4. HBase Shell操作HBase

​​​​​​​5. Java API操作HBase


技术连载系列,前面内容请参考前面连载10内容:​​​​​​​​​​​​​​Kerberos安全认证-连载10-Hive Kerberos 安全配置及访问_IT贫道的博客-CSDN博客

大数据组件HBase也可以通过Kerberos进行安全认证,由于HBase中需要zookeeper进行元数据管理、主节点选举、故障恢复,所以这里对HBase进行Kerberos安全认证时,建议也对Zookeeper进行安全认证。

1. Zookeeper Kerberos配置

zookeeper集群节点分布如下:

节点IP

节点名称

Zookeeper

192.168.179.4

node1

192.168.179.5

node2

192.168.179.6

node3

192.168.179.7

node4

192.168.179.8

node5

目前启动zookeeper集群后,可以通过任意Zookeeper客户端操作Zookeeper,可以通过Kerberos对Zookeeper进行认证避免非认证用户操作Zookeeper。按照如下步骤进行zookeeper kerberos认证。

1) 创建zookeeper Princial服务主体

在node1节点执行如下命令创建zookeeper Princial服务主体。

[root@node1 ~]# kadmin.local -q "addprinc -pw 123456 zookeeper/node3"
[root@node1 ~]# kadmin.local -q "addprinc -pw 123456 zookeeper/node4"
[root@node1 ~]# kadmin.local -q "addprinc -pw 123456 zookeeper/node5"

​​​​​​​2) 将zookeeper服务主体写入到keytab文件

在node1节点执行如下命令,将zookeeper服务主体写入到keytab文件:

[root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/zookeeper.service.keytab zookeeper/node3@EXAMPLE.COM"
[root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/zookeeper.service.keytab zookeeper/node4@EXAMPLE.COM"
[root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/zookeeper.service.keytab zookeeper/node5@EXAMPLE.COM"

以上命令执行完成后,可以在node1节点/home/keytabs目录下生成zookeeper.service.keytab文件。

3) 将keytab文件分发到其他节点并修改权限

这里将node1节点生成的zookeeper.service.keytab文件分发到node2~node5节点,这里也可以直接分发到zookeeper服务所在节点,为了保持集群各个节点keytab文件一致,这里将该keytab文件分发到集群各个节点中。

#在node1节点中执行如下命令进行分发
[root@node1 ~]# scp /home/keytabs/zookeeper.service.keytab node2:/home/keytabs/
[root@node1 ~]# scp /home/keytabs/zookeeper.service.keytab node3:/home/keytabs/
[root@node1 ~]# scp /home/keytabs/zookeeper.service.keytab node4:/home/keytabs/
[root@node1 ~]# scp /home/keytabs/zookeeper.service.keytab node5:/home/keytabs/

分发到node2~node5各个节点后,执行如下命令对zookeeper.service.keytab修改权限。

#node1~node5所有节点执行如下命令
chown root:hadoop /home/keytabs/zookeeper.service.keytab
chmod 770 /home/keytabs/zookeeper.service.keytab

​​​​​​​4) 配置zoo.cfg文件

在node3~node5 zookeeper节点上配置zoo.cfg配置文件,每个节点在该文件最后追加如下配置:

kerberos.removeHostFromPrincipal=true
kerberos.removeRealmFromPrincipal=true
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew=3600000
sessionRequireClientSASLAuth=false
skipACL=yes

以上配置项的解释如下:

  • kerberos.removeHostFromPrincipal

kerberos认证时是否从Principal中移除主机名部分。

  • kerberos.removeRealmFromPrincipal

kerberos认证时是否从Principal中移除领域(Realm)部分。

  • authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

指定ZooKeeper对客户端进行身份验证时使用的身份验证类。

  • jaasLoginRenew

Jaas登录(Java Authentication and Authorization Service)的续约时间,这里是每隔1小时需要续约Jaas登录。

  • sessionRequireClientSASLAuth

是否要求客户端进行SASL认证,SASL(Simple Authentication and Security Layer)认证是一种安全机制,用于在客户端和服务器之间进行身份验证和加密通信。当设置为要求客户端进行SASL认证时,客户端必须提供有效的凭证(如用户名和密码)来证明其身份,并与ZooKeeper服务器进行安全的通信。

这里需要将该值设置为false,否则Hadoop HA 不能正常启动。

  • skipACL=yes

跳过Zookeeper 访问控制列表(ACL)验证,允许连接zookeper后进行读取和写入。这里建议跳过,否则配置HBase 启动后不能向Zookeeper中写入数据。

这里在node3节点进行zoo.cfg文件的配置,配置完成后,将zoo.cfg文件分发到node4、node5 zookeeper节点上。

[root@node3 ~]# scp /software/apache-zookeeper-3.6.3-bin/conf/zoo.cfg node4:/software/apache-zookeeper-3.6.3-bin/conf/
[root@node3 ~]# scp /software/apache-zookeeper-3.6.3-bin/conf/zoo.cfg node5:/software/apache-zookeeper-3.6.3-bin/conf/

​​​​​​​​​​​​​​5) 配置jaas.conf文件

在node3~node5各个zookeeper节点中ZOOKEEPER_HOME/conf/目录下创建jaas.conf文件,该文件用于zookeeper服务端与客户端进行认证。配置内容如下:

Server {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/keytabs/zookeeper.service.keytab"
  storeKey=true
  useTicketCache=false
  principal="zookeeper/node3@EXAMPLE.COM";
};

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/keytabs/zookeeper.service.keytab"
  storeKey=true
  useTicketCache=false
  principal="zookeeper/node3@EXAMPLE.COM";
};

以上配置文件是在node3 zookeeper节点配置,将以上文件分发到node4、node5节点后,需要将principal修改成对应节点的信息。

node4 zookeeper节点jaas.conf配置如下:

Server {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/keytabs/zookeeper.service.keytab"
  storeKey=true
  useTicketCache=false
  principal="zookeeper/node4@EXAMPLE.COM";
};

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/keytabs/zookeeper.service.keytab"
  storeKey=true
  useTicketCache=false
  principal="zookeeper/node4@EXAMPLE.COM";
};

node5 zookeeper节点jaas.conf配置如下:

Server {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/keytabs/zookeeper.service.keytab"
  storeKey=true
  useTicketCache=false
  principal="zookeeper/node5@EXAMPLE.COM";
};

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/keytabs/zookeeper.service.keytab"
  storeKey=true
  useTicketCache=false
  principal="zookeeper/node5@EXAMPLE.COM";
};

​​​​​​​6) 配置java.env文件

在zookeeper各个节点ZOOKEEPER_HOME/conf目录下创建java.env文件,写入如下内容,这里在node3~node5各个节点都需配置。

#指定jaas.conf文件路径
export JVMFLAGS="-Djava.security.auth.login.config=/software/apache-zookeeper-3.6.3-bin/conf/jaas.conf"

以上可以先在node3节点进行配置,然后分发到node4~node5节点上。如下:

[root@node3 ~]# scp /software/apache-zookeeper-3.6.3-bin/conf/java.env node4:/software/apache-zookeeper-3.6.3-bin/conf/
[root@node3 ~]# scp /software/apache-zookeeper-3.6.3-bin/conf/java.env node5:/software/apache-zookeeper-3.6.3-bin/conf/

​​​​​​​​​​​​​​7) 启动zookeeper

在node3~node5各个节点执行如下命令,启动zookeeper。

#启动zookeeper
zkServer.sh start

#检查zookeeper状态
zkServer.sh status

​​​​​​​2. HBase配置Kerberos

HBase也支持Kerberos安全认证,经过测试,HBase2.5.x版本在经过Kerberos认证后与Hadoop通信认证有异常,具体报错如下:

hdfs.DataStreamer: Exception in createBlockOutputStream java.io.IOException: Invalid token in javax.security.sasl.qop: DI

这里建议使用HBase2.5.x以下版本与kerberos进行整合,但HBase版本的选择也需要参考Hadoop的版本,可以在Hbase官网找到Hadoop版本匹配的HBase版本,地址为:https://hbase.apache.org/book.html#hadoop

我们这里使用的Hadoop版本为3.3.4,HBase版本选择2.2.6版本。HBase Kerberos安全认证配置步骤如下:

1) 创建HBase服务用户并两两节点进行免密

在Hadoop体系中操作HBase时通常使用hbase用户,这里在集群各个节点创建hbase用户并设置在HBase用户下,节点之间的免密。

在node1~node5节点创建hbase用户,所属hadoop组。

#这里设置用户密码为123456
useradd hbase -g hadoop
passwd hbase

各个节点切换到hbase用户,并设置各个节点两两免密。

#所有节点切换成hbase用户
su hbase
cd ~

#node1~node5所有节点生成SSH密钥
ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa

#node1~node5所有节点公钥复制到node1节点上,这个过程需要输入yes和密码
ssh-copy-id node1

#将node1 authorized_keys文件分发到node1~node5各节点,这个过程需要输入密码
[hbase@node1 ~]$ cd ~/.ssh/
[hbase@node1 .ssh]$ scp authorized_keys node2:`pwd`
[hbase@node1 .ssh]$ scp authorized_keys node3:`pwd`
[hbase@node1 .ssh]$ scp authorized_keys node4:`pwd`
[hbase@node1 .ssh]$ scp authorized_keys node5:`pwd`

#两两节点进行ssh测试,这一步骤必须做,然后node1~node5节点退出当前hbase用户
exit

​​​​​​​2) 创建HBase服务Princial主体并写入到keytab文件

在kerberos服务端node1节点执行如下命令创建HBase服务主体:

[root@node1 ~]# kadmin.local -q "addprinc -pw 123456 hbase/node3"
[root@node1 ~]# kadmin.local -q "addprinc -pw 123456 hbase/node4"
[root@node1 ~]# kadmin.local -q "addprinc -pw 123456 hbase/node5"

在kerberos服务端node1节点执行如下命令将hbase主体写入到keytab文件:

#node1节点执行命令,将主体写入到keytab
[root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/hbase.service.keytab hbase/node3@EXAMPLE.COM"
[root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/hbase.service.keytab hbase/node4@EXAMPLE.COM"
[root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/hbase.service.keytab hbase/node5@EXAMPLE.COM"

以上命令执行完成后,在node1节点/home/keytabs目录下生成hbase.service.keytab文件,将该文件分发到各个节点并赋权,这里可以只发送到node3~node5 Hbase所在节点,为了保证各个大数据集群节点的keytabs一致,这里分发到所有节点。

[root@node1 ~]# scp /home/keytabs/hbase.service.keytab node2:/home/keytabs/
[root@node1 ~]# scp /home/keytabs/hbase.service.keytab node3:/home/keytabs/
[root@node1 ~]# scp /home/keytabs/hbase.service.keytab node4:/home/keytabs/
[root@node1 ~]# scp /home/keytabs/hbase.service.keytab node5:/home/keytabs/

分发完成后,在集群各个节点上执行如下命令,修改hbase.service.keytab密钥文件所属用户和组信息:

chown root:hadoop /home/keytabs/hbase.service.keytab
chmod 770 /home/keytabs/hbase.service.keytab

​​​​​​​3) 修改hbase-site.xml配置文件

在所有hbase节点中修改HBASE_HOME/conf/hbase-site.xml文件内容,配置支持kerberos,向hbase-site.xml文件中追加如下内容:

  <!-- hbase开启安全认证 -->
  <property>
    <name>hbase.security.authorization</name>
    <value>true</value>
  </property>
  <!-- hbase配置kerberos安全认证 -->
  <property>
  	<name>hbase.security.authentication</name>
  	<value>kerberos</value>
  </property>
  <!-- HMaster配置kerberos安全凭据认证 -->
  <property>
  	<name>hbase.master.kerberos.principal</name>
  	<value>hbase/_HOST@EXAMPLE.COM</value>
  </property>
  <!-- HMaster配置kerberos安全证书keytab文件位置 -->
  <property>
  	<name>hbase.master.keytab.file</name>
  	<value>/home/keytabs/hbase.service.keytab</value>
  </property>
  <!-- Regionserver配置kerberos安全凭据认证 -->
  <property>
  	<name>hbase.regionserver.kerberos.principal</name>
  	<value>hbase/_HOST@EXAMPLE.COM</value>
  </property>
  <!-- Regionserver配置kerberos安全证书keytab文件位置 -->
  <property>
  	<name>hbase.regionserver.keytab.file</name>
  	<value>/home/keytabs/hbase.service.keytab</value>
  </property>

这里可以先在node3节点进行配置,配置完成后分发到node4、node5节点:

[root@node3 ~]# scp /software/hbase-2.2.6/conf/hbase-site.xml node4:/software/hbase-2.2.6/conf/
[root@node3 ~]# scp /software/hbase-2.2.6/conf/hbase-site.xml node5:/software/hbase-2.2.6/conf/

​​​​​​​4) 准备hdfs-site.xml和core-site.xml文件

HBase底层存储使用HDFS,这里需要将Hadoop中hdfs-site.xml和core-site.xml文件发送到hbase各个节点HBASE_HOME/conf/目录中。在node3~node5各个HBase节点中执行如下命令:

#node3~node5各个节点执行
cp /software/hadoop-3.3.4/etc/hadoop/hdfs-site.xml /software/hbase-2.2.6/conf/
cp /software/hadoop-3.3.4/etc/hadoop/core-site.xml /software/hbase-2.2.6/conf/

​​​​​​​​​​​​​​5) 准备zk-jaas.conf文件

HBase中会使用zookeeper管理元数据和选主,这里由于Zookeeper中已经开启了Kerberos认证,所以需要准备zk-jaas.conf文件连接zookeeper时进行认证。在HBase各个节点HBASE_HOME/conf目录下创建zk-jaas.conf文件,写入如下内容,不同的节点设置的principal是对应HBase主机节点。

node3节点HBASE_HOME/conf/zk-jaas.conf文件内容:

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/keytabs/hbase.service.keytab"
  useTicketCache=false
  principal="hbase/node3@EXAMPLE.COM";
};

node4节点HBASE_HOME/conf/zk-jaas.conf文件内容:

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/keytabs/hbase.service.keytab"
  useTicketCache=false
  principal="hbase/node4@EXAMPLE.COM";
};

node5节点HBASE_HOME/conf/zk-jaas.conf文件内容:

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/keytabs/hbase.service.keytab"
  useTicketCache=false
  principal="hbase/node5@EXAMPLE.COM";
};

​​​​​​​​​​​​​​6) 修改hbase-env.sh配置

在各个hbase节点上配置HBASE_HOME/conf/hbase-env.sh,将HBASE_OPTS选项修改为如下:

#node3~node5各个节点修改hbase-env.sh中的HBASE_OPTS配置
export HBASE_OPTS="$HBASE_OPTS -XX:+UseConcMarkSweepGC -Djava.security.auth.login.config=/software/hbase-2.2.6/conf/zk-jaas.conf"

​​​​​​​​​​​​​​7) 修改HBase安装目录权限

在HBase各个节点创建HBASE_HOME/logs目录,如果该目录不存在需要提前创建,存在即可忽略,该目录为HBase日志目录,需要将该目录权限修改为root:hadoop、访问权限为770,否则hbase用户在启动Hbase集群时没有向该目录写日志权限。

#在node3~node5 HBase节点执行命令创建HBASE_HOME/logs
mkdir -p /software/hbase-2.2.6/logs

将各个节点的hbase安装目录权限修改为root:hadoop,logs目录访问权限为770:

#在node3~node5 HBase节点执行命令
chown -R root:hadoop  /software/hbase-2.2.6
chmod -R 770 /software/hbase-2.2.6/logs

​​​​​​​3. HBase启动及访问验证

在启动HBase集群之前,需要将Zookeeper和HDFS集群重启,Zookeeper和HDFS启动命令如下:

#停止hadoop集群
[root@node1 ~]# stop-all.sh 

#node3~node5节点,重启zookeeper
[root@node3 ~]# zkServer.sh restart
[root@node3 ~]# zkServer.sh restart
[root@node3 ~]# zkServer.sh restart

#启动HDFS集群
[root@node1 ~]# start-all.sh 

如果没有配置Kerberos认证时HBase之前启动过,在HDFS中将会有/hbase 目录,需要使用hdfs超级管理员用户将该目录的用户和所属组修改成hbase:hadoop。

#在node5节点执行
[root@node5 ~]# su hdfs
[hdfs@node5 ~]$ kinit hdfs
Password for hdfs@EXAMPLE.COM: 123456

[hdfs@node5 logs]$ hdfs dfs -ls /
Found 6 items
drwxr-xr-x   - root     supergroup          0 2023-05-23 10:50 /hbase

#修改用户和所属组
[hdfs@node5 logs]$ hadoop dfs -chown -R hbase:hadoop /hbase

在HBase主节点node4上执行如下命令启动HBase集群:

#无需切换hbase用户,可以直接在node4节点执行如下命令启动hbase集群
[root@node4 ~]# sudo -i -u hbase start-hbase.sh

注意:以上启动Hbase集群命令中没有切换到hbase用户,sudo为使用超级用户权限执行命令,-i表示交互模式执行命令,-u指定切换的用户。

HBase启动后,可以通过http://node4:16010进行访问:

hbase kerberos认证,Kerberos安全认证,hbase,安全,大数据

 ​​​​​​​4. HBase Shell操作HBase

在node3~node5任意节点登录hbase shell,可以看到如果节点没有进行用户认证,读取HBase数据时没有权限:

#假设在node4登录hbase客户端
[root@node4 ~]# klist
klist: No credentials cache found (filename: /tmp/krb5cc_0)

[root@node4 ~]# hbase shell
hbase(main):001:0> list
TABLE                                                                                                                                            

ERROR: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)

如果想要正常读取HBase中的数据需要进行客户端认证,这里以zhangsan用户为例,进行Kerberos主体认证,认证后操作HBase,操作如下:

#进行用户认证
[root@node4 ~]# kinit zhangsan
Password for zhangsan@EXAMPLE.COM: 123456
[root@node4 ~]# klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: zhangsan@EXAMPLE.COM

#登录hbase shell
[root@node4 ~]# hbase shell
                                                                                                                           
hbase(main):001:0> create 'person','cf1','cf2'
Created table person

hbase(main):002:0> list
person

hbase(main):003:0> put 'person','row1','cf1:id','1'
                                                                                                                        
hbase(main):004:0> put 'person','row1','cf1:name','zs'
                                                                                                                          
hbase(main):005:0> put 'person','row1','cf1:age',18
                                                                                                                       
hbase(main):006:0> scan 'person'
ROW                                   COLUMN+CELL                                                                                                
 row1                                 column=cf1:age, timestamp=1684317908851, value=18                                                          
 row1                                 column=cf1:id, timestamp=1684317879179, value=1                                                            
 row1                                 column=cf1:name, timestamp=1684317894358, value=zs 

​​​​​​​5. Java API操作HBase

java API操作经过Kerberos安全认证的HBase按照如下步骤实现即可。

1) 准备krb5.conf文件

将node1 kerberos服务端/etc/krb5.conf文件存放在IDEA项目中的resources资源目录中或者本地Window固定的某个目录中,用于编写代码时指定访问Kerberos的Realm。

2) 生成用户keytab文件

在kerberos服务端node1节点上,执行如下命令,对zhangsan用户主体生成keytab密钥文件。

#在node1 kerberos服务端执行
[root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /root/zhangsan.keytab zhangsan@EXAMPLE.COM"

以上命令执行之后,在/root目录下会生成zhangsan.keytab文件,将该文件复制到IDEA项目中的resources资源目录中或者本地window固定的某个目录中,该文件用于编写代码时认证kerberos。

3) 准备访问HDFS需要的资源文件

将HDFS中的core-site.xml 、hdfs-site.xml 、yarn-site.xml、hbase-site.xml文件上传到项目resources资源目录中。

4) 编写代码操作HBase

项目代码中引入如下maven依赖:

<!-- hbase依赖包 -->
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-server</artifactId>
  <version>2.2.6</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
  <version>2.2.6</version>
</dependency>

编写Java代码操作Kerberos认证的HBase代码如下:

/**
 * Java API 操作Kerberos 认证的HBase
 */
public class OperateAuthHBase {

    private static Configuration conf;
    private static Connection connection;
    private static Admin admin;

    /**
     * 初始化:进行kerberos认证并设置hbase configuration
     */
    static {
        try {
            String principal = "zhangsan@EXAMPLE.COM";
            String keytabPath = "D:\\idea_space\\KerberosAuth\\KerberosAuthHBase\\src\\main\\resources\\zhangsan.keytab";

            conf = HBaseConfiguration.create();
            // 设置Kerberos 认证
            System.setProperty("java.security.krb5.conf", "D:\\idea_space\\KerberosAuth\\KerberosAuthHBase\\src\\main\\resources\\krb5.conf");
            conf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(principal, keytabPath);

            //设置zookeeper集群地址及端口号
            conf.set("hbase.zookeeper.quorum","node3,node4,node5");
            conf.set("hbase.zookeeper.property.clientPort","2181");

            //创建连接
            connection = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }


    public static void main(String[] args) throws IOException {
        try{
            //获取Admin对象
            admin = connection.getAdmin();

            //创建表
            createHBaseTable("tbl1", "cf1");
            System.out.println("===========================");

            //查看Hbase中所有表
            listHBaseTables();
            System.out.println("===========================");

            //向表中插入数据
            putTableData("tbl1","row1", "cf1", "id", "100");
            putTableData("tbl1","row1", "cf1", "name", "zs");
            putTableData("tbl1","row1", "cf1", "age", "18");

            putTableData("tbl1","row2", "cf1", "id", "200");
            putTableData("tbl1","row2", "cf1", "name", "ls");
            putTableData("tbl1","row2", "cf1", "age", "19");
            System.out.println("===========================");

            //查询表中的数据
            scanTableData("tbl1");
            System.out.println("===========================");

            //查询指定row数据
            getRowData("tbl1", "row1","cf1","name");
            System.out.println("===========================");

            //删除指定row数据
            deleteRowData("tbl1", "row1","cf1","name");
            System.out.println("===========================");

            //查询表中的数据
            scanTableData("tbl1");
            System.out.println("===========================");

            //删除表
            deleteHBaseTable("tbl1");

            //查看Hbase中所有表
            listHBaseTables();
            System.out.println("===========================");
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
            if(admin != null) {
                admin.close();
            }
            if(connection != null) {
                connection.close();
            }
        }

    }

    //判断表是否存在
    private static boolean isTableExist(String tableName) {
        try {
            return admin.tableExists(TableName.valueOf(tableName));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return false;
    }

    private static void createHBaseTable(String tableName, String... cfs) {
        if(isTableExist(tableName)) {
            System.out.println("表已经存在!");
            return;
        }
        if(cfs == null || cfs.length == 0){
            System.out.println("列族不能为空!");
            return;
        }
        TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
        List<ColumnFamilyDescriptor> cFDBList = new ArrayList<>();
        for (String cf : cfs) {
            cFDBList.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf)).build());
        }
        tableDescriptorBuilder.setColumnFamilies(cFDBList);
        try {
            admin.createTable(tableDescriptorBuilder.build());
            System.out.println("创建表" + tableName + "成功!");
        } catch (IOException e) {
            System.out.println("创建失败!");
            e.printStackTrace();
        }

    }

    //查看Hbase中所有表
    private static void listHBaseTables() {
        try {
            TableName[] tableNames = admin.listTableNames();
            System.out.println("打印所有命名空间表名:");
            for (TableName tableName : tableNames) {
                System.out.println(tableName);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    //向表中插入数据
    private static void putTableData(String tableName, String rowKey, String cf, String column, String value) {
        Table table = null;
        try {
            table = connection.getTable(TableName.valueOf(tableName));
            Put p = new Put(Bytes.toBytes(rowKey));
            p.addColumn(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
            table.put(p);
            System.out.println("插入数据成功!");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeTable(table);
        }


    }

    //查询表中的数据
    private static void scanTableData(String tableName) {
        if(tableName == null || tableName.length() == 0) {
            System.out.println("请正确输入表名!");
            return;
        }
        Table table = null;
        try {
            table = connection.getTable(TableName.valueOf(tableName));
            Scan scan = new Scan();

            ResultScanner resultScanner = table.getScanner(scan);
            Iterator<Result> iterator = resultScanner.iterator();
            while(iterator.hasNext()) {
                Result result = iterator.next();
                Cell[] cells = result.rawCells();
                for(Cell cell : cells) {
                    //得到rowkey
                    System.out.println("rowkey: " + Bytes.toString(CellUtil.cloneRow(cell)));
                    //得到列族
                    System.out.println("列族: " + Bytes.toString(CellUtil.cloneFamily(cell)));
                    System.out.println("列: " + Bytes.toString(CellUtil.cloneQualifier(cell)));
                    System.out.println("值: " + Bytes.toString(CellUtil.cloneValue(cell)));
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeTable(table);
        }
    }

    //查询指定row数据
    private static void getRowData(String tableName, String rowkey,String colFamily, String col) {
        Table table = null;
        try {
            table = connection.getTable(TableName.valueOf(tableName));
            Get g = new Get(Bytes.toBytes(rowkey));
            // 获取指定列族数据
            if(col == null && colFamily != null) {
                g.addFamily(Bytes.toBytes(colFamily));
            } else if(col != null && colFamily != null) {
                // 获取指定列数据
                g.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col));
            }
            Result result = table.get(g);
            System.out.println("查询指定rowkey 数据如下");

            for(Cell cell : result.rawCells()) {
                //得到rowkey
                System.out.println("rowkey: " + Bytes.toString(CellUtil.cloneRow(cell)));
                //得到列族
                System.out.println("列族: " + Bytes.toString(CellUtil.cloneFamily(cell)));
                System.out.println("列: " + Bytes.toString(CellUtil.cloneQualifier(cell)));
                System.out.println("值: " + Bytes.toString(CellUtil.cloneValue(cell)));
            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeTable(table);
        }

    }

    //删除指定row数据
    private static void deleteRowData(String tableName , String rowKey, String colFamily, String ... cols) {
        if(tableName == null || tableName.length() == 0) {
            System.out.println("请正确输入表名!");
            return;
        }
        Table table = null;
        try {
            table = connection.getTable(TableName.valueOf(tableName));
            Delete del = new Delete(Bytes.toBytes(rowKey));
            // 删除指定列族
            del.addFamily(Bytes.toBytes(colFamily));
            // 删除指定列
            for (String col : cols) {
                del.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col));
            }
            table.delete(del);
            System.out.println("删除数据成功!");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeTable(table);
        }

    }

    //删除表
    private static void deleteHBaseTable(String tableName) {
        try {
            if(!isTableExist(tableName)) {
                System.out.println("要删除的表不存在!");
                return;
            }
            admin.disableTable(TableName.valueOf(tableName));
            admin.deleteTable(TableName.valueOf(tableName));

        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    //关闭表
    private static void closeTable(Table table) {
        if(table != null) {
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

欢迎点赞、评论、收藏,关注IT贫道,获取IT技术知识!文章来源地址https://www.toymoban.com/news/detail-725174.html

到了这里,关于Kerberos安全认证-连载11-HBase Kerberos安全配置及访问的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka配置Kerberos安全认证及与Java程序集成

    本文主要介绍在 Kafka 中如何配置 Kerberos 认证,以及 java 使用 JAAS 来进行 Kerberos 认证连接。 本文演示为单机版。 查看 Kerberos 版本命令: klist -V 软件名称 版本 jdk 1.8.0_202 kafka 2.12-2.2.1 kerberos 1.15.1 Kerberos 是一种由 MIT(麻省理工大学)提出的网络身份验证协议,它旨在通过使用密

    2024年01月22日
    浏览(61)
  • 企业级大数据安全架构(十)DBeaver连接Hive的Kerberos认证配置

    1.配置本地hosts 因为Kerberos认证过程及集群服务中,很多是以主机名的形式进行访问的,所以工作机要设置hosts. 域名映射,我们通过部署CDH的集群的每一台机器都已经配置了host(文件为/etc/hosts),工作机也需要配置window的host文件,如果提示无法修改,一般是需要管理员权限的原

    2024年02月21日
    浏览(43)
  • 【网络安全】图解 Kerberos:身份认证

    Kerberos 是一种身份认证协议,被广泛运用在大数据生态中,甚至可以说是大数据身份认证的事实标准。本文将详细说明 Kerberos 原理。 Kerberos 一词来源于古希腊神话中的 Cerberus —— 守护地狱之门的三头犬。下图是 Kerberos 的 LOGO。 一句话来说,Kerberos 是一种基于加密 Ticket 的身

    2024年02月06日
    浏览(53)
  • CDH数仓项目(三) —— Kerberos安全认证和Sentry权限管理

    本文基于《CDH数仓项目(一) —— CDH安装部署搭建详细流程》和《CDH数仓项目(二) —— 用户行为数仓和业务数仓搭建》和搭建CDH数仓。本章节主要介绍基于CDH数仓的Kerberos认证和Sentry权限管理 Kerberos是一种计算机网络授权协议,用来在非安全网络中,对个人通信以安全的手段进

    2023年04月22日
    浏览(53)
  • 实施PCIDSS认证:确保您的访问控制和配置在安全环境中运行

    作者:禅与计算机程序设计艺术 作为人工智能专家,作为一名CTO,我将为您介绍如何实施PCI DSS认证以确保您的访问控制和配置在安全环境中运行。本文将深入探讨技术原理、实现步骤以及优化与改进等方面,帮助您掌握实施PCI DSS认证的最佳实践。 引言 随着互联网的快速发

    2024年02月16日
    浏览(49)
  • 【物联网无线通信技术】802.11无线安全认证

    本文由简入繁介绍了IEEE802.11i无线局域网安全技术的前世今生,帮助路由器开发者对WLAN的加密安全策略有一个概念上的认知,能够更好地分析STA掉线以及漫游等问题。 目录 WEP WPA WPA/WPA2-PSK认证过程 802.11i WEP是Wired Equivalent Privacy的简称,有线等效保密(WEP)协议对在两台设备间

    2024年02月11日
    浏览(49)
  • 『Nginx安全访问控制』利用Nginx实现账号密码认证登录的最佳实践

    📣读完这篇文章里你能收获到 如何创建用户账号和密码文件,并生成加密密码 配置Nginx的认证模块,实现基于账号密码的登录验证 在Web应用程序的开发中,安全性是一项至关重要的任务。当用户需要访问敏感信息或执行特定操作时,需要使用账号和密码进行身份验证。本文

    2024年02月03日
    浏览(54)
  • SpringBoot整合自签名SSL证书,转变HTTPS安全访问(单向认证服务端)

    HTTP 具有相当优秀和方便的一面,然而 HTTP 并非只有好的一面,事物皆具两面性,它也是有不足之处的。例如: 通信使用明文(不加密),内容可能会被窃听。 不验证通信方的身份,因此有可能会遭遇伪装。 无法证明报文的完整性,所以有可能会遭篡改等。 因HTTP有存在通信

    2024年02月06日
    浏览(59)
  • 多因素认证与身份验证:分析不同类型的多因素认证方法,介绍如何在访问控制中使用身份验证以增强安全性

    随着数字化时代的到来,信息安全问题变得愈发重要。在网络世界中,用户的身份往往是保护敏感数据和系统免受未经授权访问的第一道防线。单一的密码已经不再足够,多因素认证(MFA)应运而生,成为提升身份验证安全性的重要工具之一。本文将深入探讨不同类型的多因

    2024年02月10日
    浏览(50)
  • kafka配置SASL/PLAIN 安全认证

    为zookeeper添加SASL支持,在配置文件zoo.cfg添加 新建zk_server_jaas.conf文件,为Zookeeper添加账号认证信息.这个文件你放在哪里随意,只要后面zkEnv配置正确的路径就好了。我是放在 /opt/zookeeper/conf/home 路径下。zk_server_jaas.conf文件的内容如下 Server { org.apache.kafka.common.security.plain.Plai

    2024年02月10日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包