数据血缘Atlas Rest-API使用

这篇具有很好参考价值的文章主要介绍了数据血缘Atlas Rest-API使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

项目场景

atlas支持对hive元数据的管理,通过执行bin/import-hive.sh脚本即可,但目前大多数离线平台是用spark分析数据的,而spark元数据atlas解析不出来数据血缘,这就需要我们自己通过解析spark执行计划再结合atlas rest-api组建出来我们的数据血缘,接下来和大家分享一下atlas rest-api使用方法。

依赖引入

<!-- Atlas2.0   -->
    <dependency>
      <groupId>org.apache.atlas</groupId>
      <artifactId>atlas-client-v2</artifactId>
      <version>2.0.0</version>
      <exclusions>
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava</artifactId>
        </exclusion>
        <exclusion>
          <artifactId>slf4j-log4j12</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
          <artifactId>log4j</artifactId>
          <groupId>log4j</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.atlas</groupId>
      <artifactId>atlas-client-common</artifactId>
      <version>2.0.0</version>
      <exclusions>
        <exclusion>
          <artifactId>slf4j-log4j12</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
      </exclusions>
    </dependency>

1.Type

1.1 概述

Type即元数据类型定义,这里可以是数据库、表、列等,还可以细分spark表(spark_table),hive表(hive_table)等,atlas自带了很多类型,如DataSet,Process等,一般情况下,数据相关的类型在定义类型的时候都会继承DataSet,而流程相关的类型则会继承Process,便于生成血缘关系。

注:Atlas管理的对象就是各种Type的Entity,因此先创建好Type再创建Entity,Type创建一次即可。

1.2 类型构建

1.Atlas自带hive相关类型如下:
数据血缘Atlas Rest-API使用

2.流程相关类型创建举例:

// 定义父类
val superType: Set[String] = Set(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS) // 流程相关的类型继承Process(表血缘、字段血缘)

// 类型创建对象
val typesDef = new AtlasTypesDef()
// 类型定义
val columnLineageType = new AtlasEntityDef() // 类型对象
columnLineageType.setName("spark_column_lineage") // 类型名
columnLineageType.setSuperTypes(superType.asJava) // 父类
columnLineageType.setServiceType("spark")
columnLineageType.setTypeVersion(1.0) // 版本号

val typeList = List(columnLineageType) // 将类型放到集合里
typesDef.setEntityDefs(typeList.asJava) // 赋值

// 构建查询过滤条件
val searchFilter = new SearchFilter()
searchFilter.setParam("name","spark_column_lineage")

// 如果该类型不存在则创建
val allTypeDefs: AtlasTypesDef = atlasClientV2.getAllTypeDefs(searchFilter)
if (allTypeDefs.getEntityDefs.isEmpty){
  // 类型创建
  atlasClientV2.createAtlasTypeDefs(typesDef)
}

说明:atlas自带spark_db/spark_table/spark_column/spark_process类型(旧版本中spark类型可能不完善),这里创建spark_column_lineage字段血缘类型作为参考。

3.数据相关类型创建举例(类型间存在关系如:db/table)
注:本次示例我们定义一个spark_db类型、spark_table类型,并且让spark_db一对多 spark_table

// 定义父类
val superType: Set[String] = Set(AtlasBaseTypeDef.ATLAS_TYPE_DATASET) // 数据相关的类型继承DataSet(库、表、字段)

// 类型创建对象
val typesDef = new AtlasTypesDef()
// db
val dbType = new AtlasEntityDef() // 类型对象
dbType.setName("spark_db") // 类型名
dbType.setSuperTypes(superType.asJava) // 父类
dbType.setServiceType("spark")
dbType.setTypeVersion(1.0) // 版本号
// table
val tableType = new AtlasEntityDef() // 类型对象
tableType.setName("spark_table") // 类型名
tableType.setSuperTypes(superType.asJava) // 父类
tableType.setServiceType("spark")
tableType.setTypeVersion(1.0) // 版本号

val typeList = List(dbType,tableType) // 将类型放到集合里
typesDef.setEntityDefs(typeList.asJava) // 赋值

// db与table之间存在依赖关系(一对多),下面创建它们的关系使其联系起来
//定义relationshipDef
val relationshipDef1 = new AtlasRelationshipDef()
relationshipDef1.setName("table_db") // 关系名自定义
relationshipDef1.setServiceType("spark")
relationshipDef1.setTypeVersion("1.0")
/**
 * 关系类型:
 *  ASSOCIATION:关联关系,没有容器存在,1对1
 *  AGGREGATION:容器关系,1对多,而且彼此可以相互独立存在
 *  COMPOSITION:容器关系,1对多,但是容器中的实例不能脱离容器存在
 */
relationshipDef1.setRelationshipCategory(AtlasRelationshipDef.RelationshipCategory.AGGREGATION)
// 推导tag:NONE-不推导
relationshipDef1.setPropagateTags(AtlasRelationshipDef.PropagateTags.NONE)

//定义endDef1
val endDef1 = new AtlasRelationshipEndDef()
endDef1.setType("spark_talbe")
//表中关联的属性名称
endDef1.setName("db")
//代表这头是不是容器
endDef1.setIsContainer(false)
//cardinality:三种类型SINGLE(单值), LIST(多值可重复), SET(多值不重复)
endDef1.setCardinality(AtlasStructDef.AtlasAttributeDef.Cardinality.SINGLE)

relationshipDef1.setEndDef1(endDef1)

//定义endDef2
val endDef2 = new AtlasRelationshipEndDef()
endDef2.setType("spark_db")
//数据库关联的属性名称
endDef2.setName("tables")
endDef2.setIsContainer(true)
// db 包含 table,table不能重复,所以类型设置为 SET
endDef2.setCardinality(AtlasStructDef.AtlasAttributeDef.Cardinality.SET)

relationshipDef1.setEndDef2(endDef2)

//关系可能有多种,定义关系集合relationshipDefs
val relationshipDefs = List(relationshipDef1)

typesDef.setRelationshipDefs(relationshipDefs.asJava)

// 构建查询过滤条件
val searchFilter = new SearchFilter()
searchFilter.setParam("name","spark_db")

// 如果该类型不存在则创建
val allTypeDefs: AtlasTypesDef = atlasClientV2.getAllTypeDefs(searchFilter)
if (allTypeDefs.getEntityDefs.isEmpty){
// 类型创建
  atlasClientV2.createAtlasTypeDefs(typesDef)
}

执行完毕后,可前往 Atlas 主页查看,类型已创建成功:

数据血缘Atlas Rest-API使用

1.3 类型查询

// 构建查询过滤条件
val searchFilter = new SearchFilter()
searchFilter.setParam("name","spark_db")

// 查询
val allTypeDefs: AtlasTypesDef = atlasClientV2.getAllTypeDefs(searchFilter)

*注意*
创建实体时有必要提前查看一下类型,因为需要注意该类型下所有属性的“getIsOptional”值即是否可选,如果为true,创建该类型下的实体时可以忽略该属性,如果为false,创建实体时必须给该属性赋值,否则会创建失败。

数据血缘Atlas Rest-API使用

报错类似如下:

“Invalid instance creation/updation parameters passed : spark_column.type: mandatory attribute value missing in type spark_column”
// 传递的实例创建/更新参数无效:spark_column。Type: spark_column类型中缺失的必选属性值

例如:创建spark_column类型下的字段实体,如果没有给该实体的“type”属性赋值就会创建失败。

1.4 类型删除

val myProcess = new AtlasEntityDef()
myProcess.setName("spark_process")// 类型名

val myType = new AtlasTypesDef()
myType.setEntityDefs(Seq(myProcess).asJava)

atlasClientV2.deleteAtlasTypeDefs(myType)

说明:对类型下的实体进行删除,默认情况下atlas为标记删除(删除策略见配置文件atlas-application.properties下的atlas.DeleteHandlerV1.impl),从2.1.0版本才支持对已“标记删除”的实体进行清除,因此在2.1.0以前如果想彻底删除atlas的实体,需要在没有创建任何实体之前修改atlas删除策略为物理删除,否则实体标记删除对type依然存在引用,这样对type进行删除操作会失败,报“给定类型xxx有引用”的错误。

数据血缘Atlas Rest-API使用

数据血缘Atlas Rest-API使用

类型删除成功客户端返回的状态为204

数据血缘Atlas Rest-API使用


2.Entity

2.1 概述

Entity即实体,表示具体的元数据,Atlas管理的对象就是各种Type的Entity,如一个表是一个实体(Entity),一个字段也是一个实体等等。

数据血缘Atlas Rest-API使用

2.2 实体创建

注:本次示例我们创建ods和dwd数据库实体到spark_db下,创建ods_table和dwd_table表实体到spark_table下,并且定义dwd_table的数据来自ods_table,生成两个表实体的血缘关系依赖;创建amount(ods_table)和maxAmount(dwd_table)字段实体到spark_column下,并且定义maxAmount的数据来自amount,生成两个字段实体的血缘关系依赖。

// 数据库实体
val ods = new AtlasEntity()
ods.setTypeName("spark_db") // 实体类型
val odsAttributes: Map[String, AnyRef] = Map(
  "qualifiedName"->"ods",
  "name"->"ods",
  "description"->"测试创建db-ods") // 实体属性配置
ods.setAttributes(odsAttributes.asJava)

val dwd = new AtlasEntity()
dwd.setTypeName("spark_db") // 实体类型
val dwdAttributes: Map[String, AnyRef] = Map(
  "qualifiedName"->"dwd",
  "name"->"dwd",
  "description"->"测试创建db-dwd")// 实体属性配置
dwd.setAttributes(dwdAttributes.asJava)

// 表实体
val odsTable = new AtlasEntity()
odsTable.setTypeName("spark_table") // 实体类型
// 实体属性配置
val odsTableAttributes: Map[String, AnyRef] = Map(
  "qualifiedName"->"ods.ods_table",
  "name"->"ods_table",
  "description"->"测试创建ods_table",
  "db"->new AtlasObjectId("spark_db","qualifiedName","ods"))// 指明该表所在的数据库
odsTable.setAttributes(odsTableAttributes.asJava)

val dwdTable = new AtlasEntity()
dwdTable.setTypeName("spark_table") // 实体类型
// 实体属性配置
val dwdTableAttributes: Map[String, AnyRef] = Map(
  "qualifiedName"->"dwd.dwd_table",
  "name"->"dwd_table",
  "description"->"测试创建dwd_table",
  "db"->new AtlasObjectId("spark_db","qualifiedName","dwd"))// 指明该表所在的数据库
dwdTable.setAttributes(dwdTableAttributes.asJava)

// 表血缘实体
val process = new AtlasEntity()
process.setTypeName("spark_process") // 实体类型
// 实体属性配置
val processAttributes: Map[String, AnyRef] = Map(
  "qualifiedName"->"dwd_table_etl",
  "name"->"dwd_table_etl",
  "description"->"dwd_table的数据来自ods_table",
  "inputs"->Array(new AtlasObjectId("spark_table","qualifiedName","ods.ods_table")),// 血缘输入
  "outputs"->Array(new AtlasObjectId("spark_table","qualifiedName","dwd.dwd_table")))// 血缘输出
process.setAttributes(processAttributes.asJava)

// 字段实体
val odsTableColumn = new AtlasEntity()
odsTableColumn.setTypeName("spark_column") // 实体类型
// 实体属性配置
val odsTableColumnAttributes: Map[String, AnyRef] = Map(
  "qualifiedName"->"ods.ods_table.amount",
  "name"->"amount",
  AtlasConstant.ATTRIBUTE_COMMENT->"额度",
  "table"->new AtlasObjectId("spark_table","qualifiedName","ods.ods_table"))// 指明该字段所在的表
odsTableColumn.setAttributes(odsTableColumnAttributes.asJava)

val dwdTableColumn = new AtlasEntity()
dwdTableColumn.setTypeName("spark_column") // 实体类型
// 实体属性配置
val dwdTableColumnAttributes: Map[String, AnyRef] = Map(
  "qualifiedName"->"dwd.dwd_table.maxAmount",
  "name"->"maxAmount",
  AtlasConstant.ATTRIBUTE_COMMENT->"最大额度",
  "table"->new AtlasObjectId("spark_table","qualifiedName","dwd.dwd_table"))// 指明该字段所在的表
dwdTableColumn.setAttributes(dwdTableColumnAttributes.asJava)

// 字段血缘实体
val columnLineage = new AtlasEntity()
columnLineage.setTypeName("spark_column_lineage") // 实体类型
// 实体属性配置
val columnLineageAttributes: Map[String, AnyRef] = Map(
"qualifiedName"->"Query:ods.ods_table.amount->dwd.dwd_table.maxAmount",
  "name"->"Query:ods.ods_table.amount->dwd.dwd_table.maxAmount",
  AtlasConstant.ATTRIBUTE_COMMENT->"dwd.dwd_table.maxAmount的数据来自ods.ods_table.amount",
  "inputs"->Array(new AtlasObjectId("spark_column","qualifiedName","ods.ods_table.amount")),// 血缘输入
  "outputs"->Array(new AtlasObjectId("spark_column","qualifiedName","dwd.dwd_table.maxAmount")))// 血缘输出
columnLineage.setAttributes(columnLineageAttributes.asJava)

// 将所有实体放入seq集合中
val entities: Seq[AtlasEntity] = Seq(ods,odsTable,odsTableColumn,dwdTable,dwd,process,dwdTableColumn,columnLineage)

// 调用工具类的实体创建方法
AtlasUtils.createAllEntities(atlasClientV2,entities)

AtlasUtils.createAllEntities()

/**
 * 实体创建
 * @param atlasClientV2
 * @param atlasEntities
 */
def createAllEntities(atlasClientV2: AtlasClientV2,atlasEntities: Seq[AtlasEntity]): Unit = {
  val entitiesGroupMap: Map[String, Seq[AtlasEntity]] = atlasEntities.groupBy(_.getTypeName) // 所有实体按类型名分组

// 逐层创建
  if (entitiesGroupMap.contains("spark_db")){
    atlasClientV2.createEntities(new AtlasEntity.AtlasEntitiesWithExtInfo(entitiesGroupMap.get("spark_db").get.asJava))
  }

  if (entitiesGroupMap.contains("spark_table")){
    atlasClientV2.createEntities(new AtlasEntity.AtlasEntitiesWithExtInfo(entitiesGroupMap.get("spark_table").get.asJava))
  }

   if (entitiesGroupMap.contains("spark_process")){
    atlasClientV2.createEntities(new AtlasEntity.AtlasEntitiesWithExtInfo(entitiesGroupMap.get("spark_process").get.asJava))
  }

  if (entitiesGroupMap.contains("spark_column")){
    atlasClientV2.createEntities(new AtlasEntity.AtlasEntitiesWithExtInfo(entitiesGroupMap.get("spark_column").get.asJava))
  }

  if (entitiesGroupMap.contains("spark_column_lineage")){
    atlasClientV2.createEntities(new AtlasEntity.AtlasEntitiesWithExtInfo(entitiesGroupMap.get("spark_column_lineage").get.asJava))
  }
}

执行以上代码,然后打开主页,点击spark_table中的ods_table,查看lineage标签,表血缘关系已成功构建:

数据血缘Atlas Rest-API使用

点击spark_column中的amount,查看lineage标签,字段血缘关系已成功构建:

数据血缘Atlas Rest-API使用

2.3 实体删除

/**
   * 实体删除
   * @param atlasClientV2
   * @param typeName 实体类型
   * @param qualifiedName
   */
def deleteEntity(atlasClientV2: AtlasClientV2,typeName: String,qualifiedName: String): Unit = {
    val attributes: Map[String, String] = Map("qualifiedName" -> qualifiedName)
    atlasClientV2.deleteEntityByAttribute(typeName,attributes.asJava)
  }

说明:该方式通过指明实体类型及qualifiedName,还可通过guid等进行删除

2.4 实体查询

// 构建查询条件
val queryAttributes: Map[String, String] = Map("qualifiedName" -> "ods")
// 第一个参数为查询的实体类型
Val extInfo = atlasClientV2.getEntityByAttribute("spark_db", queryAttributes)

2.5 实体的独立性

在atlas中实体是独立的,因此当有业务变更涉及增删字段时,删除某字段实体会在相应表实体columns属性中移除,但表实体的Audits栏中并不会新增一条更新操作记录(可能是使用的版本存在bug);
当新增字段实体时,仅且需要创建该字段实体即可,无需重新建表,我们可以观察到新增该字段实体的同时,表实体的columns的属性中新增了该字段,Audits栏中新增了一条更新操作记录。
当创建atlas中已存在的实体时,如果该实体所有属性均未发生改变,那么在atlas中不会看到任何变化,实体的Audits栏中也不会新增一条创建或更新的操作记录;若有部分属性发生变化,则会对该实体进行更新,可以观察到发生变化的属性,而且Audits栏中新增了一条更新操作记录。

数据血缘Atlas Rest-API使用

数据血缘Atlas Rest-API使用

2.6 实体的依赖性

值得注意的是,无论是在创建或删除操作时都需要注意实体间的依赖关系:

数据血缘Atlas Rest-API使用

创建时务必从左到右(比如创建dwd_xxx_xxx表实体时需要提前创建好dwd实体,因为建表实体需要拿到库实体的的唯一标识,或是Guid或是AtlasObjectId,来确定该表所在的库是哪个也就是指明它们之间的关系,atlas中每个实体均带有唯一的Guid,创建时随机生成,获取不到会报下图所示的错误而失败,因此需要创建好上层再创建下层)。

数据血缘Atlas Rest-API使用

删除时顺序反之,如果先删除数据库实体,操作可以成功,不会影响下层的血缘,但会影响库与表之前定义好的关系,涉及到该数据库的表实体db属性也会连带删除,若将该数据库实体重新创建回来也不能恢复它与表之间的关系,表实体也需要重建。
数据血缘Atlas Rest-API使用
数据血缘Atlas Rest-API使用


3.Lineage

3.1概述

数据血缘,表示数据之间的传递关系,通过 Lineage 我们可以清晰的知道数据的从何而来又流向何处,中间经历了哪些操作,这样一旦数据出现问题,可以迅速追溯,定位是哪个环节出现错误。


4.Classification

4.1 概述

分类,通俗点就是给元数据打标签,分类是可以传递的,比如 A 视图是基于 A 表生成的,那么如果 A 表打上了 a 这个标签,A 视图也会自动打上 a 标签,这样的好处就是便于数据的追踪。


记得点赞收藏奥,关注不迷路~文章来源地址https://www.toymoban.com/news/detail-472557.html

到了这里,关于数据血缘Atlas Rest-API使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • (Rest风格API)Elasticsearch索引操作、映射配置、数据操作、查询操作

    1.请求方式:put 2.请求路径:索引库名 3.请求参数:json格式 number_of_shards 是指索引要做多少个分片,只能在创建索引时指定,后期无法修改。 number_of_replicas 是指每个分片有多少个副本,后期可以动态修改 什么是分片? ES中所存数据的文件块,也是数据的最小单元块。假如有

    2024年04月26日
    浏览(33)
  • Camunda 7.x 系列【10】使用 Rest API 运行流程实例

    有道无术,术尚可求,有术无道,止于术。 本系列Spring Boot 版本 2.7.9 本系列Camunda 版本 7.19.0 源码地址:https://gitee.com/pearl-organization/camunda-study-demo

    2024年02月13日
    浏览(42)
  • 使用Django Rest Framework设计与实现用户注册API

    在现代Web应用开发中,RESTful API已成为前后端分离架构中的关键组件。Django Rest Framework (DRF) 是一款基于Django的优秀库,提供了丰富的工具和接口,极大地简化了RESTful API的设计与实现。本文将以用户注册功能为例,展示如何运用DRF构建一个完整的API端点,包括数据验证、模型

    2024年04月25日
    浏览(27)
  • 如何使用Python Flask和MySQL创建管理用户的REST API

    部分数据来源: ChatGPT  引言         在现代化的应用开发中,数据库是一个非常重要的组成部分。关系型数据库(例如:MySQL、PostgreSQL)在这方面尤其是很流行。Flask是一个Python的web框架,非常适合实现REST API。在这篇文章中,我们将介绍如何使用Python Flask和MySQL创建一个

    2024年02月08日
    浏览(53)
  • Azure Machine Learning - 使用 REST API 创建 Azure AI 搜索索引

    本文介绍如何使用 Azure AI 搜索 REST AP和用于发送和接收请求的 REST 客户端以交互方式构建请求。 关注TechLead,分享AI全维度知识。作者拥有10+年互联网服务架构、AI产品研发经验、团队管理经验,同济本复旦硕,复旦机器人智能实验室成员,阿里云认证的资深架构师,项目管理

    2024年02月04日
    浏览(54)
  • REST API 详解

    REST(Representational State Transfer,表述性状态转移)是一种用于构建分布式系统的架构风格。REST API(Application Programming Interface,应用程序接口)是一种基于REST风格的网络API,通常用于Web服务中。REST API使用常见的HTTP方法(如GET、POST、PUT、DELETE等)与Web服务交互,通过URI(Uni

    2024年02月02日
    浏览(25)
  • 如何编写REST API

    编写REST API REST API规范 编写REST API,实际上就是编写处理HTTP请求的async函数,不过,REST请求和普通的HTTP请求有几个特殊的地方: REST请求仍然是标准的HTTP请求,但是,除了GET请求外,POST、PUT等请求的body是JSON数据格式,请求的Content-Type为application/json; REST响应返回的结果是

    2023年04月20日
    浏览(26)
  • REST 与 RESTful API

    REST是什么 REST是万维网软件 架构风格 REST是一种网络应用程序的设计风格和开发方式,基于HTTP,可以使用 XML格式定义 或 JSON格式定义 。 REST适用于移动互联网厂商作为业务接口的场景,实现第三方OTT调用移动网络资源的功能,动作类型为新增、变更、删除所调用资源。 RES

    2024年02月06日
    浏览(31)
  • REST API的基础:HTTP

    在本文中,我们将深入探讨万维网数据通信的基础 - HTTP。 什么是超文本? HTTP(超文本传输协议)的命名源于“超文本”。 那么,什么是超文本? 想象一下由超链接组成的文本、图像和视频的混合物。这些链接充当我们从一个超文本集合跳转到另一个集合的门户。HTML(超文

    2024年02月15日
    浏览(27)
  • KepwareEX配置API REST接口

    API允许连接设置 请求地址(POST): https://主机名_或_ip:端口/config/v1/project/channels 以下示例使用postman工具访问API创建了一个名为Channel1 的通道,其使用在本地主机运行的服务器中的Simulator 驱动程序。 请求方式改为GET可查看所有通道信息:

    2024年02月14日
    浏览(31)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包