spark-sql字段血缘实现

这篇具有很好参考价值的文章主要介绍了spark-sql字段血缘实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

spark-sql字段血缘实现

背景

Apache Spark是一个开源的大数据处理框架,它提供了一种高效、易于使用的方式来处理大规模数据集。在Spark中,数据是通过DataFrame和Dataset的形式进行操作的,这些数据结构包含了一系列的字段(也称为列)。字段血缘是Spark中的一个关键概念,它帮助我们理解数据的来源和流向,从而更好地理解和控制数据处理过程。

字段血缘是指在数据处理过程中,一个字段的值是如何从源数据产生并传递给目标数据的。在Spark中,字段血缘是通过依赖关系进行管理的。每个字段都有一个或多个依赖关系,这些依赖关系定义了字段的值如何从其他字段或数据源产生。

前提

spark版本:2.4.3
使用语言:java+scala

技术实现

1. spark-sql的执行计划,了解如何实现字段血缘解析

spark-sql字段血缘实现,spark,spark,sql,大数据
一个sql会经历一些列的处理,最终生成spark-core的代码,提交到集群运行。
首先看一下一个简单的sql生成的逻辑执行计划长什么样子

insert into default.jy_test
select * from default.jy_test

未解析的逻辑执行计划:

'InsertIntoTable 'UnresolvedRelation `default`.`jy_test`, false, false
+- 'Project [*]
   +- 'UnresolvedRelation `default`.`jy_test`

解析后(analyzer)的逻辑执行计划:

InsertIntoHiveTable `default`.`jy_test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, false, false, [id, name]
+- Project [id#0, name#1]
   +- SubqueryAlias `default`.`jy_test`
      +- HiveTableRelation `default`.`jy_test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#0, name#1]

优化后(optimizer)的逻辑执行计划:

InsertIntoHiveTable `default`.`jy_test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, false, false, [id, name]
+- HiveTableRelation `default`.`jy_test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#0, name#1]

重点来了

  1. 所谓的逻辑执行计划就是一个树形结构
  2. 树形结构中的叶子结点,就是hive的表信息:库名、表名、字段信息,并且spark给每一个字段生成了一个唯一的id
  3. 树形结构中的非叶子只包含了字段信息,不包含库表信息

所以想要实现字段血缘,我们需要做的就是通过那个生成的唯一id去一层层的关联,当关联到叶子结点的时候,就找到了库名表名

2. 构建一颗与解析后的逻辑执行计划一模一样的树形结构

  1. 首先定义node对象,用来存放节点信息
public abstract class Node {
    private String name;
    private List<Column> columnList = new ArrayList<>();
    private List<Node> children = new ArrayList<>();
    private Node parentNode;
    private String graphId;
}
  1. 其次定义column对象,用来存放字段信息
public class Column {
   private String name;
   private Long exprId;
   private String ColumnType;
   private ArrayList<Column> child = new ArrayList<Column>();
   private String tableName;
   private String process;
}
  1. 根据spark-sql生成的逻辑执行计划,我们为每一个逻辑节点创建对应的结点,由于结点很多,我这里直接给个截图,源码会在文章最后提供出来
    spark-sql字段血缘实现,spark,spark,sql,大数据

  2. 解析spark-sql生成的解析后的逻辑执行计划
    首先获取逻辑执行计划,这里提供两种方式:
    1.通过spark-session获取,该方法可以用来做测试,非常的方便

     LogicalPlan logicalPlan = spark.sessionState().sqlParser().parsePlan(sql);
     LogicalPlan analyzer = spark.sessionState().analyzer().execute(logicalPlan);
    

    2.通过QueryExecution获得,这里贴个图,详情看源码
    spark-sql字段血缘实现,spark,spark,sql,大数据

  3. 解析spark生成的analyzer,构建我们自己的树形结构
    这里贴一下主要的逻辑,使用scala去递归解析抽象语法树会方便很多

def resolveLogicPlan(plan: LogicalPlan, root: Node): Unit = {
    plan match {
      case plan: InsertIntoHadoopFsRelationCommand =>
        val node = root.asInstanceOf[Root]
        node.setName(NodeType.INSERTINTOHIVETABLE.getName)
        val database: String = plan.catalogTable.get.identifier.database.getOrElse("default")
        val table: String = plan.catalogTable.get.identifier.table
        val fullTableName = database + "." + table
        plan.catalogTable.get.schema.foreach { field => {
          val column = new Column()
          column.setName(field.name)
          column.setTableName(fullTableName)
          node.getColumnList.add(column)
        }
        }
        resolveLogicPlan(plan.query, node)
      case plan: SaveIntoDataSourceCommand =>
        val table: String = plan.options.get("dbtable").getOrElse("")
        val url: String = plan.options.get("url").getOrElse("")
        val user: String = plan.options.get("user").getOrElse("")
        val password: String = plan.options.get("password").getOrElse("")
        // 定义匹配数据库名称的正则表达式模式
        val pattern: Regex = ".*://[^/]+/(\\w+)".r
        // 使用正则表达式进行匹配
        val dbNameOption: Option[String] = pattern.findFirstMatchIn(url).map(_.group(1))
        val fullTableName = dbNameOption.getOrElse("") + "." + table
        val node = root.asInstanceOf[Root]
        node.setName(NodeType.SAVEINTODATASOURCECOMMAND.getName)
        // 连接mysql,根据库名表明获取字段列表
        val fieldsList = getFieldsListFromMysql(url, user, password, table)
        fieldsList.foreach { field => {
          val column = new Column()
          column.setName(field)
          column.setTableName(fullTableName)
          node.getColumnList.add(column)
        }
        }
        resolveLogicPlan(plan.query, node)
      case plan: InsertIntoHiveTable =>
        val node = root.asInstanceOf[Root]
        node.setName(NodeType.INSERTINTOHIVETABLE.getName)
        val database: String = plan.table.identifier.database.getOrElse("default")
        val table: String = plan.table.identifier.table
        val fullTableName = database + "." + table

        node.setTableName(fullTableName)
        plan.table.schema.foreach { field => {
          val column = new Column()
          column.setName(field.name)
          column.setTableName(fullTableName)
          node.getColumnList.add(column)
        }
        }
        resolveLogicPlan(plan.query, node)
      case plan: Aggregate =>
        val node = new AggregateNode()
        insertNodeColumnsFromNamedExpression(node, plan.aggregateExpressions)
        node.setParentNode(root)
        root.getChildren.add(node)
        resolveLogicPlan(plan.child, node)

      case plan: Project =>
        val node = new ProjectNode()
        insertNodeColumnsFromNamedExpression(node, plan.projectList)
        node.setParentNode(root)
        root.getChildren.add(node)
        resolveLogicPlan(plan.child, node)
      case plan: LogicalRelation =>
        val node = new LogicalRelationNode()
        dfsLogicalRelation(plan, node)
        node.setParentNode(root)
        root.getChildren.add(node)

      case plan: HiveTableRelation =>
        val node = new LogicalRelationNode()
        dfsLogicalRelation(plan, node)
        node.setParentNode(root)
        root.getChildren.add(node)

      case plan: Filter =>
        val node = new FilterNode()
        node.setParentNode(root)
        node.setCondition(plan.condition.toString)
        root.getChildren.add(node)
        resolveLogicPlan(plan.child, node)

      case plan: Join =>
        val node = new JoinNode()
        node.setName(plan.joinType.toString + " " + node.getName)
        node.setParentNode(root)
        node.setCondition(plan.condition.toString)
        root.getChildren.add(node)
        resolveLogicPlan(plan.left, node)
        resolveLogicPlan(plan.right, node)
      case plan: Window =>
        val node = new WindowNode()
        insertNodeColumnsFromNamedExpression(node, plan.windowExpressions)
        node.setParentNode(root)
        root.getChildren.add(node)
        resolveLogicPlan(plan.child, node)
      case plan: Union =>
        val node = new UnionNode()
        node.setParentNode(root)
        root.getChildren.add(node)
        plan.children.foreach(resolveLogicPlan(_, node))

      case plan: SubqueryAlias =>
        val node = new SubqueryNode()
        node.setName(node.getName + " " + plan.name.toString())
        node.setParentNode(root)
        root.getChildren.add(node)
        resolveLogicPlan(plan.child, node)
      case plan: Generate =>
        val node = new GenerateNode()
        processGenerate(plan, node)
        node.setParentNode(root)
        root.getChildren.add(node)
        resolveLogicPlan(plan.child, node)

      case _ =>
        plan.children.foreach(resolveLogicPlan(_, root))
    }

  }
  1. 到这里,我们已经得到了自己的树形结构。接下来要通过唯一id进行关联,补充库表信息。
    不知道大家注意没有,我们在node对象中有一个方法,

    spark-sql字段血缘实现,spark,spark,sql,大数据
    这里用到了访问者设计模式,感兴趣的同学可以学习一下,在spark-sql源码中,同样用的是访问者设计模式。

这里主要说一下Visitor的定义及方法:processColumn方法主要是拿自己的ExprId和所有孩子结点的ExprId比较,如果相等的话,说明是同一个字段,那就表名复制过来。

public interface Visitor {

    void visit(Node node);

    default void processColumn(Node node) {
        for (Column column1 : node.getColumnList()) {
            for (Node nd : node.getChildren()) {
                for (Column column2 : nd.getColumnList()) {
                    processColumn(column1, column2);
                }
            }
        }

    }

     default void processColumn(Column column1, Column column2) {
        List<Column> child = column1.getChild();
        child.forEach(ch -> processColumn(ch, column2));
        if (column1.getExprId().equals(column2.getExprId())) {
            if(column2.getTableName() != null) {
                column1.setTableName(column2.getTableName());
            } else {
                column1.getChild().addAll(column2.getChild());
            }
        }
    }
}

LineageVisitor是Visitor的实现类, 主要用来做模式匹配,不同的结点处理方式会有不同,感兴趣的同学看一下这块的代码。

public class LineageVisitor implements Visitor{
    @Override
    public void visit(Node node) {
        switch (node.getClass().getSimpleName()) {
            case "FilterNode" :
            case "SubqueryNode" :
            case "JoinNode" : copyChildColumnToThis(node); break;
            case "WindowNode" :
            case "GenerateNode" : copyChildColumnToThisWithProcess(node); break;
            case "Root" : processColumn((Root)node); break;
            case "UnionNode" : processColumn((UnionNode)node); break;
            default : processColumn(node);
        }
    }

    @Override
    public void processColumn(Node node) {
        node.getChildren().forEach( child -> child.accept(this));
        Visitor.super.processColumn(node);
    }

    public void copyChildColumnToThis(Node node) {
        node.getChildren().forEach( child -> child.accept(this));
        for (Node child : node.getChildren()) {
            node.getColumnList().addAll(child.getColumnList());
        }
    }
    public void copyChildColumnToThisWithProcess(Node node) {
        node.getChildren().forEach( child -> child.accept(this));
        Visitor.super.processColumn(node);
        for (Node child : node.getChildren()) {
            node.getColumnList().addAll(child.getColumnList());
        }

    }

    public void processColumn(Root node) {
        node.getChildren().forEach( child -> child.accept(this));
        if(node.getColumnList().size() > 0) {
            for (int i = 0; i < node.getChildren().get(0).getColumnList().size(); i++) {
                for (Node child : node.getChildren()) {
                    node.getColumnList().get(i).getChild().add(child.getColumnList().get(i));
                }
            }
        }
    }

    public void processColumn(UnionNode node) {
        node.getChildren().forEach( child -> child.accept(this));
        int size = node.getChildren().get(0).getColumnList().size();
        for (Column column : node.getChildren().get(0).getColumnList()) {
            Column column1 = new Column();
            column1.setName(column.getName());
            column1.setExprId(column.getExprId());
            node.getColumnList().add(column1);
        }
        for (int i = 0; i < size; i++) {
            for (Node child : node.getChildren()) {
                node.getColumnList().get(i).getChild().add(child.getColumnList().get(i));
            }
        }
    }
}

成果展示

还是拿最开始的sql ,看一下最终生成的字段血缘

insert into default.jy_test select * from default.jy_test

spark-sql字段血缘实现,spark,spark,sql,大数据

最后

字段血缘实现起来还是比较困难的,需要了解spak-sql的底层原理和一些技巧。
这里方便大家使用、学习、交流,所以贡献自己的源码,仓库地址:https://gitee.com/chenxiaoliang0901/crock/tree/main文章来源地址https://www.toymoban.com/news/detail-786451.html

到了这里,关于spark-sql字段血缘实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark-SQL小结

    目录 一、RDD、DataFrame、DataSet的概念、区别联系、相互转换操作   1.RDD概念   2.DataFrame概念   3.DataSet概念   4.RDD、DataFrame、DataSet的区别联系   5.RDD、DataFrame、DataSet的相互转换操作    1 RDD-DataFrame、DataSet    2  DataFrame-RDD,DataSet    3 DataSet-RDD,DataFrame 二、Spark-SQL连接JDBC的方式

    2024年02月09日
    浏览(44)
  • Hudi-集成Spark之spark-sql方式

    启动spark-sql 创建表 建表参数: 参数名 默认值 说明 primaryKey uuid 表的主键名,多个字段用逗号分隔。同 hoodie.datasource.write.recordkey.field preCombineField 表的预合并字段。同 hoodie.datasource.write.precombine.field type cow 创建的表类型: type = ‘cow’ type = \\\'mor’同 hoodie.datasource.write.table.ty

    2024年02月05日
    浏览(46)
  • Spark参数配置和调优,Spark-SQL、Config

    一、Hive-SQL / Spark-SQL参数配置和调优 二、shell脚本spark-submit参数配置 三、sparkSession中配置参数

    2024年02月13日
    浏览(47)
  • Hudi(7):Hudi集成Spark之spark-sql方式

    目录 0. 相关文章链接 1. 创建表 1.1. 启动spark-sql 1.2. 建表参数 1.3. 创建非分区表 1.4. 创建分区表 1.5. 在已有的hudi表上创建新表 1.6. 通过CTAS (Create Table As Select)建表 2. 插入数据 2.1. 向非分区表插入数据 2.2. 向分区表动态分区插入数据 2.3. 向分区表静态分区插入数据 2.4

    2024年02月06日
    浏览(43)
  • Spark-SQL连接Hive的五种方法

    若使用Spark内嵌的Hive,直接使用即可,什么都不需要做(在实际生产活动中,很少会使用这一模式) 步骤: 将Hive中conf/下的hive-site.xml拷贝到Spark的conf/目录下; 把Mysql的驱动copy到jars/目录下; 如果访问不到hdfs,则将core-site.xml和hdfs-site.xml拷贝到conf/目录下; 重启spark-shell;

    2024年02月16日
    浏览(43)
  • Spark-SQL连接JDBC的方式及代码写法

    提示:文章内容仅供参考! 目录 一、数据加载与保存 通用方式: 加载数据: 保存数据: 二、Parquet 加载数据: 保存数据: 三、JSON 四、CSV  五、MySQL SparkSQL 提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的API,根据不同的参数读取和保存不同格式的

    2024年02月13日
    浏览(32)
  • spark-sql: insert overwrite分区表问题

    用spark-sql,insert overwrite分区表时发现两个比较麻烦的问题: 从目标表select出来再insert overwrite目标表时报错:Error in query: Cannot overwrite a path that is also being read from. 从其他表select出来再insert overwrite目标表时,其他分区都被删除了. 印象中这两个问题也出现过,但凭经验和感觉,

    2024年02月11日
    浏览(48)
  • spark-sql处理json字符串的常用函数

    整理了spark-sql处理json字符串的几个函数: 1 get_json_object 解析不含数组的 json   2 from_json  解析json 3 schema_of_json 提供生成json格式的方法 4 explode   把JSONArray转为多行 get_json_object(string json_string, string path) :适合最外层为{}的json解析。  第一个参数是json对象变量,也就是含j

    2023年04月08日
    浏览(43)
  • 在 spark-sql / spark-shell / hive / beeline 中粘贴 sql、程序脚本时的常见错误

    《大数据平台架构与原型实现:数据中台建设实战》一书由博主历时三年精心创作,现已通过知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描

    2024年02月14日
    浏览(36)
  • spark-sql(jdbc)本地模式导出csv或Excel文件

    注意: 当前excel和commons-io版本都是较较新版本,而commons-io在spark的jars安装目录下也在commons-io的包,如版本冲突,找不到 orgapachecommonsiooutputByteArrayOutputStream.class 。如果spark的是2.4或者更低版本,则找不到 orgapachecommonsiooutputUnsynchronizedByteArrayOutputStream.class ,请同步spa

    2024年02月02日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包