4 Paimon数据湖之Hive Catalog的使用

这篇具有很好参考价值的文章主要介绍了4 Paimon数据湖之Hive Catalog的使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

更多Paimon数据湖内容请关注:https://edu.51cto.com/course/35051.html文章来源地址https://www.toymoban.com/news/detail-783205.html

Paimon提供了两种类型的Catalog:Filesystem CatalogHive Catalog

  • Filesystem Catalog:会把元数据信息存储到文件系统里面。
  • Hive Catalog:则会把元数据信息存储到Hive的Metastore里面,这样就可以直接在Hive中访问Paimon表了。注意:此时也会同时在文件系统中存储一份元数据信息,相当于元数据会存储两份,这个大家需要特别注意一下。

还有就是我们在使用Hive Catalog的时候,Paimon中的数据库名称、表名称,以及字段名称都要小写,因为这些数据存储到Hive Metastore的时候,会统一存储为小写。

下面我们来具体演示一下Paimon如何使用Hive Catalog来存储元数据。

在Flink中操作Paimon的时候想要使用Hive Catalog,需要依赖于Flink Hive connector,以及hive-execflink-table-api-scala-bridge

flink-table-api-scala-bridge这个依赖我们之前已经添加过了,所以只需要添加另外两个即可:

<!-- flink-hive-connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.12</artifactId>
    <version>1.15.0</version>
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>3.1.2</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
        </exclusion>
    </exclusions>
    <!--<scope>provided</scope>-->
</dependency>

创建package:tech.xuwei.paimon.catalog
创建object:PaimonHiveCatalog

代码如下:

package tech.xuwei.paimon.catalog

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * Paimon使用Hive Catalog
 * Created by xuwei
 */
object PaimonHiveCatalog {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog--使用Hive Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_hive_catalog WITH(
        |    'type'='paimon',
        |    'metastore' = 'hive',
        |    'uri' = 'thrift://bigdata04:9083',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_hive_catalog")

    //创建Paimon表
    tEnv.executeSql(
      """
        |CREATE TABLE IF NOT EXISTS p_h_t1(
        |    name STRING,
        |    age INT,
        |    PRIMARY KEY (name) NOT ENFORCED
        |)
        |""".stripMargin)

    //向表中插入数据
    tEnv.executeSql(
      """
        |INSERT INTO p_h_t1(name,age) VALUES('jack',18),('tom',20)
        |""".stripMargin)

  }

}

接下来到bigdata04节点上启动hive的metastore服务。

[root@bigdata04 ~]# cd /data/soft/apache-hive-3.1.2-bin/
[root@bigdata04 apache-hive-3.1.2-bin]# nohup bin/hive --service metastore -p 9083 2>&1 >/dev/null &

然后运行代码PaimonHiveCatalog

代码运行之后可以到先到hdfs中确认一下是否能看到元数据信息:

[root@bigdata04 ~]# hdfs dfs -cat /paimon/default.db/p_h_t1/schema/schema-0
{
  "id" : 0,
  "fields" : [ {
    "id" : 0,
    "name" : "name",
    "type" : "STRING NOT NULL"
  }, {
    "id" : 1,
    "name" : "age",
    "type" : "INT"
  } ],
  "highestFieldId" : 1,
  "partitionKeys" : [ ],
  "primaryKeys" : [ "name" ],
  "options" : { }

可以发现,在hdfs中依然是可以看到的,因为我们前面说了,使用hive catalog时也会同时在hdfs中存储一份元数据。

最后我们到hive中确认一下:
注意:由于目前bigdata04节点的环境变量中有HADOOP_CLASSPATH,所以直接使用hive客户端会看到很多日志信息,所以建议使用hive的beeline客户端。
此时需要先启动hiveserver2服务。

[root@bigdata04 ~]# cd /data/soft/apache-hive-3.1.2-bin/
[root@bigdata04 apache-hive-3.1.2-bin]# bin/hiveserver2

使用beeline客户端进行连接

[root@bigdata04 apache-hive-3.1.2-bin]# bin/beeline -u  jdbc:hive2://localhost:10000 -n root
0: jdbc:hive2://localhost:10000> show tables;
+--------------------+
|      tab_name      |
+--------------------+
| flink_stu          |
| orders             |
| p_h_t1             |
| s1                 |
| student_favors     |
| student_favors_2   |
| student_score      |
| student_score_bak  |
| t1                 |
+--------------------+
9 rows selected (1.727 seconds)
0: jdbc:hive2://localhost:10000> select * from p_h_t1;
Error: Error while compiling statement: FAILED: RuntimeException java.lang.ClassNotFoundException: org.apache.paimon.hive.mapred.PaimonInputFormat (state=42000,code=40000)

此时是可以在hive中查看到p_h_t1这个表的,但是在操作这个表的时候会报错,提示缺少依赖,现在报这个错是正常的,等后面我们会有一个单独的小节来讲Paimon和Hive引擎的集成。
目前通过hive catalog可以将paimon的元数据同时存储到hive的metastore中,但是还无法在hive中操作paimon的表,其实主要是因为缺少一个依赖,在这大家先知道这个问题即可。

注意:如果我们此时操作的是分区表,那么分区信息默认是无法同步到Hive Metastore的。

也就是说默认情况下,Paimon不会将新创建的分区同步到Hive Metastore中。我们在Hive中只能看到一个未分区的普通表。

如果想解决这个问题,也很简单,只需要在paimon的表属性中设置metastore.partitioned-table=true即可。

下面开发一个案例:
创建object:PaimonHiveCatalogPartitionTable,基于PaimonHiveCatalog进行复制。

完整代码如下:

package tech.xuwei.paimon.catalog

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * Paimon使用Hive Catalog
 * 操作分区表
 * Created by xuwei
 */
object PaimonHiveCatalogPartitionTable {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog--使用Hive Catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_hive_catalog WITH(
        |    'type'='paimon',
        |    'metastore' = 'hive',
        |    'uri' = 'thrift://bigdata04:9083',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_hive_catalog")

    //创建Paimon表
    tEnv.executeSql(
      """
        |CREATE TABLE IF NOT EXISTS p_h_par(
        |    id INT,
        |    name STRING,
        |    dt STRING,
        |    PRIMARY KEY (id, dt) NOT ENFORCED
        |) PARTITIONED BY (dt) WITH(
        |    'metastore.partitioned-table' = 'true'
        |)
        |""".stripMargin)

    //向表中插入数据
    tEnv.executeSql(
      """
        |INSERT INTO p_h_par(id,name,dt)
        |VALUES(1,'jack','20230101'),(2,'tom','20230102')
        |""".stripMargin)

  }

}

在idea中执行代码。

然后到hive中进行验证,可以执行show partitions p_h_par;进行验证。

或者到hive metastore里面进行确认,查看mysql中的partitions表,这个表里面存储的是分区信息,如果能看到分区信息,就说明Paimon表的分区信息同步过来了。
paimon数据存储在哪里,paimon,paimon,数据湖,hive catalog,大数据,数据仓库

这样就说明Paimon表的分区信息同步过来了。

更多Paimon数据湖内容请关注:https://edu.51cto.com/course/35051.html

到了这里,关于4 Paimon数据湖之Hive Catalog的使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 聊聊流式数据湖Paimon(二)

    Apache Paimon 最典型的场景是解决了 CDC (Change Data Capture) 数据的入湖;CDC 数据来自数据库。一般来说,分析需求是不会直接查询数据库的。 容易对业务造成影响,一般分析需求会查询全表,这可能导致数据库负载过高,影响业务 分析性能不太好,业务数据库一般不是列存,查

    2024年02月03日
    浏览(29)
  • 聊聊流式数据湖Paimon(四)

    数据打宽 通过不同的流写不同的字段,打宽了数据的维度,填充了数据内容;如下所示: 完整的Changlog Paimon中的表被多流填充数据且打宽维度后,支持流读、批读的方式提供完整的Changelog给下游。 Sequence-Group 配置: \\\'fields.G.sequence-group\\\'=\\\'A,B\\\' 由字段 G 控制是否更新字段 A, B ;

    2024年02月03日
    浏览(62)
  • 【大数据】-- 创建 Paimon 外部表

           如今,在数据湖三剑客(delta lake、hudi、iceberg)之上,又新出一派: apache paimon。我们恰好在工作中遇到,以下介绍在 dataworks 上,使用 maxcompute odps sql 创建 apache paimon 外部表的一些操作和注意事项。参考:创建MaxCompute Paimon外部表_云原生大数据计算服务 MaxCompute(Max

    2024年03月13日
    浏览(37)
  • 聊聊流式数据湖Paimon(五)

    从Demo入手,了解Paimon/Flink项目搭建的全过程。记录下采坑之旅。 在IDEA中创建Flink项目,由于没有Flink的archetype,因此需要手动创建一下。 参考:idea快速创建flink项目,至此Flink的项目框架就搭建起来了。 注意:必须注释掉pom文件中的 provided ;否则运行时会报错: Error: A JN

    2024年02月03日
    浏览(41)
  • 聊聊流式数据湖Paimon(三)

    如果表没有定义主键,则默认情况下它是仅追加 表类型(Append Only Table)。 根据桶(Bucket)的定义,我们有两种不同的仅追加模式:\\\"Append For Scalable Table\\\"和\\\"Append For Queue\\\";两种模式支持不同的场景,提供不同的功能。 只能向表中插入一条完整的记录。 不支持删除或更新,并且不能

    2024年02月03日
    浏览(48)
  • Apache Paimon 使用之文件系统配置

    1)概述 Paimon 和 Flink 一样使用了插件式的 file systems,如果使用Flink引擎,用户可以根据 plugin 机制配置 plugin 结构。 然而,对于 Spark 引擎 和 Hive 引擎,它们提供的 Jars 可能与 Flink 提供的产生冲突,不能直接使用,因此 Paimon 自己也提供了FileSystem插件,供用户从Spark或Hive端查

    2024年03月12日
    浏览(52)
  • Apache Paimon使用之 Altering Tables

    1.改变或增加表属性 设置表属性 write-buffer-size 为 256 MB 。 Flink 引擎 Spark3引擎 2.重命名表 Flink引擎 Spark3引擎 最简单的sql调用是 可以以这种方式重命名paimon表 不能将Catalog名称放在重命名的表之前,会报错: 注意 :如果使用对象存储,如S3或OSS,请谨慎使用此语法,因为对象

    2024年03月12日
    浏览(34)
  • Flink + Paimon数据 CDC 入湖最佳实践

    Apache Paimon 最典型的场景是解决了 CDC (Change Data Capture)数据的入湖,看完这篇文章可以了解到: 1、为什么 CDC 入Hive迁移到 Paimon? 2、CDC 入 Paimon 怎么样做到成本最低? 3、Paimon 对比 Hudi有什么样的优势?  Paimon 从 CDC 入湖场景出发,希望提供给你 简单、低成本、低延时 的

    2024年01月16日
    浏览(44)
  • Paimon+StarRocks 湖仓一体数据分析方案

    摘要:本文整理自阿里云高级开发工程师曾庆栋(曦乐)在 Streaming Lakehouse Meetup 的分享。内容主要分为四个部分: 传统数据仓库分析实现方案简介 Paimon+StarRocks 构建湖仓一体数据分析实现方案 StarRocks 与 Paimon 结合的使用方式与实现原理 StarRocks 社区湖仓分析未来规划 点击查

    2024年02月10日
    浏览(42)
  • 流数据湖平台Apache Paimon(二)集成 Flink 引擎

    Paimon目前支持Flink 1.17, 1.16, 1.15 和 1.14。本课程使用Flink 1.17.0。 环境准备 2.1.1 安装 Flink 1)上传并解压Flink安装包 tar -zxvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/module/ 2)配置环境变量 2.1.2 上传 jar 包 1)下载并上传Paimon的jar包 jar包下载地址:https://repository.apache.org/snapshots/org/apache/pa

    2024年02月09日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包