HBase Shell操作&Flink写入HBase

这篇具有很好参考价值的文章主要介绍了HBase Shell操作&Flink写入HBase。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、HBase Shell操作

1、基本操作

1)进入HBase客户端命令行

    [root@bigdata1 hbase]$ bin/hbase shell

2)查看帮助命令

    hbase(main):001:0> help

3)查看当前数据库中有哪些表

    hbase(main):002:0> list

2、表的操作

1)创建表

    hbase(main):002:0> create 'student','info'

2)插入数据到表

    hbase(main):003:0> put 'student','1001','info:sex','male'
    hbase(main):004:0> put 'student','1001','info:age','18'
    hbase(main):005:0> put 'student','1002','info:name','Janna'
    hbase(main):006:0> put 'student','1002','info:sex','female'
    hbase(main):007:0> put 'student','1002','info:age','20'

3)扫描查看表数据

    hbase(main):008:0> scan 'student'
    hbase(main):009:0> scan 'student',{STARTROW => '1001', STOPROW  => '1001'}
    hbase(main):010:0> scan 'student',{STARTROW => '1001'}

4)查看表结构

    hbase(main):011:0> describe 'student'

5)更新指定字段的数据

    hbase(main):012:0> put 'student','1001','info:name','Nick'
    hbase(main):013:0> put 'student','1001','info:age','100'

6)查看“指定行”或“指定列族:列”的数据

    hbase(main):014:0> get 'student','1001'
    hbase(main):015:0> get 'student','1001','info:name'

7)统计表数据行数

    hbase(main):021:0> count 'student'

8)删除数据

    删除某rowkey的全部数据:
    hbase(main):016:0> deleteall 'student','1001'
    删除某rowkey的某一列数据:
    hbase(main):017:0> delete 'student','1002','info:sex'

9)清空表数据

    hbase(main):018:0> truncate 'student'
    提示:清空表的操作顺序为先disable,然后再truncate。

10)删除表

    首先需要先让该表为disable状态:
    hbase(main):019:0> disable 'student'
    然后才能drop这个表:
    hbase(main):020:0> drop 'student'
    提示:如果直接drop表,会报错:ERROR: Table student is enabled. Disable it first.

11)变更表信息

    将info列族中的数据存放3个版本:
    hbase(main):022:0> alter 'student',{NAME=>'info',VERSIONS=>3}
    hbase(main):022:0> get 'student','1001',{COLUMN=>'info:name',VERSIONS=>3}

二、Flink整合HBase写入操作

现在需要将Flink处理的数据存入HBase数据库(namespace)shtd_result的order_info表中,rowkey为id的值,然后在Linux的HBase shell命令行中查询列consignee,并查询出任意5条

表空间为:shtd_result,表为order_info,列族为:info
表结构为:

字段 类型 注释
rowkey string HBase的主键,值为id
id bigint
consignee string
consignee_tel string
final_total_amount double
order_status string
user_id bigint
delivery_address string
order_comment string
out_trade_no string
trade_body string
create_time string 转成yyyy-MM-dd hh:mm:ss格式的的字符串
operate_time string 转成yyyy-MM-dd hh:mm:ss格式的的字符串
expire_time string 转成yyyy-MM-dd hh:mm:ss格式的的字符串
tracking_no string
parent_order_id bigint
img_url string
province_id int
benefit_reduce_amount double

我们需要写一个WriteToHBase类,集成自RichSinkFunction,RichSinkFunction 是一个抽象类,提供了一个更为丰富的接口,用于实现自定义的 Sink(接收器)功能。

在Scala api中RichSinkFunction的主要方法有open,invoke以及close。

  • open(Configuration parameters):

    这个方法在 Sink 函数初始化时被调用,通常用于一次性的设置工作,例如打开数据库连接或初始化状态。
    参数 parameters 提供了访问 Flink 配置的能力。

  • invoke(value: T, context: SinkFunction.Context):

    这是核心方法,用于处理每条流入的数据。
    value 参数代表当前的数据元素。
    context 提供了此元素的上下文信息,如当前的处理时间或事件时间。

  • close():

    当 Sink 不再接收数据时调用此方法,用于执行清理工作,如关闭数据库连接。
    这个方法是在最后一次调用 invoke 方法后执行。

了解了这些方法后,我们来写一下WriteToHBase

一、WriteToHBase的实现

  class WriteToHBase extends RichSinkFunction[OrderData] {
    @transient private var connection: Connection = _
    @transient private var table: Table = _

    override def open(parameters: Configuration): Unit = {
      val config = HBaseConfiguration.create()
      // 设置HBase配置, 如Zookeeper地址等
       config.set("hbase.zookeeper.quorum", "bigdata1:2181")

      connection = ConnectionFactory.createConnection(config)
      table = connection.getTable(TableName.valueOf("shtd_result:order_info"))
    }

    override def invoke(value: OrderData, context: SinkFunction.Context): Unit = {
      // 将 id 转换为行键(假设 id 是唯一的)
      val rowKey = Bytes.toBytes(value.id.toString)

      // 为该行创建一个新的 Put 实例
      val put = new Put(rowKey)

      // 向 Put 实例中添加列
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("consignee"), Bytes.toBytes(value.consignee))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("consignee_tel"), Bytes.toBytes(value.consignee_tel))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("final_total_amount"), Bytes.toBytes(value.final_total_amount))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("order_status"), Bytes.toBytes(value.order_status))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("user_id"), Bytes.toBytes(value.user_id))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("delivery_address"), Bytes.toBytes(value.delivery_address))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("order_comment"), Bytes.toBytes(value.order_comment))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("out_trade_no"), Bytes.toBytes(value.out_trade_no))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("trade_body"), Bytes.toBytes(value.trade_body))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("operate_time"), Bytes.toBytes(value.operate_time))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("expire_time"), Bytes.toBytes(value.expire_time))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("tracking_no"), Bytes.toBytes(value.tracking_no))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("parent_order_id"), Bytes.toBytes(value.parent_order_id))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("img_url"), Bytes.toBytes(value.img_url))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("province_id"), Bytes.toBytes(value.province_id))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("benefit_reduce_amount"), Bytes.toBytes(value.benefit_reduce_amount))

      table.put(put)
    }

    override def close(): Unit = {
      if (table != null) {
        table.close()
      }
      if (connection != null) {
        connection.close()
      }
    }
  }

在 Scala 和 Java 中,@transient 关键字用于标记一个类的成员变量为“暂时的”(transient),这意味着这个变量不会被默认的序列化过程序列化。

在 Flink中,通常用于:

  • 防止序列化问题:当一个对象需要在不同的机器或上下文中传递时,某些属性可能不支持序列化(例如,数据库连接),或者序列化这些属性没有意义(例如,临时缓存)。使用 @transient 可以避免这些字段在对象序列化时引发错误。

  • 减少网络开销:对于不需要跨节点传输的字段,使用 @transient 可以减少不必要的网络传输开销。

在 WriteToHBase 类中,connection 和 table 作为 HBase 的连接和表实例,通常不支持序列化,也不应该被序列化。所以,它们被标记为 @transient。


上面的向 Put 实例中添加列过于冗长,可以用反射来代替:

def addColumnsUsingReflection[T: TypeTag](put: Put, cf: String, data: T): Unit = {
  //获取运行时镜像和实例镜像
  val mirror = runtimeMirror(getClass.getClassLoader)
  val instanceMirror = mirror.reflect(data) 
  //获取类成员并过滤方法
  val members = typeOf[T].members.sorted.filterNot(_.isMethod)

  //遍历字段并添加到 Put 实例
  members.foreach { m =>
    val fieldMirror = instanceMirror.reflectField(m.asTerm)
    val name = m.name.toString.trim
    val value = fieldMirror.get.toString

    put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(name), Bytes.toBytes(value))
  }
}

override def invoke(value: OrderData, context: SinkFunction.Context): Unit = {
  val rowKey = Bytes.toBytes(value.id.toString)
  val put = new Put(rowKey)
  val infoCF = "info"

  // 使用反射自动添加列
  addColumnsUsingReflection(put, infoCF, value)

  table.put(put)
}

现在来解释一下这段代码:

addColumnsUsingReflection 函数定义

def addColumnsUsingReflection[T: TypeTag](put: Put, cf: String, data: T): Unit = {
  • [T: TypeTag]:类型参数 T,带有一个上下文界定 TypeTag,这使得可以在运行时获取类型 T 的信息。
  • (put: Put, cf: String, data: T):函数接受三个参数:put 是 HBase 的 Put 实例,cf 是列族名,data 是要插入的数据对象。

获取运行时镜像和实例镜像

  val mirror = runtimeMirror(getClass.getClassLoader)
  val instanceMirror = mirror.reflect(data)
  • val mirror:创建一个 Mirror 实例,它是反射 API 的入口点。
  • runtimeMirror(getClass.getClassLoader):获取当前类的类加载器的运行时镜像。
  • val instanceMirror:反射 data 对象,得到一个可以用来访问 data 实例成员的 InstanceMirror

获取类成员并过滤方法

  val members = typeOf[T].members.sorted.filterNot(_.isMethod)
  • typeOf[T]:获取类型 T 的类型信息。
  • .members:获取类型 T 的所有成员(字段和方法)。
  • .sorted:对成员进行排序(默认按名称)。
  • .filterNot(_.isMethod):过滤掉方法成员,只保留字段。

遍历字段并添加到 Put 实例

  members.foreach { m =>
    val fieldMirror = instanceMirror.reflectField(m.asTerm)
    val name = m.name.toString.trim
    val value = fieldMirror.get.toString
    put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(name), Bytes.toBytes(value))
  }
  • 遍历所有字段。
  • val fieldMirror:反射每个字段,得到一个可以操作字段的 FieldMirror
  • m.asTerm:将成员 m 转换为一个 term(字段)。
  • val name:获取字段名,并去除首尾空格。
  • val value:获取字段的值并转换为字符串。
  • put.addColumn:向 Put 实例添加列,列族为 cf,列名为字段名 name,值为字段值 value

invoke 方法

override def invoke(value: OrderData, context: SinkFunction.Context): Unit = {
  val rowKey = Bytes.toBytes(value.id.toString)
  val put = new Put(rowKey)
  val infoCF = "info"
  addColumnsUsingReflection(put, infoCF, value)
  table.put(put)
}
  • override def invoke:重写 RichSinkFunctioninvoke 方法。
  • val rowKey:将 OrderDataid 字段转换为字节作为行键。
  • val put:创建一个新的 Put 实例。
  • val infoCF:定义列族名。
  • addColumnsUsingReflection:调用之前定义的函数来动态添加列。
  • table.put(put):将 Put 实例写入 HBase 表。

这段代码通过反射自动化了向 HBase Put 实例添加数据的过程,避免了手动为每个字段编写重复代码的需要。

然后我们需要对于dataStream应用刚才写的 WriteToHBase 类

应用 WriteToHBase 类

dataStream.addSink(new WriteToHBase)

二、HBase Shell操作

1. 启动 HBase Shell

首先,我们需要进入 HBase Shell。在命令行中输入:

hbase shell

2. 创建命名空间

如果命名空间 shtd_result 还不存在,需要先创建它。在 HBase Shell 中执行以下命令:

create_namespace 'shtd_result'

3. 创建表

接着,创建表 order_info。我们需要定义至少一个列族(在这个示例中,我将使用 info 作为列族名)。在 HBase Shell 中执行以下命令:

create 'shtd_result:order_info', 'info'

这里,'shtd_result:order_info' 指定了完整的表名(包括命名空间),而 'info' 是列族名。

4. 验证表创建

最后,您可以列出所有表来验证新表是否已成功创建:

list

5. 查询

scan 'shtd_result:order_info', {COLUMNS => ['info:consignee'], LIMIT => 5}

这里的 scan 命令用于扫描 shtd_result:order_info 表,COLUMNS 参数指定我们只关心 info:consignee 列(假设 consignee 存储在名为 info 的列族中),而 LIMIT => 5 指定我们只查看 5 条记录。文章来源地址https://www.toymoban.com/news/detail-760505.html

到了这里,关于HBase Shell操作&Flink写入HBase的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Hbase Shell操作

    HBase中用 create 命令创建表,具体如下: 此时,创建了一个“student”表,属性有:name,sex,age,address。可通过 describe 命令查看“student”表的基本信息: 2.1 添加数据 HBase中用 put 命令添加数据. 注意:一次只能为一个表的一行数据的一个列,也就是一个单元格添加一个数据。

    2024年02月15日
    浏览(44)
  • HBase基础及shell操作

    HBase是采用java语言编写的一款 apache 开源的基于HDFS的NoSQL型数据库,不支持 SQL,不支持事务,不支持Join操作,没有表关系 1.不支持事务 2.主要存储结构化数据以及半结构化的数据 3.​HBase中数据存储都是以 字节 的形式来存储的 4.HBase是易于扩展的 1- 大: 在一个表中可以存储上

    2023年04月08日
    浏览(41)
  • HBase Shell基本操作

    先在Linux Shell命令行终端执行 start-dfs.sh 脚本启动HDFS,再执行 start-hbase.sh 脚本启动HBase。如果Linux系统已配置HBase环境变量,可直接在任意目录下执行 hbase shell 脚本命令,就可进入HBase Shell的命令行终端环境, exit 可以退出HBase Shell(我安装的是伪分布式的HBase)。 (1) help帮

    2024年04月13日
    浏览(39)
  • HBase(11):shell管理操作

    1 status 例如:显示服务器状态   2 whoami 显示HBase当前用户,例如:   3 list 显示当前所有的表 4 count 统计指定表的记录数,例如:     5 describe 展示表结构信息   6 exists 检查表是否存在,适用于表量特别多的情况

    2024年02月12日
    浏览(42)
  • HBase高手之路4-Shell操作

    命令 功能 create 创建表 put 插入或者更新数据 get 获取限定行或者列的数据 scan 全表扫描或扫描表并返回表的数据 describe 查看表的结构 count 统计行数 delete 删除指定的行或列的数据 deleteall 删除整个行或者列的数据 truncate 删除表的数据,结构还在 drop 删除整个表(包括数据)

    2023年04月17日
    浏览(41)
  • Hbase安装和shell客户端操作

    HBase 是一个 面向列式存储的分布式数据库 ,其设计思想来源于 Google 的 BigTable 论文。 HBase 底层存储基于 HDFS 实现,集群的管理基于 ZooKeeper 实现。 HBase 良好的分布式架构设计为海量数据的快速存储、随机访问提供了可能,基于数据副本机制和分区机制可以轻松实现在线扩容

    2024年02月08日
    浏览(48)
  • 大数据----33.hbase中的shell文件操作

    HBase的命令行工具,最简单的接口,适合HBase管理使用,可以使用shell命令来查询HBase中数据的详细情况。 注意:如果进入hbase后长时间不操作; 发生hbase自动关闭没有了进程; 原因是内存不够;可以关闭机器;增加内存;虚拟机就扩大内存。 1、进入 hbase 客户端、帮助命令

    2024年01月24日
    浏览(42)
  • 第1关:HBase Shell 操作:分区压缩

    任务描述 本关任务:在 HBase Shell 中使用分区压缩命令并将查看到的命令结果复制到指定的文件中。 相关知识 为了完成本关任务,你需要掌握: 1.数据分区压缩的概念; 2.数据分区压缩的原因; 3.数据分区压缩的过程; 4.数据分区压缩的触发时机; 5.数据分区压缩的详解。 数据分

    2024年04月13日
    浏览(71)
  • HBase Shell 操作:自动拆分和预分区

    启动hadoop集群 start-all.sh 启动Zookeeper集群 zkServer.sh start 启动HBase start-hbase.sh 进入hbase shell hbase shell 创建的表使用自动拆分命令 create \\\'stu\\\',{METADATA={\\\'SPLIT_POLICY\\\'=\\\'org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy\\\'}},{NAME=\\\'sc\\\'} 第二关:预分区 具体预分区配置要求如下所述: 文本文件

    2024年04月10日
    浏览(38)
  • HBase Shell启动缓慢及操作耗时长的原因分析与解决

    在内网搭了一个 hbase-2.2.6(hadoop-2.7.3)的环境,使用的是其内置的 zookeeper-3.4.10,16010端口对应的 web界面可以正常访问,且各项功能正常。 在使用 hbase shell的过程中,首先是 hbase shell启动非常慢,约 210s才成功,其次执行 scan、put、get等命令需要 20s左右才能完成。以笔者的经

    2024年02月02日
    浏览(79)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包