[May Day Special] Extending Spark 3.3.0 with Scala to Implement Upsert into MySQL


Background

In our data warehouse upgrade project we ran into this scenario: the ancients had built DataStage jobs that, after the computation, do an insert-then-update keyed on the primary key (or on a unique key concatenated from several columns). As the name suggests: insert when no row exists, and let the later record overwrite the earlier one when a row does exist. This is essentially MySQL's upsert; Oracle has the similar MERGE operation, although that side of things mostly concerns the database developers.

The ancients did it this way for basically two reasons: reruns leave the data unchanged (idempotence), and there are no primary-key conflicts (rows are deduplicated by the primary key or the concatenated unique key).

I wrote earlier about rewriting a zipper-table (slowly changing dimension) job in HQL: https://lizhiyong.blog.csdn.net/article/details/129679071

This scenario can be expressed in pure HQL: the final data = the untouched history + the rows to insert because there was no previous record + the rows to overwrite because there was one. Compared with a real zipper table it even skips the step of marking superseded history rows as expired, so we like to call it a "fake zipper".
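
To make that concrete, here is a rough DataFrame-level sketch of the "fake zipper" merge (my own illustration rather than the project's code; it assumes the delta has already been deduplicated so that only the latest record per key remains, and that the two schemas match):

import org.apache.spark.sql.DataFrame

// result = untouched history (keys absent from the delta) ++ the delta itself (inserts + updates)
def fakeZipper(target: DataFrame, delta: DataFrame, keys: Seq[String]): DataFrame = {
  val untouched = target.join(delta, keys, "left_anti") // history rows not hit by the delta
  untouched.unionByName(delta)                          // plus the inserted and updated rows
}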

But when there are many columns, many upstream source tables and complex upstream logic, the HQL is anything but short. That big blob gets union all'ed together, landed first in a tmp staging table in 2PC fashion, then written back into the result table, and finally a data-integration job pushes it on to Oracle... The whole circus does not demand much brainpower, yet the workload is considerable. Pure HQL works, but in 2023 it is hardly wise to keep emulating what DataStage already did in 2015.

Spark, the first choice for offline batch, does not support this out of the box, but with a bit of effort the behaviour can still be implemented.

How it works

Spark's save

See this earlier post: https://lizhiyong.blog.csdn.net/article/details/128090026

The simplest possible Spark write job can be boiled down to this:

package com.zhiyong.day20230425upDate

import org.apache.spark.sql.functions.{concat, lit}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object UpDateDemo1 {
  def main(args: Array[String]): Unit = {
    val sc: SparkSession = SparkSession
      .builder
      .appName("sparkUpdateZhiyong")
      .master("local[8]")
      .enableHiveSupport
      .getOrCreate

    import sc.implicits._

    // Build a toy DataFrame: an id column plus a derived comment column.
    var df1: DataFrame = sc.range(1000).toDF("id")
    df1.show()

    df1 = df1.withColumn("comment", lit("CSDN@虎鲸不是鱼"))

    df1 = df1.withColumn("comment1", concat(df1.col("id"), df1.col("comment")))
      .drop("comment")

    // Plain JDBC append: this is the behaviour we want to turn into an upsert.
    df1.write
      .format("jdbc")
      .mode(SaveMode.Append)
      .options(
        Map(
          "url" -> "jdbc:mysql://192.168.88.100:3306/db_lzy",
          "dbtable" -> "test_origin_20001128",
          "user" -> "root",
          "password" -> "123456",
          "driver" -> "com.mysql.cj.jdbc.Driver"
        )
      )
      .save()
  }
}

Let's start from this most common save call. Stepping into it shows:

package org.apache.spark.sql

/**
 * Interface used to write a [[Dataset]] to external storage systems (e.g. file systems,
 * key-value stores, etc). Use `Dataset.write` to access this.
 *
 * @since 1.4.0
 */
@Stable
final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
  /**
   * Saves the content of the `DataFrame` as the specified table.
   *
   * @since 1.4.0
   */
  def save(): Unit = saveInternal(None)
    
  private val df = ds.toDF()
}

Still in the same class:

  /**
   * Specifies the behavior when data or table already exists. Options include:
   * <ul>
   * <li>`SaveMode.Overwrite`: overwrite the existing data.</li>
   * <li>`SaveMode.Append`: append the data.</li>
   * <li>`SaveMode.Ignore`: ignore the operation (i.e. no-op).</li>
   * <li>`SaveMode.ErrorIfExists`: throw an exception at runtime.</li>
   * </ul>
   * <p>
   * The default option is `ErrorIfExists`.
   *
   * @since 1.4.0
   */
  def mode(saveMode: SaveMode): DataFrameWriter[T] = {
    this.mode = saveMode
    this
  }

private def saveInternal(path: Option[String]): Unit = {
  if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
    throw QueryCompilationErrors.cannotOperateOnHiveDataSourceFilesError("write")
  }

  assertNotBucketed("save")

  val maybeV2Provider = lookupV2Provider()
  if (maybeV2Provider.isDefined) {
    val provider = maybeV2Provider.get
    val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
      provider, df.sparkSession.sessionState.conf)

    val optionsWithPath = getOptionsWithPath(path)

    val finalOptions = sessionOptions.filterKeys(!optionsWithPath.contains(_)).toMap ++
      optionsWithPath.originalMap
    val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)

    def getTable: Table = {
      // If the source accepts external table metadata, here we pass the schema of input query
      // and the user-specified partitioning to `getTable`. This is for avoiding
      // schema/partitioning inference, which can be very expensive.
      // If the query schema is not compatible with the existing data, the behavior is undefined.
      // For example, writing file source will success but the following reads will fail.
      if (provider.supportsExternalMetadata()) {
        provider.getTable(
          df.schema.asNullable,
          partitioningAsV2.toArray,
          dsOptions.asCaseSensitiveMap())
      } else {
        DataSourceV2Utils.getTableFromProvider(provider, dsOptions, userSpecifiedSchema = None)
      }
    }

    import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
    val catalogManager = df.sparkSession.sessionState.catalogManager
    mode match {
      case SaveMode.Append | SaveMode.Overwrite =>
        val (table, catalog, ident) = provider match {
          case supportsExtract: SupportsCatalogOptions =>
            val ident = supportsExtract.extractIdentifier(dsOptions)
            val catalog = CatalogV2Util.getTableProviderCatalog(
              supportsExtract, catalogManager, dsOptions)

            (catalog.loadTable(ident), Some(catalog), Some(ident))
          case _: TableProvider =>
            val t = getTable
            if (t.supports(BATCH_WRITE)) {
              (t, None, None)
            } else {
              // Streaming also uses the data source V2 API. So it may be that the data source
              // implements v2, but has no v2 implementation for batch writes. In that case, we
              // fall back to saving as though it's a V1 source.
              return saveToV1Source(path)
            }
        }

        val relation = DataSourceV2Relation.create(table, catalog, ident, dsOptions)
        checkPartitioningMatchesV2Table(table)
        if (mode == SaveMode.Append) {
          runCommand(df.sparkSession) {
            AppendData.byName(relation, df.logicalPlan, finalOptions)
          }
        } else {
          // Truncate the table. TableCapabilityCheck will throw a nice exception if this
          // isn't supported
          runCommand(df.sparkSession) {
            OverwriteByExpression.byName(
              relation, df.logicalPlan, Literal(true), finalOptions)
          }
        }

      case createMode =>
        provider match {
          case supportsExtract: SupportsCatalogOptions =>
            val ident = supportsExtract.extractIdentifier(dsOptions)
            val catalog = CatalogV2Util.getTableProviderCatalog(
              supportsExtract, catalogManager, dsOptions)

            val tableSpec = TableSpec(
              properties = Map.empty,
              provider = Some(source),
              options = Map.empty,
              location = extraOptions.get("path"),
              comment = extraOptions.get(TableCatalog.PROP_COMMENT),
              serde = None,
              external = false)
            runCommand(df.sparkSession) {
              CreateTableAsSelect(
                UnresolvedDBObjectName(
                  catalog.name +: ident.namespace.toSeq :+ ident.name,
                  isNamespace = false
                ),
                partitioningAsV2,
                df.queryExecution.analyzed,
                tableSpec,
                finalOptions,
                ignoreIfExists = createMode == SaveMode.Ignore)
            }
          case _: TableProvider =>
            if (getTable.supports(BATCH_WRITE)) {
              throw QueryCompilationErrors.writeWithSaveModeUnsupportedBySourceError(
                source, createMode.name())
            } else {
              // Streaming also uses the data source V2 API. So it may be that the data source
              // implements v2, but has no v2 implementation for batch writes. In that case, we
              // fallback to saving as though it's a V1 source.
              saveToV1Source(path)
            }
        }
    }

  } else {
    saveToV1Source(path)
  }
}

This is the chain of operations that Spark's save call kicks off.

SaveMode natively has only these four values:

package org.apache.spark.sql;

import org.apache.spark.annotation.Stable;

/**
 * SaveMode is used to specify the expected behavior of saving a DataFrame to a data source.
 *
 * @since 1.3.0
 */
@Stable
public enum SaveMode {
  /**
   * Append mode means that when saving a DataFrame to a data source, if data/table already exists,
   * contents of the DataFrame are expected to be appended to existing data.
   *
   * @since 1.3.0
   */
  Append,
  /**
   * Overwrite mode means that when saving a DataFrame to a data source,
   * if data/table already exists, existing data is expected to be overwritten by the contents of
   * the DataFrame.
   *
   * @since 1.3.0
   */
  Overwrite,
  /**
   * ErrorIfExists mode means that when saving a DataFrame to a data source, if data already exists,
   * an exception is expected to be thrown.
   *
   * @since 1.3.0
   */
  ErrorIfExists,
  /**
   * Ignore mode means that when saving a DataFrame to a data source, if data already exists,
   * the save operation is expected to not save the contents of the DataFrame and to not
   * change the existing data.
   *
   * @since 1.3.0
   */
  Ignore
}

The chained .mode(SaveMode.Append) call has already set the save mode (normal people use Append or Overwrite), so when reading the source we should focus on the Append/Overwrite branch. According to the comment, if the target table does not support the truncate trick, a very nice exception is thrown, similar to this one: https://lizhiyong.blog.csdn.net/article/details/124575115

But do not overlook the other branch: saveToV1Source(path).

Which branch is taken

Step into val maybeV2Provider = lookupV2Provider():

private def lookupV2Provider(): Option[TableProvider] = {
  DataSource.lookupDataSourceV2(source, df.sparkSession.sessionState.conf) match {
    // TODO(SPARK-28396): File source v2 write path is currently broken.
    case Some(_: FileDataSourceV2) => None
    case other => other
  }
}

The Option[TableProvider] return type alone already gives a hint.

package org.apache.spark.sql.connector.catalog;
/**
 * The base interface for v2 data sources which don't have a real catalog. Implementations must
 * have a public, 0-arg constructor.
 * <p>
 * Note that, TableProvider can only apply data operations to existing tables, like read, append,
 * delete, and overwrite. It does not support the operations that require metadata changes, like
 * create/drop tables.
 * <p>
 * The major responsibility of this interface is to return a {@link Table} for read/write.
 * </p>
 *
 * @since 3.0.0
 */
@Evolving
public interface TableProvider {
  // ... method declarations omitted ...
}

Its implementation hierarchy:

[Figure: the implementations of TableProvider]

Clearly JDBC is not among them. So what does maybeV2Provider.isDefined return?

[Screenshot: stepping into Option.isDefined]

Decompiled, it is simply:

public boolean isDefined() {
   return !this.isEmpty();
}

Of course there is nothing in it: for JDBC the Option is empty, so execution falls into the old saveToV1Source(path) branch.

If in doubt, set a breakpoint and debug it yourself.

Finding the call stack with a breakpoint

[Screenshot: the call stack at the breakpoint]

So the next stop is here:

[Screenshot: execution arriving at saveToV1Source]

That is:

private def saveToV1Source(path: Option[String]): Unit = {
  partitioningColumns.foreach { columns =>
    extraOptions = extraOptions + (
      DataSourceUtils.PARTITIONING_COLUMNS_KEY ->
      DataSourceUtils.encodePartitioningColumns(columns))
  }

  val optionsWithPath = getOptionsWithPath(path)

  // Code path for data source v1.
  runCommand(df.sparkSession) {
    DataSource(
      sparkSession = df.sparkSession,
      className = source,
      partitionColumns = partitioningColumns.getOrElse(Nil),
      options = optionsWithPath.originalMap).planForWriting(mode, df.logicalPlan)
  }
}

Nobody these days goes out of their way to partition a MySQL or Oracle table, so the part to focus on is the method that runs the command; the useful bit is options = optionsWithPath.originalMap).planForWriting(mode, df.logicalPlan):

package org.apache.spark.sql.execution.datasources

case class DataSource( /* ... constructor parameters omitted ... */ ) {

  /**
   * Returns a logical plan to write the given [[LogicalPlan]] out to this [[DataSource]].
   */
  def planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan = {
    providingInstance() match {
      case dataSource: CreatableRelationProvider =>
        disallowWritingIntervals(data.schema.map(_.dataType), forbidAnsiIntervals = true)
        SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode)
      case format: FileFormat =>
        disallowWritingIntervals(data.schema.map(_.dataType), forbidAnsiIntervals = false)
        DataSource.validateSchema(data.schema)
        planForWritingFileFormat(format, mode, data)
      case _ => throw new IllegalStateException(
        s"${providingClass.getCanonicalName} does not allow create table as select.")
    }
  }
}

So this thing produces a logical plan that writes the given logical plan out to the data source... a bit of a tongue-twister.

Since the return value is a logical plan, the next call is obviously:

/**
 * Wrap a DataFrameWriter action to track the QueryExecution and time cost, then report to the
 * user-registered callback functions.
 */
private def runCommand(session: SparkSession)(command: LogicalPlan): Unit = {
  val qe = session.sessionState.executePlan(command)
  qe.assertCommandExecuted()
}

Follow the call:

def assertCommandExecuted(): Unit = commandExecuted

And again:

lazy val commandExecuted: LogicalPlan = mode match {
  case CommandExecutionMode.NON_ROOT => analyzed.mapChildren(eagerlyExecuteCommands)
  case CommandExecutionMode.ALL => eagerlyExecuteCommands(analyzed)
  case CommandExecutionMode.SKIP => analyzed
}

What follows is the actual execution of the logical plan (Catalyst doing its analysis and so on), which is not worth chasing further. If you are curious, set a breakpoint and debug; a failed run prints the whole call path:

Exception in thread "main" com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
	at com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:174)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64)
	at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:829)
	at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:449)
	at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:242)
	at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider.getConnection(BasicConnectionProvider.scala:49)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProviderBase.create(ConnectionProvider.scala:102)
	at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1(JdbcDialects.scala:122)
	at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1$adapted(JdbcDialects.scala:118)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:50)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
	at com.zhiyong.day20230425upDate.UpDateDemo1$.main(UpDateDemo1.scala:41)
	at com.zhiyong.day20230425upDate.UpDateDemo1.main(UpDateDemo1.scala)
Caused by: com.mysql.cj.exceptions.CJCommunicationsException: Communications link failure

The error stack trace is exactly the call path.

Finding the data source from planForWriting

This method returns the logical plan that actually gets executed, so it deserves a careful look.

Going through JDBC always means operating on a table rather than on files directly, so the part that matters is CreatableRelationProvider:

/**
 * @since 1.3.0
 */
@Stable
trait CreatableRelationProvider {
  /**
   * Saves a DataFrame to a destination (using data source-specific parameters)
   *
   * @param sqlContext SQLContext
   * @param mode specifies what happens when the destination already exists
   * @param parameters data source-specific parameters
   * @param data DataFrame to save (i.e. the rows after executing the query)
   * @return Relation with a known schema
   *
   * @since 1.3.0
   */
  def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation
}

A Scala trait is essentially a Java interface, so we need to find its implementations:

[Screenshot: the implementations of CreatableRelationProvider]

Of the three candidates, it is obviously JdbcRelationProvider:

package org.apache.spark.sql.execution.datasources.jdbc

import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils._
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider}

class JdbcRelationProvider extends CreatableRelationProvider
  with RelationProvider with DataSourceRegister {

  override def shortName(): String = "jdbc"

  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    val jdbcOptions = new JDBCOptions(parameters)
    val resolver = sqlContext.conf.resolver
    val timeZoneId = sqlContext.conf.sessionLocalTimeZone
    val schema = JDBCRelation.getSchema(resolver, jdbcOptions)
    val parts = JDBCRelation.columnPartition(schema, resolver, timeZoneId, jdbcOptions)
    JDBCRelation(schema, parts, jdbcOptions)(sqlContext.sparkSession)
  }

  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      df: DataFrame): BaseRelation = {
    val options = new JdbcOptionsInWrite(parameters)
    val isCaseSensitive = sqlContext.conf.caseSensitiveAnalysis
    val dialect = JdbcDialects.get(options.url)
    val conn = dialect.createConnectionFactory(options)(-1)
    try {
      val tableExists = JdbcUtils.tableExists(conn, options)
      if (tableExists) {
        mode match {
          case SaveMode.Overwrite =>
            if (options.isTruncate && isCascadingTruncateTable(options.url) == Some(false)) {
              // In this case, we should truncate table and then load.
              truncateTable(conn, options)
              val tableSchema = JdbcUtils.getSchemaOption(conn, options)
              saveTable(df, tableSchema, isCaseSensitive, options)
            } else {
              // Otherwise, do not truncate the table, instead drop and recreate it
              dropTable(conn, options.table, options)
              createTable(conn, options.table, df.schema, isCaseSensitive, options)
              saveTable(df, Some(df.schema), isCaseSensitive, options)
            }

          case SaveMode.Append =>
            val tableSchema = JdbcUtils.getSchemaOption(conn, options)
            saveTable(df, tableSchema, isCaseSensitive, options)

          case SaveMode.ErrorIfExists =>
            throw QueryCompilationErrors.tableOrViewAlreadyExistsError(options.table)

          case SaveMode.Ignore =>
            // With `SaveMode.Ignore` mode, if table already exists, the save operation is expected
            // to not save the contents of the DataFrame and to not change the existing data.
            // Therefore, it is okay to do nothing here and then just return the relation below.
        }
      } else {
        createTable(conn, options.table, df.schema, isCaseSensitive, options)
        saveTable(df, Some(df.schema), isCaseSensitive, options)
      }
    } finally {
      conn.close()
    }

    createRelation(sqlContext, parameters)
  }
}

Its shortName is "jdbc", exactly the format we specified when writing the data:

df1.write
  .format("jdbc")

So JdbcRelationProvider is the data source we were looking for, and createRelation is the method that actually writes the data.

Approach

Modifying the createRelation method in the Spark source directly would of course work, but that means recompiling Spark; and if your company rents CDP at US$10k per machine per year, nobody will support a patched build. So the most suitable approach is the same as what I did before: https://lizhiyong.blog.csdn.net/article/details/124575115

Being able to meet the requirement without touching the source, recompiling or redeploying is ideal. Botch a build or a deployment, take a machine down, and that is another 50,000-character self-criticism.

So the plan is: write a new data source, then find a way to make Spark's Catalyst use it when generating the logical plan for the write.

Implementation

Writing the data source

Start by imitating the original and sketching a ZhiyongMysqlUpsertRelationProvider:

[Screenshot: the first attempt in the IDE, with a compile error]

Error: Symbol conf is inaccessible from this place

It turns out the class has to live in the same package as the Spark objects it uses, because sqlContext.conf and several of the helpers below are private[sql]. A real pitfall.

Platform developers having root or hdfs.keytab is perfectly normal, but the SQL Boys' Linux skills are another matter. To be safe, only Append mode is kept, and upsert is only allowed when the table already exists; every other case throws an exception:

package org.apache.spark.sql

import org.apache.spark.sql.ZhiyongMysqlUpsertJdbcUtils.upsertTable
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.{createTable, dropTable, isCascadingTruncateTable, saveTable, truncateTable}
import org.apache.spark.sql.execution.datasources.jdbc.{JdbcOptionsInWrite, JdbcRelationProvider, JdbcUtils}
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.sources.BaseRelation

class ZhiyongMysqlUpsertRelationProvider extends JdbcRelationProvider {
  override def createRelation(sqlContext: SQLContext,
                              mode: SaveMode,
                              parameters: Map[String, String],
                              df: DataFrame): BaseRelation = {
    val options = new JdbcOptionsInWrite(parameters)
    val isCaseSensitive = sqlContext.conf.caseSensitiveAnalysis
    val dialect = JdbcDialects.get(options.url)
    val conn = dialect.createConnectionFactory(options)(-1)


    try {
      val tableExists = JdbcUtils.tableExists(conn, options)
      if (tableExists) {
        mode match {
          case SaveMode.Overwrite =>
            if (options.isTruncate && isCascadingTruncateTable(options.url) == Some(false)) {
              // In this case, we should truncate table and then load.
              throw new RuntimeException("【CSDN@虎鲸不是鱼】: only Append mode is allowed")
              //              truncateTable(conn, options)
              //              val tableSchema = JdbcUtils.getSchemaOption(conn, options)
              //              saveTable(df, tableSchema, isCaseSensitive, options)
            } else {
              // Otherwise, do not truncate the table, instead drop and recreate it
              throw new RuntimeException("【CSDN@虎鲸不是鱼】: only Append mode is allowed")
              //              dropTable(conn, options.table, options)
              //              createTable(conn, options.table, df.schema, isCaseSensitive, options)
              //              saveTable(df, Some(df.schema), isCaseSensitive, options)
            }

          case SaveMode.Append =>
            val tableSchema = JdbcUtils.getSchemaOption(conn, options)
            //            saveTable(df, tableSchema, isCaseSensitive, options)
            upsertTable(df, tableSchema, isCaseSensitive, options)

          // TODO: the concrete upsert method still needs to be written


          case SaveMode.ErrorIfExists =>
            throw QueryCompilationErrors.tableOrViewAlreadyExistsError(options.table)

          case SaveMode.Ignore =>
            println("【CSDN@虎鲸不是鱼】: Ignore mode is a no-op")
          // With `SaveMode.Ignore` mode, if table already exists, the save operation is expected
          // to not save the contents of the DataFrame and to not change the existing data.
          // Therefore, it is okay to do nothing here and then just return the relation below.
        }
      } else {
        throw new RuntimeException("【CSDN@虎鲸不是鱼】: target table does not exist; please verify the table name or create it manually")
        //        createTable(conn, options.table, df.schema, isCaseSensitive, options)
        //        saveTable(df, Some(df.schema), isCaseSensitive, options)
      }
    } finally {
      conn.close()
    }

    createRelation(sqlContext, parameters)
  }

}

That will do for a start.

Rewriting the JDBC utility object

Because saveTable and friends live in a utility object:

package org.apache.spark.sql.execution.datasources.jdbc

/**
 * Util functions for JDBC tables.
 */
object JdbcUtils extends Logging with SQLConfHelper {
  /**
   * Saves the RDD to the database in a single transaction.
   */
  def saveTable(
      df: DataFrame,
      tableSchema: Option[StructType],
      isCaseSensitive: Boolean,
      options: JdbcOptionsInWrite): Unit = {
    val url = options.url
    val table = options.table
    val dialect = JdbcDialects.get(url)
    val rddSchema = df.schema
    val batchSize = options.batchSize
    val isolationLevel = options.isolationLevel

    val insertStmt = getInsertStatement(table, rddSchema, tableSchema, isCaseSensitive, dialect)
    val repartitionedDF = options.numPartitions match {
      case Some(n) if n <= 0 => throw QueryExecutionErrors.invalidJdbcNumPartitionsError(
        n, JDBCOptions.JDBC_NUM_PARTITIONS)
      case Some(n) if n < df.rdd.getNumPartitions => df.coalesce(n)
      case _ => df
    }
    repartitionedDF.rdd.foreachPartition { iterator => savePartition(
      table, iterator, rddSchema, insertStmt, batchSize, dialect, isolationLevel, options)
    }
  }

  /**
   * Returns an Insert SQL statement for inserting a row into the target table via JDBC conn.
   */
  def getInsertStatement(
      table: String,
      rddSchema: StructType,
      tableSchema: Option[StructType],
      isCaseSensitive: Boolean,
      dialect: JdbcDialect): String = {
    val columns = if (tableSchema.isEmpty) {
      rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
    } else {
      // The generated insert statement needs to follow rddSchema's column sequence and
      // tableSchema's column names. When appending data into some case-sensitive DBMSs like
      // PostgreSQL/Oracle, we need to respect the existing case-sensitive column names instead of
      // RDD column names for user convenience.
      val tableColumnNames = tableSchema.get.fieldNames
      rddSchema.fields.map { col =>
        val normalizedName = tableColumnNames.find(f => conf.resolver(f, col.name)).getOrElse {
          throw QueryCompilationErrors.columnNotFoundInSchemaError(col, tableSchema)
        }
        dialect.quoteIdentifier(normalizedName)
      }.mkString(",")
    }
    val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
    s"INSERT INTO $table ($columns) VALUES ($placeholders)"
  }
}

This utility object calls further methods internally.

So we also need to imitate a similar utility object, plus a similar method for building the statement.

Looking at JdbcOptionsInWrite, the type of the options object:

package org.apache.spark.sql.execution.datasources.jdbc

import java.sql.{Connection, DriverManager}
import java.util.{Locale, Properties}

import org.apache.commons.io.FilenameUtils

import org.apache.spark.SparkFiles
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.errors.QueryExecutionErrors

/**
 * Options for the JDBC data source.
 */
class JDBCOptions(
    val parameters: CaseInsensitiveMap[String])
  extends Serializable with Logging {

  import JDBCOptions._

  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))

  def this(url: String, table: String, parameters: Map[String, String]) = {
    this(CaseInsensitiveMap(parameters ++ Map(
      JDBCOptions.JDBC_URL -> url,
      JDBCOptions.JDBC_TABLE_NAME -> table)))
  }

  // ... a large chunk omitted here ...
}

class JdbcOptionsInWrite(
    override val parameters: CaseInsensitiveMap[String])
  extends JDBCOptions(parameters) {

  import JDBCOptions._

  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))

  def this(url: String, table: String, parameters: Map[String, String]) = {
    this(CaseInsensitiveMap(parameters ++ Map(
      JDBCOptions.JDBC_URL -> url,
      JDBCOptions.JDBC_TABLE_NAME -> table)))
  }

  require(
    parameters.get(JDBC_TABLE_NAME).isDefined,
    s"Option '$JDBC_TABLE_NAME' is required. " +
      s"Option '$JDBC_QUERY_STRING' is not applicable while writing.")

  val table = parameters(JDBC_TABLE_NAME)
}

object JDBCOptions {
  private val curId = new java.util.concurrent.atomic.AtomicLong(0L)
  private val jdbcOptionNames = collection.mutable.Set[String]()

  private def newOption(name: String): String = {
    jdbcOptionNames += name.toLowerCase(Locale.ROOT)
    name
  }

  val JDBC_URL = newOption("url")
  val JDBC_TABLE_NAME = newOption("dbtable")
  val JDBC_QUERY_STRING = newOption("query")
  val JDBC_DRIVER_CLASS = newOption("driver")
  val JDBC_PARTITION_COLUMN = newOption("partitionColumn")
  val JDBC_LOWER_BOUND = newOption("lowerBound")
  val JDBC_UPPER_BOUND = newOption("upperBound")
  val JDBC_NUM_PARTITIONS = newOption("numPartitions")
  val JDBC_QUERY_TIMEOUT = newOption("queryTimeout")
  val JDBC_BATCH_FETCH_SIZE = newOption("fetchsize")
  val JDBC_TRUNCATE = newOption("truncate")
  val JDBC_CASCADE_TRUNCATE = newOption("cascadeTruncate")
  val JDBC_CREATE_TABLE_OPTIONS = newOption("createTableOptions")
  val JDBC_CREATE_TABLE_COLUMN_TYPES = newOption("createTableColumnTypes")
  val JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES = newOption("customSchema")
  val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
  val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
  val JDBC_SESSION_INIT_STATEMENT = newOption("sessionInitStatement")
  val JDBC_PUSHDOWN_PREDICATE = newOption("pushDownPredicate")
  val JDBC_PUSHDOWN_AGGREGATE = newOption("pushDownAggregate")
  val JDBC_PUSHDOWN_LIMIT = newOption("pushDownLimit")
  val JDBC_PUSHDOWN_TABLESAMPLE = newOption("pushDownTableSample")
  val JDBC_KEYTAB = newOption("keytab")
  val JDBC_PRINCIPAL = newOption("principal")
  val JDBC_TABLE_COMMENT = newOption("tableComment")
  val JDBC_REFRESH_KRB5_CONFIG = newOption("refreshKrb5Config")
  val JDBC_CONNECTION_PROVIDER = newOption("connectionProvider")
}

You can see many of the options we normally pass when writing over JDBC.
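
A few of them can be passed straight through df.write.options to tune the plain JDBC write; the values below are only illustrative, reusing df1 from the first demo:

df1.write
  .format("jdbc")
  .mode(SaveMode.Append)
  .options(Map(
    "url"            -> "jdbc:mysql://192.168.88.100:3306/db_lzy",
    "dbtable"        -> "test_origin_20001128",
    "user"           -> "root",
    "password"       -> "123456",
    "driver"         -> "com.mysql.cj.jdbc.Driver",
    "batchsize"      -> "5000",          // JDBC_BATCH_INSERT_SIZE, defaults to 1000
    "numPartitions"  -> "8",             // caps the number of concurrent JDBC connections
    "isolationLevel" -> "READ_COMMITTED" // JDBC_TXN_ISOLATION_LEVEL
  ))
  .save()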

A first cut of the utility object:

package org.apache.spark.sql

import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcOptionsInWrite}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.{getInsertStatement, savePartition}
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
import org.apache.spark.sql.types.StructType

object ZhiyongMysqlUpsertJdbcUtils {

  // the method that needs to be rewritten
  def getUpsertStatement(table: String,
                         rddSchema: StructType,
                         tableSchema: Option[StructType],
                         isCaseSensitive: Boolean,
                         dialect: JdbcDialect): String = {

    println("【CSDN@虎鲸不是鱼】: building the upsert statement")

    s"""
       |
       |""".stripMargin

  }

  def upsertTable(
                   df: DataFrame,
                   tableSchema: Option[StructType],
                   isCaseSensitive: Boolean,
                   options: JdbcOptionsInWrite): Unit = {
    val url = options.url
    val table = options.table
    val dialect = JdbcDialects.get(url)
    val rddSchema: StructType = df.schema
    val batchSize: Int = options.batchSize
    val isolationLevel = options.isolationLevel

    println("dialect=" + dialect)
    println("rddSchema=" + rddSchema)

    //    val insertStmt = getInsertStatement(table, rddSchema, tableSchema, isCaseSensitive, dialect)
    // the line above is the part that has to change

    val upsertStmt = getUpsertStatement(table, rddSchema, tableSchema, isCaseSensitive, dialect)

    val repartitionedDF = options.numPartitions match {
      case Some(n) if n <= 0 => throw QueryExecutionErrors.invalidJdbcNumPartitionsError(
        n, JDBCOptions.JDBC_NUM_PARTITIONS)
      case Some(n) if n < df.rdd.getNumPartitions => df.coalesce(n)
      case _ => df
    }
    repartitionedDF.rdd.foreachPartition { iterator =>
      savePartition(
        table, iterator, rddSchema, upsertStmt, batchSize, dialect, isolationLevel, options)
    }
  }

}

The method that builds the upsert statement will be filled in shortly.

Writing the implicit class

Reference: https://docs.scala-lang.org/zh-cn/overviews/core/implicit-classes.html

Scala 2.10 introduced implicit classes: classes marked with the implicit keyword. Within the enclosing scope, the primary constructor of such a class becomes available for implicit conversions.
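
A tiny self-contained illustration of the mechanism (a toy example, not part of this project):

object ImplicitClassDemo {
  // The compiler rewrites `3.times(...)` into `new RichInt(3).times(...)`.
  implicit class RichInt(n: Int) {
    def times(body: => Unit): Unit = (1 to n).foreach(_ => body)
  }

  def main(args: Array[String]): Unit = {
    3.times(println("hello")) // Int has no `times` method; the implicit class supplies it
  }
}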

Implicit classes were proposed in SIP-13. Following that, we can write:

package org.apache.spark.sql

object ZhiyongMysqlDataFrameWriter {

  implicit class ZhiyongMysqlUpsertDataFrameWriter(writer: DataFrameWriter[Row]){
    def upsert():Unit={

      println("upsert of implicit class ZhiyongMysqlUpsertDataFrameWriter invoked")

    }
  }

}

Slightly modify the write section of the driver class and run it:

    df1.write
      .format("jdbc")
      .mode(SaveMode.Append)
      .options(
        Map(
          "url" -> "jdbc:mysql://192.168.88.100:3306/db_lzy",
          "dbtable" -> "test_origin_20001128",
          "user" -> "root",
          "password" -> "123456",
          "driver" -> "com.mysql.cj.jdbc.Driver"
        )
      )
      //.save()
      .upsert // resolved through the implicit class

From the error log:

upsert of implicit class ZhiyongMysqlUpsertDataFrameWriter invoked
Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException: 
Syntax error, unexpected empty statement(line 1, pos 0)

== SQL ==

^^^

	at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:304)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:143)
	at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:52)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:89)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:620)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:620)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
	at com.zhiyong.day20230425upDate.UpDateDemo1$.main(UpDateDemo1.scala:48)
	at com.zhiyong.day20230425upDate.UpDateDemo1.main(UpDateDemo1.scala)

This shows the call is dispatched into our implicit class successfully. Java has no equivalent trick; there you would have no choice but to modify the source, recompile and redeploy. Both are JVM languages, yet Martin Odersky's Scala is impressively powerful. It is also not easy to learn, and a self-taught programmer who skips the systematic study ends up an apprentice like me.

Next up: fill in the upsert method and the getUpsertStatement method from the previous step.

Filling in getUpsertStatement

For reference:

/**
 * Returns an Insert SQL statement for inserting a row into the target table via JDBC conn.
 */
def getInsertStatement(
    table: String,
    rddSchema: StructType,
    tableSchema: Option[StructType],
    isCaseSensitive: Boolean,
    dialect: JdbcDialect): String = {
  val columns = if (tableSchema.isEmpty) {
    rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
  } else {
    // The generated insert statement needs to follow rddSchema's column sequence and
    // tableSchema's column names. When appending data into some case-sensitive DBMSs like
    // PostgreSQL/Oracle, we need to respect the existing case-sensitive column names instead of
    // RDD column names for user convenience.
    val tableColumnNames = tableSchema.get.fieldNames
    rddSchema.fields.map { col =>
      val normalizedName = tableColumnNames.find(f => conf.resolver(f, col.name)).getOrElse {
        throw QueryCompilationErrors.columnNotFoundInSchemaError(col, tableSchema)
      }
      dialect.quoteIdentifier(normalizedName)
    }.mkString(",")
  }
  val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
  s"INSERT INTO $table ($columns) VALUES ($placeholders)"
}

Clearly this just pieces together something like:

insert into db_name.tb_name (col1,col2,col3) values ("value1","value2","value3")

and then iterates over the partitions, pouring the data in batches of 1000 rows (the default batchsize).
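
For context, this is roughly what that partition-level loop boils down to, in a deliberately stripped-down sketch of my own (it is not Spark's actual savePartition: dialect handling, typed setters, isolation levels and error handling are all omitted):

import java.sql.{Connection, DriverManager}

import org.apache.spark.sql.Row

// Simplified illustration only: one PreparedStatement per partition, flushed every batchSize rows.
def writePartition(rows: Iterator[Row], url: String, user: String, password: String,
                   stmtSql: String, numFields: Int, batchSize: Int = 1000): Unit = {
  val conn: Connection = DriverManager.getConnection(url, user, password)
  try {
    val stmt = conn.prepareStatement(stmtSql)
    var count = 0
    rows.foreach { row =>
      var i = 0
      while (i < numFields) {
        stmt.setObject(i + 1, row.get(i).asInstanceOf[AnyRef]) // real code picks a typed setter per column
        i += 1
      }
      stmt.addBatch()
      count += 1
      if (count % batchSize == 0) stmt.executeBatch() // flush a full batch
    }
    stmt.executeBatch() // flush the remainder
  } finally {
    conn.close()
  }
}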

So what we need to piece together instead is something like:

insert into db_name.tb_name (pk1,col1,col2,col3) values ("pk1_value","value1","value2","value3") on duplicate key update col1="value1",col2="value2",col3="value3"

Here it is:

def getUpsertStatement(table: String,
                       rddSchema: StructType,
                       tableSchema: Option[StructType],
                       isCaseSensitive: Boolean,
                       dialect: JdbcDialect): String = {

  println("【CSDN@虎鲸不是鱼】: building the upsert statement")

  val columns = if (tableSchema.isEmpty) {
    rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
  } else {
    // The generated insert statement needs to follow rddSchema's column sequence and
    // tableSchema's column names. When appending data into some case-sensitive DBMSs like
    // PostgreSQL/Oracle, we need to respect the existing case-sensitive column names instead of
    // RDD column names for user convenience.
    val tableColumnNames = tableSchema.get.fieldNames
    rddSchema.fields.map { col =>
      val normalizedName = tableColumnNames.find(f => conf.resolver(f, col.name)).getOrElse {
        throw QueryCompilationErrors.columnNotFoundInSchemaError(col, tableSchema)
      }
      dialect.quoteIdentifier(normalizedName)
    }.mkString(",")
  }
  val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
  s"""
     |INSERT INTO $table ($columns) VALUES ($placeholders)
     |ON DUPLICATE KEY UPDATE
     |${columns.split(",").map(col=>s"$col=VALUES($col)").mkString(",")}
     |""".stripMargin
  

}

Scala is quite a bit more concise than Java here.
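
A quick way to eyeball what this produces without touching MySQL at all (a throwaway check of my own, assuming the classes above are on the classpath; the table name is the test table used in the verification section below):

import org.apache.spark.sql.ZhiyongMysqlUpsertJdbcUtils
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object UpsertStatementCheck {
  def main(args: Array[String]): Unit = {
    val rddSchema = StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("col1", StringType),
      StructField("col2", StringType),
      StructField("col3", StringType)))
    val dialect = JdbcDialects.get("jdbc:mysql://192.168.88.100:3306/db_lzy") // resolves to MySQLDialect
    // With tableSchema = None the RDD column names are quoted and used directly; isCaseSensitive is false.
    println(ZhiyongMysqlUpsertJdbcUtils.getUpsertStatement(
      "test_upsert_20230429_res", rddSchema, None, false, dialect))
    // Expected shape of the output (whitespace aside):
    // INSERT INTO test_upsert_20230429_res (`id`,`col1`,`col2`,`col3`) VALUES (?,?,?,?)
    // ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col1`=VALUES(`col1`),`col2`=VALUES(`col2`),`col3`=VALUES(`col3`)
  }
}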

Filling in the upsert method

[Screenshot: DataFrameWriter's private fields df, source, extraOptions and partitioningColumns]

Since these fields are all private, the only way to work with them without modifying the source, recompiling and redeploying is reflection:

    def upsert(): Unit = {

      println("upsert of implicit class ZhiyongMysqlUpsertDataFrameWriter invoked")

      // Local imports needed by this method:
      import java.lang.reflect.Field

      import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
      import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
      import org.apache.spark.sql.execution.datasources.DataSource
      import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}

      // DataFrameWriter keeps these fields private, so pull them out via reflection.
      val extraOptionsField: Field = writer.getClass.getDeclaredField("extraOptions")
      val dfField = writer.getClass.getDeclaredField("df")
      val sourceField = writer.getClass.getDeclaredField("source")
      val partitioningColumnsField = writer.getClass.getDeclaredField("partitioningColumns")
      extraOptionsField.setAccessible(true) // disabling the access check also speeds up reflection
      dfField.setAccessible(true)
      sourceField.setAccessible(true)
      partitioningColumnsField.setAccessible(true)
      val extraOptions = extraOptionsField.get(writer).asInstanceOf[CaseInsensitiveMap[String]]
      val df: DataFrame = dfField.get(writer).asInstanceOf[DataFrame]
      val partitioningColumns = partitioningColumnsField.get(writer).asInstanceOf[Option[Seq[String]]]
      val logicalPlanField = df.getClass.getDeclaredField("logicalPlan")
      logicalPlanField.setAccessible(true)
      var logicalPlan = logicalPlanField.get(df).asInstanceOf[LogicalPlan]
      val session = df.sparkSession

      // Build the same write plan as saveToV1Source, but force the upsert provider as the data source.
      logicalPlan =
        DataSource(
          sparkSession = session,
          className = "org.apache.spark.sql.ZhiyongMysqlUpsertRelationProvider",
          partitionColumns = partitioningColumns.getOrElse(Nil),
          options = extraOptions.toMap).planForWriting(SaveMode.Append, logicalPlan)
      val qe: QueryExecution = session.sessionState.executePlan(logicalPlan)
      SQLExecution.withNewExecutionId(qe)(qe.toRdd) // eagerly execute the write plan

    }

At this point the work is essentially done.
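
As an aside, DataSource also accepts a fully qualified class name in .format(), so the custom provider should in principle be reachable without the implicit class at all. I have not tested this path, so treat it purely as a sketch:

// df: any DataFrame whose schema matches the target table
df.write
  .format("org.apache.spark.sql.ZhiyongMysqlUpsertRelationProvider") // class name instead of "jdbc"
  .mode(SaveMode.Append)
  .options(Map(
    "url"      -> "jdbc:mysql://192.168.88.100:3306/db_lzy",
    "dbtable"  -> "test_upsert_20230429_res",
    "user"     -> "root",
    "password" -> "123456",
    "driver"   -> "com.mysql.cj.jdbc.Driver"
  ))
  .save() // falls through lookupV2Provider into saveToV1Source, which instantiates the class above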

Verification

Preparing the MySQL tables and data

[root@zhiyong1 ~]# mysql -uroot -p
Enter password:
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 3
Server version: 5.7.30 MySQL Community Server (GPL)
mysql> use db_lzy;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed

Create the tables:

create table if not exists test_upsert_20230429_res(
    id int comment 'primary key',
    col1 varchar(2000) comment 'column 1',
    col2 varchar(2000) comment 'column 2',
    col3 varchar(2000) comment 'column 3',
    primary key (id)
)
ENGINE=InnoDB DEFAULT CHARSET=utf8;

create table if not exists test_upsert_20230429_src(
    id int comment 'primary key',
    col1 varchar(2000) comment 'column 1',
    col2 varchar(2000) comment 'column 2',
    col3 varchar(2000) comment 'column 3',
    primary key (id)
)
ENGINE=InnoDB DEFAULT CHARSET=utf8;

Load some source data:

insert into test_upsert_20230429_src values
(1,'a1','b1','c1'),
(2,'a2','b2','c2'),
(3,'a3','b3','c3'),
(5,'a55','b555','c5555'),
(6,'a66','b666','c6666')
;

Query it:

mysql> select * from test_upsert_20230429_src;
+----+------+------+-------+
| id | col1 | col2 | col3  |
+----+------+------+-------+
|  1 | a1   | b1   | c1    |
|  2 | a2   | b2   | c2    |
|  3 | a3   | b3   | c3    |
|  5 | a55  | b555 | c5555 |
|  6 | a66  | b666 | c6666 |
+----+------+------+-------+
5 rows in set (0.00 sec)

This table holds the source data.

Initializing the target table

truncate table test_upsert_20230429_res;
insert into test_upsert_20230429_res
select * from test_upsert_20230429_src;

Query it:

mysql> select * from test_upsert_20230429_res;
+----+------+------+-------+
| id | col1 | col2 | col3  |
+----+------+------+-------+
|  1 | a1   | b1   | c1    |
|  2 | a2   | b2   | c2    |
|  3 | a3   | b3   | c3    |
|  5 | a55  | b555 | c5555 |
|  6 | a66  | b666 | c6666 |
+----+------+------+-------+
5 rows in set (0.00 sec)

Preparing the DataFrame

val df2: DataFrame = sc.sparkContext.parallelize(
  Seq(
    (1, "a1", "b1", "c1"),
    (2, "a2", "b2", "c2"),
    (3, "a3", "b3", "c3"),
    (4, "a4", "b4", "c4"),
    (5, "a5", "b5", "c5"),
    (6, "a6", "b6", "c6"),
    (7, "a7", "b7", "c7"),
    (8, "a8", "b8", "c8"),
    (9, "a9", "b9", "c9"),
    (5, "a10", "b10", "c10")
  )
).toDF("id", "col1", "col2", "col3")

df2.show()

Result:

+---+----+----+----+
| id|col1|col2|col3|
+---+----+----+----+
|  1|  a1|  b1|  c1|
|  2|  a2|  b2|  c2|
|  3|  a3|  b3|  c3|
|  4|  a4|  b4|  c4|
|  5|  a5|  b5|  c5|
|  6|  a6|  b6|  c6|
|  7|  a7|  b7|  c7|
|  8|  a8|  b8|  c8|
|  9|  a9|  b9|  c9|
|  5| a10| b10| c10|
+---+----+----+----+

Running the upsert

df2.write
  .format("jdbc")
  .mode(SaveMode.Append)
  .options(
    Map(
      "url" -> "jdbc:mysql://192.168.88.100:3306/db_lzy",
      "dbtable" -> "test_upsert_20230429_res",
      "user" -> "root",
      "password" -> "123456",
      "driver" -> "com.mysql.cj.jdbc.Driver"
    )
  )
  .upsert()

Output:

upsert of implicit class ZhiyongMysqlUpsertDataFrameWriter invoked
dialect=MySQLDialect
rddSchema=StructType(StructField(id,IntegerType,false),StructField(col1,StringType,true),StructField(col2,StringType,true),StructField(col3,StringType,true))
【CSDN@虎鲸不是鱼】: building the upsert statement
23/04/29 17:10:21 INFO CodeGenerator: Code generated in 20.5899 ms
23/04/29 17:10:21 INFO SparkContext: Starting job: upsert at UpDateDemo1.scala:57
23/04/29 17:10:21 INFO DAGScheduler: Got job 4 (upsert at UpDateDemo1.scala:57) with 8 output partitions
23/04/29 17:10:21 INFO DAGScheduler: Final stage: ResultStage 4 (upsert at UpDateDemo1.scala:57)
23/04/29 17:10:21 INFO DAGScheduler: Parents of final stage: List()
23/04/29 17:10:21 INFO DAGScheduler: Missing parents: List()
23/04/29 17:10:21 INFO DAGScheduler: Submitting ResultStage 4 (MapPartitionsRDD[12] at upsert at UpDateDemo1.scala:57), which has no missing parents
23/04/29 17:10:21 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 32.2 KiB, free 15.8 GiB)
23/04/29 17:10:21 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 13.3 KiB, free 15.8 GiB)
23/04/29 17:10:21 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on DESKTOP-VRV0NDO:54939 (size: 13.3 KiB, free: 15.8 GiB)
23/04/29 17:10:21 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1513
23/04/29 17:10:21 INFO DAGScheduler: Submitting 8 missing tasks from ResultStage 4 (MapPartitionsRDD[12] at upsert at UpDateDemo1.scala:57) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7))
23/04/29 17:10:21 INFO TaskSchedulerImpl: Adding task set 4.0 with 8 tasks resource profile 0
23/04/29 17:10:21 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 9) (DESKTOP-VRV0NDO, executor driver, partition 0, PROCESS_LOCAL, 4539 bytes) taskResourceAssignments Map()
23/04/29 17:10:21 INFO TaskSetManager: Starting task 1.0 in stage 4.0 (TID 10) (DESKTOP-VRV0NDO, executor driver, partition 1, PROCESS_LOCAL, 4539 bytes) taskResourceAssignments Map()
23/04/29 17:10:21 INFO TaskSetManager: Starting task 2.0 in stage 4.0 (TID 11) (DESKTOP-VRV0NDO, executor driver, partition 2, PROCESS_LOCAL, 4539 bytes) taskResourceAssignments Map()
23/04/29 17:10:21 INFO TaskSetManager: Starting task 3.0 in stage 4.0 (TID 12) (DESKTOP-VRV0NDO, executor driver, partition 3, PROCESS_LOCAL, 4565 bytes) taskResourceAssignments Map()
23/04/29 17:10:21 INFO TaskSetManager: Starting task 4.0 in stage 4.0 (TID 13) (DESKTOP-VRV0NDO, executor driver, partition 4, PROCESS_LOCAL, 4539 bytes) taskResourceAssignments Map()
23/04/29 17:10:21 INFO TaskSetManager: Starting task 5.0 in stage 4.0 (TID 14) (DESKTOP-VRV0NDO, executor driver, partition 5, PROCESS_LOCAL, 4539 bytes) taskResourceAssignments Map()
23/04/29 17:10:21 INFO TaskSetManager: Starting task 6.0 in stage 4.0 (TID 15) (DESKTOP-VRV0NDO, executor driver, partition 6, PROCESS_LOCAL, 4539 bytes) taskResourceAssignments Map()
23/04/29 17:10:21 INFO TaskSetManager: Starting task 7.0 in stage 4.0 (TID 16) (DESKTOP-VRV0NDO, executor driver, partition 7, PROCESS_LOCAL, 4573 bytes) taskResourceAssignments Map()
23/04/29 17:10:21 INFO Executor: Running task 0.0 in stage 4.0 (TID 9)
23/04/29 17:10:21 INFO Executor: Running task 1.0 in stage 4.0 (TID 10)
23/04/29 17:10:21 INFO Executor: Running task 3.0 in stage 4.0 (TID 12)
23/04/29 17:10:21 INFO Executor: Running task 2.0 in stage 4.0 (TID 11)
23/04/29 17:10:21 INFO Executor: Running task 4.0 in stage 4.0 (TID 13)
23/04/29 17:10:21 INFO Executor: Running task 5.0 in stage 4.0 (TID 14)
23/04/29 17:10:21 INFO Executor: Running task 6.0 in stage 4.0 (TID 15)
23/04/29 17:10:21 INFO Executor: Running task 7.0 in stage 4.0 (TID 16)
23/04/29 17:10:21 INFO CodeGenerator: Code generated in 12.8791 ms
23/04/29 17:10:21 INFO Executor: Finished task 0.0 in stage 4.0 (TID 9). 1226 bytes result sent to driver
23/04/29 17:10:21 INFO Executor: Finished task 1.0 in stage 4.0 (TID 10). 1226 bytes result sent to driver
23/04/29 17:10:21 INFO Executor: Finished task 3.0 in stage 4.0 (TID 12). 1226 bytes result sent to driver
23/04/29 17:10:21 INFO Executor: Finished task 4.0 in stage 4.0 (TID 13). 1226 bytes result sent to driver
23/04/29 17:10:21 INFO Executor: Finished task 2.0 in stage 4.0 (TID 11). 1226 bytes result sent to driver
23/04/29 17:10:21 INFO Executor: Finished task 5.0 in stage 4.0 (TID 14). 1226 bytes result sent to driver
23/04/29 17:10:21 INFO Executor: Finished task 6.0 in stage 4.0 (TID 15). 1226 bytes result sent to driver
23/04/29 17:10:21 INFO Executor: Finished task 7.0 in stage 4.0 (TID 16). 1226 bytes result sent to driver
23/04/29 17:10:21 INFO TaskSetManager: Finished task 3.0 in stage 4.0 (TID 12) in 208 ms on DESKTOP-VRV0NDO (executor driver) (1/8)
23/04/29 17:10:21 INFO TaskSetManager: Finished task 2.0 in stage 4.0 (TID 11) in 208 ms on DESKTOP-VRV0NDO (executor driver) (2/8)
23/04/29 17:10:21 INFO TaskSetManager: Finished task 5.0 in stage 4.0 (TID 14) in 207 ms on DESKTOP-VRV0NDO (executor driver) (3/8)
23/04/29 17:10:21 INFO TaskSetManager: Finished task 6.0 in stage 4.0 (TID 15) in 207 ms on DESKTOP-VRV0NDO (executor driver) (4/8)
23/04/29 17:10:21 INFO TaskSetManager: Finished task 4.0 in stage 4.0 (TID 13) in 208 ms on DESKTOP-VRV0NDO (executor driver) (5/8)
23/04/29 17:10:21 INFO TaskSetManager: Finished task 1.0 in stage 4.0 (TID 10) in 211 ms on DESKTOP-VRV0NDO (executor driver) (6/8)
23/04/29 17:10:21 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 9) in 212 ms on DESKTOP-VRV0NDO (executor driver) (7/8)
23/04/29 17:10:21 INFO TaskSetManager: Finished task 7.0 in stage 4.0 (TID 16) in 206 ms on DESKTOP-VRV0NDO (executor driver) (8/8)
23/04/29 17:10:21 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool 
23/04/29 17:10:21 INFO DAGScheduler: ResultStage 4 (upsert at UpDateDemo1.scala:57) finished in 0.269 s
23/04/29 17:10:21 INFO DAGScheduler: Job 4 is finished. Cancelling potential speculative or zombie tasks for this job
23/04/29 17:10:21 INFO TaskSchedulerImpl: Killing all running tasks in stage 4: Stage finished
23/04/29 17:10:21 INFO DAGScheduler: Job 4 finished: upsert at UpDateDemo1.scala:57, took 0.277371 s
23/04/29 17:10:21 INFO SparkContext: Invoking stop() from shutdown hook
23/04/29 17:10:22 INFO SparkUI: Stopped Spark web UI at http://DESKTOP-VRV0NDO:4040
23/04/29 17:10:22 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/04/29 17:10:22 INFO MemoryStore: MemoryStore cleared
23/04/29 17:10:22 INFO BlockManager: BlockManager stopped
23/04/29 17:10:22 INFO BlockManagerMaster: BlockManagerMaster stopped
23/04/29 17:10:22 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/04/29 17:10:22 INFO SparkContext: Successfully stopped SparkContext
23/04/29 17:10:22 INFO ShutdownHookManager: Shutdown hook called
23/04/29 17:10:22 INFO ShutdownHookManager: Deleting directory C:\Users\zhiyong\AppData\Local\Temp\spark-2ae1f427-ec0e-4e23-9b59-31b796d980d2

Process finished with exit code 0

It looks like the rewritten class and methods were indeed invoked.

Checking the result

mysql> select * from test_upsert_20230429_res;
+----+------+------+------+
| id | col1 | col2 | col3 |
+----+------+------+------+
|  1 | a1   | b1   | c1   |
|  2 | a2   | b2   | c2   |
|  3 | a3   | b3   | c3   |
|  4 | a4   | b4   | c4   |
|  5 | a5   | b5   | c5   |
|  6 | a6   | b6   | c6   |
|  7 | a7   | b7   | c7   |
|  8 | a8   | b8   | c8   |
|  9 | a9   | b9   | c9   |
+----+------+------+------+
9 rows in set (0.00 sec)

Verifying again

The upsert clearly worked. For the duplicated key, however, the first record was kept, whereas the expectation was that the last one wins. Run the job again and query once more:

mysql> select * from test_upsert_20230429_res;
+----+------+------+------+
| id | col1 | col2 | col3 |
+----+------+------+------+
|  1 | a1   | b1   | c1   |
|  2 | a2   | b2   | c2   |
|  3 | a3   | b3   | c3   |
|  4 | a4   | b4   | c4   |
|  5 | a10  | b10  | c10  |
|  6 | a6   | b6   | c6   |
|  7 | a7   | b7   | c7   |
|  8 | a8   | b8   | c8   |
|  9 | a9   | b9   | c9   |
+----+------+------+------+
9 rows in set (0.00 sec)

Evidently this is caused by ordering being nondeterministic in distributed execution: which partition's batch reaches MySQL last is essentially random, unlike DataStage's single-threaded first-come-first-served model.
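
If "the last record wins" has to be deterministic, the cleanest fix is to deduplicate the DataFrame before the upsert. A possible sketch, assuming the source carries something to order by, such as an update_time column (which the demo DataFrame above does not have):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val deduped = df2
  .withColumn("rn", row_number().over(Window.partitionBy("id").orderBy(col("update_time").desc)))
  .where(col("rn") === 1) // keep only the latest record per key
  .drop("rn")
// then upsert `deduped` instead of df2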

With that, upsert into MySQL is implemented. The same approach can be adapted for Oracle and other RDBMSs.
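
For Oracle the statement builder would have to emit a MERGE instead of INSERT ... ON DUPLICATE KEY UPDATE. A rough idea of what that could look like (untested; the helper below is hypothetical and assumes the primary-key columns are passed in explicitly, since MERGE needs an ON clause):

// Hypothetical helper, not part of the project code above.
def getOracleMergeStatement(table: String, columns: Seq[String], keyColumns: Seq[String]): String = {
  val src          = columns.map(c => s"? AS $c").mkString(", ")           // one bind placeholder per column
  val onClause     = keyColumns.map(c => s"t.$c = s.$c").mkString(" AND ")
  val updateClause = columns.filterNot(keyColumns.contains).map(c => s"t.$c = s.$c").mkString(", ")
  val insertCols   = columns.mkString(", ")
  val insertVals   = columns.map(c => s"s.$c").mkString(", ")
  s"""MERGE INTO $table t
     |USING (SELECT $src FROM dual) s
     |ON ($onClause)
     |WHEN MATCHED THEN UPDATE SET $updateClause
     |WHEN NOT MATCHED THEN INSERT ($insertCols) VALUES ($insertVals)""".stripMargin
}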

Closing remarks

Whatever the ancients achieved with DataStage can certainly be reproduced in code. Extending Spark sometimes requires Scala, which is no small ask.

If you are doing this with Flink stream processing instead: https://lizhiyong.blog.csdn.net/article/details/124161096

Since a Flink sink can be written in Java, it is considerably easier there, whether you first select to check for existing rows and then branch into insert or update, or simply build the upsert statement directly.

It also avoids the enormous pure-HQL SQL: too many joins, an AST so long that parsing tends to fail, forcing you to split the SQL by hand or materialize intermediate results, and Tez is slower than Spark anyway. These recurring situations should all be platformized to cut the repetitive work; that is what actually reduces cost and raises efficiency.

SQL Boys write SQL because SQL is all they can write and they have no other choice; real big-data engineers write SQL simply because management decreed that only SQL is allowed, and because writing SQL gets you off work earlier than Java or Scala does. When it matters, there is more than one way to get things done.

May the world have no more shallow SQL Boys.

Please credit the source when reposting: https://lizhiyong.blog.csdn.net/article/details/130442316
