Spark bug fix: Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row

This post explains the cause of, and a fix for, the Spark SQL error "No Encoder found for org.apache.spark.sql.Row". If anything here is wrong or incomplete, corrections are welcome.

The error:

23/12/04 14:58:06 INFO ContextCleaner: Cleaned accumulator 12
Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row
- array element class: "org.apache.spark.sql.Row"
- field (class: "scala.collection.Seq", name: "_3")
- array element class: "scala.Tuple3"
- field (class: "scala.collection.Seq", name: "_2")
- root class: "scala.Tuple2"
    at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerFor$1(ScalaReflection.scala:651)

Cause:

Spark SQL's Dataset API needs an Encoder to serialize JVM objects into its internal binary format. import spark.implicits._ derives encoders automatically for primitives, tuples, and case classes, but there is no encoder for org.apache.spark.sql.Row or other arbitrary classes. Reading the stack trace from the bottom up shows exactly where derivation gives up: the root type is a Tuple2, its _2 field is a Seq of Tuple3, and that tuple's _3 field is a Seq whose elements are Row; Row has no encoder, so the conversion throws UnsupportedOperationException.
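As a minimal illustration of the mechanism (hypothetical column names, not the author's data): the compiler happily derives a product encoder for a tuple target, but building that encoder's serializer walks the tuple's fields at runtime via ScalaReflection, and it fails the moment it reaches a nested Row:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._

object EncoderDemo {

  // Typed stand-in for the struct column; Spark can derive an encoder for it.
  case class V(v: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("encoder-demo").master("local[1]").getOrCreate()
    import spark.implicits._

    val grouped = Seq(("a", 1), ("a", 2), ("b", 3)).toDF("k", "v")
      .groupBy($"k").agg(collect_list(struct($"v")).as("vs"))

    grouped.as[(String, Seq[V])].show()  // fine: product encoder derived for case class V
    grouped.as[(String, Seq[Row])]       // throws: No Encoder found for org.apache.spark.sql.Row
  }
}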

The offending code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

//https://blog.csdn.net/qq_52128187?type=blog
object Parent_child_V2 {

  // Note: Node is recursive (children: Seq[Node]); spark.implicits._ cannot
  // derive a product encoder for a self-referencing case class.
  case class Node(title: String, key: String, children: Seq[Node])

  def main(args: Array[String]): Unit = {
    // Creating a SparkContext and then a SparkSession is redundant:
    // SparkSession.builder.getOrCreate() reuses the context created here.
    val conf = new SparkConf().setAppName("ParentChildHierarchy").setMaster("local[1]")
    val sc = new SparkContext(conf)
    val spark = SparkSession.builder.appName("ParentChildHierarchy").getOrCreate()

    import spark.implicits._

    // Despite the name, df1 is an RDD[String] of comma-separated lines, not a DataFrame.
    val df1 = sc.textFile("C:\\zzcode\\workplace\\src\\main\\data\\country")

    val schema = StructType(
      Array(
        StructField("Country", StringType, nullable = true),
        StructField("Gender", StringType, nullable = true),
        StructField("Ethnicity", StringType, nullable = true),
        StructField("Race", StringType, nullable = true)
      )
    )

    val rowRDD = df1.map(line => {
      val parts = line.split(",")
      Row(parts(0), parts(1), parts(2), parts(3))
    })

    val df = spark.createDataFrame(rowRDD, schema)

    df.show()

    def toHierarchy(df: Dataset[Row]): Dataset[Node] = {
      import spark.implicits._ // redundant; already imported above

      // Generate the leaf nodes for Race under Ethnicity
      val raceDF = df.groupBy("Country", "Gender", "Ethnicity")
        .agg(collect_list($"Race").as("Races"))
        .withColumn("Children", expr("transform(Races, race -> struct(race as title, concat(Country, '-', Gender, '-', Ethnicity, '-', race) as key, array() as children))"))
        .select($"Country", $"Gender", $"Ethnicity", $"Children")

      // Generate the child nodes for Ethnicity under Gender
      val ethnicityDF = raceDF.groupBy("Country", "Gender")
        .agg(collect_list(struct($"Ethnicity".as("title"), concat_ws("-", $"Country", $"Gender", $"Ethnicity").as("key"), $"Children".as("children"))).as("Children"))
        .select($"Country", $"Gender", $"Children")

      // Generate the top-level nodes for Gender under Country
      val countryDF = ethnicityDF.groupBy("Country")
        .agg(collect_list(struct($"Gender".as("title"), concat_ws("-", $"Country", $"Gender").as("key"), $"Children".as("children"))).as("Children"))

      // This is the line that throws: deriving the tuple encoder recurses into
      // Seq[Row], and no Encoder exists for org.apache.spark.sql.Row (this is
      // the Tuple2 -> _2: Seq[Tuple3] -> _3: Seq[Row] trail in the stack trace).
      val nodeDS = countryDF.as[(String, Seq[(String, String, Seq[Row])])].map {
        case (country, genderChildren) =>
          val children = genderChildren.map { case (gender, key, childRows) =>
            val childNodes = childRows.map { row =>
              val title = row.getAs[String]("title")
              val key = row.getAs[String]("key")
              val grandChildren = row.getAs[Seq[Row]]("children").map { gcRow =>
                Node(gcRow.getAs[String]("title"), gcRow.getAs[String]("key"), Seq.empty[Node])
              }
              Node(title, key, grandChildren)
            }
            Node(gender, s"$country-$gender", childNodes)
          }
          Node(country, country, children)
      }

      nodeDS
    }

    val resultDS = toHierarchy(df)
    resultDS.show(false)
  }
}

The fix:

  1. Register Encoders.kryo[Node] as the encoder for Node. The Kryo encoder serializes arbitrary JVM objects into Spark SQL's internal format, which makes it the usual escape hatch for types Spark cannot derive an encoder for, including a recursive case class such as Node.
  2. Remove Seq[Row] from the typed conversion. The .as[(String, Seq[(String, String, Seq[Row])])] call is what requests the missing Row encoder, so either traverse the untyped Rows with getAs (see the sketch after this list) or describe the nested structs with their own case classes.
  3. Drop the SparkContext initialization and its configuration: SparkSession.builder creates (or reuses) a SparkContext itself, so the explicit new SparkContext(conf) is redundant.
  4. Encoders.kryo[Node] is only an expedient here. Kryo stores each object as a single opaque binary column, so you give up columnar pruning and readable show() output. For better performance, a smaller serialized size, and explicit control over serialization, define types Spark can encode natively, for example non-recursive case classes, for which Spark generates product encoders automatically.
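Putting points 1 through 3 together, here is a minimal sketch of a corrected version. It keeps the aggregation logic from the code above but inlines hypothetical sample rows in place of the original CSV file; the Kryo encoder plus direct Row traversal is one possible fix, not the only one.

import org.apache.spark.sql.{Dataset, Encoder, Encoders, Row, SparkSession}
import org.apache.spark.sql.functions._

object ParentChildFixed {

  case class Node(title: String, key: String, children: Seq[Node])

  def main(args: Array[String]): Unit = {
    // Point 3: a single SparkSession; it creates its own SparkContext.
    val spark = SparkSession.builder
      .appName("ParentChildHierarchy")
      .master("local[1]")
      .getOrCreate()
    import spark.implicits._

    // Point 1: a Kryo encoder for the recursive Node type. This local implicit
    // is more specific than the generic one from spark.implicits._, so it wins.
    implicit val nodeEncoder: Encoder[Node] = Encoders.kryo[Node]

    // Hypothetical sample rows standing in for the author's input file.
    val df = Seq(
      ("US", "M", "Hispanic", "White"),
      ("US", "M", "Asian", "Chinese"),
      ("US", "F", "Hispanic", "Black")
    ).toDF("Country", "Gender", "Ethnicity", "Race")

    // Same three aggregation steps as the original post.
    val raceDF = df.groupBy("Country", "Gender", "Ethnicity")
      .agg(collect_list($"Race").as("Races"))
      .withColumn("Children", expr("transform(Races, race -> struct(race as title, concat(Country, '-', Gender, '-', Ethnicity, '-', race) as key, array() as children))"))
      .select($"Country", $"Gender", $"Ethnicity", $"Children")

    val ethnicityDF = raceDF.groupBy("Country", "Gender")
      .agg(collect_list(struct($"Ethnicity".as("title"), concat_ws("-", $"Country", $"Gender", $"Ethnicity").as("key"), $"Children".as("children"))).as("Children"))

    val countryDF = ethnicityDF.groupBy("Country")
      .agg(collect_list(struct($"Gender".as("title"), concat_ws("-", $"Country", $"Gender").as("key"), $"Children".as("children"))).as("Children"))

    // Point 2: walk the nested Rows with getAs instead of .as[(String, ...)],
    // so the only encoder ever requested is nodeEncoder for the map's output.
    val nodeDS: Dataset[Node] = countryDF.map { row =>
      val country = row.getAs[String]("Country")
      val genderNodes = row.getAs[Seq[Row]]("Children").map { g =>
        val ethnicityNodes = g.getAs[Seq[Row]]("children").map { e =>
          val raceNodes = e.getAs[Seq[Row]]("children").map { r =>
            Node(r.getAs[String]("title"), r.getAs[String]("key"), Seq.empty)
          }
          Node(e.getAs[String]("title"), e.getAs[String]("key"), raceNodes)
        }
        Node(g.getAs[String]("title"), g.getAs[String]("key"), ethnicityNodes)
      }
      Node(country, country, genderNodes)
    }

    // A Kryo-encoded Dataset is one opaque binary column, so show() prints
    // bytes; collect to the driver to inspect the trees instead.
    nodeDS.collect().foreach(println)

    spark.stop()
  }
}

If readable Dataset output matters, the alternative from point 4 is to model each level of the hierarchy with its own non-recursive case class; Spark then derives product encoders and show() works as usual.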
