Error:
23/12/04 14:58:06 INFO ContextCleaner: Cleaned accumulator 12
Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row
- array element class: "org.apache.spark.sql.Row"
- field (class: "scala.collection.Seq", name: "_3")
- array element class: "scala.Tuple3"
- field (class: "scala.collection.Seq", name: "_2")
- root class: "scala.Tuple2"
at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerFor$1(ScalaReflection.scala:651)
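Cause:
In Spark SQL, the Dataset class needs an encoder to serialize JVM objects into Spark SQL's internal format. For complex types (such as Row or custom classes), Spark needs an implicit Encoder. The stack trace pinpoints the missing one: the root class is a Tuple2 whose _2 field is a Seq of Tuple3, whose _3 field is a Seq of Row, and Spark cannot derive an encoder for Row nested inside such a type. Because the code supplies no such encoder, it throws UnsupportedOperationException.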
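To make the notion of an encoder concrete, here is a minimal sketch comparing two encoders Spark can build for the same type. The `Person` class and the `EncoderKinds` object are hypothetical names used only for illustration; `Encoders.product` and `Encoders.kryo` are the standard factory methods in `org.apache.spark.sql.Encoders`.

```scala
import org.apache.spark.sql.{Encoder, Encoders}

object EncoderKinds {
  // Hypothetical example type, not part of the original program.
  case class Person(name: String, age: Int)

  // Encoder derived from the case class fields: one column per field.
  val productEncoder: Encoder[Person] = Encoders.product[Person]

  // Kryo-based encoder: the whole object is stored as a single binary column.
  val kryoEncoder: Encoder[Person] = Encoders.kryo[Person]

  def main(args: Array[String]): Unit = {
    // Print both schemas to see how each encoder lays out the data.
    println(productEncoder.schema.treeString)
    println(kryoEncoder.schema.treeString)
  }
}
```

The derived encoder keeps the fields visible to Spark SQL as columns, while the Kryo encoder hides them inside an opaque binary value, which is the trade-off to keep in mind when choosing between them.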
The code is attached below:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

//https://blog.csdn.net/qq_52128187?type=blog
object Parent_child_V2 {

  case class Node(title: String, key: String, children: Seq[Node])

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ParentChildHierarchy").setMaster("local[1]")
    val sc = new SparkContext(conf)
    val spark = SparkSession.builder.appName("ParentChildHierarchy").getOrCreate()
    import spark.implicits._

    val df1 = sc.textFile("C:\\zzcode\\workplace\\src\\main\\data\\country")
    val schema = StructType(
      Array(
        StructField("Country", StringType, nullable = true),
        StructField("Gender", StringType, nullable = true),
        StructField("Ethnicity", StringType, nullable = true),
        StructField("Race", StringType, nullable = true)
      )
    )
    // Split each CSV line into the four columns of the schema
    val rowRDD = df1.map(line => {
      val parts = line.split(",")
      Row(parts(0), parts(1), parts(2), parts(3))
    })
    val df = spark.createDataFrame(rowRDD, schema)
    df.show()

    def toHierarchy(df: Dataset[Row]): Dataset[Node] = {
      import spark.implicits._
      // Generate the leaf nodes for Race under Ethnicity
      val raceDF = df.groupBy("Country", "Gender", "Ethnicity")
        .agg(collect_list($"Race").as("Races"))
        .withColumn("Children", expr("transform(Races, race -> struct(race as title, concat(Country, '-', Gender, '-', Ethnicity, '-', race) as key, array() as children))"))
        .select($"Country", $"Gender", $"Ethnicity", $"Children")
      // Generate the child nodes for Ethnicity under Gender
      val ethnicityDF = raceDF.groupBy("Country", "Gender")
        .agg(collect_list(struct($"Ethnicity".as("title"), concat_ws("-", $"Country", $"Gender", $"Ethnicity").as("key"), $"Children".as("children"))).as("Children"))
        .select($"Country", $"Gender", $"Children")
      // Generate the top-level nodes for Gender under Country
      val countryDF = ethnicityDF.groupBy("Country")
        .agg(collect_list(struct($"Gender".as("title"), concat_ws("-", $"Country", $"Gender").as("key"), $"Children".as("children"))).as("Children"))
      // The target type below nests Seq[Row] inside tuples; this matches the
      // Tuple2 -> Tuple3 -> Row chain reported in the "No Encoder found" trace.
      val nodeDS = countryDF.as[(String, Seq[(String, String, Seq[Row])])].map {
        case (country, genderChildren) =>
          val children = genderChildren.map { case (gender, key, childRows) =>
            val childNodes = childRows.map { row =>
              val title = row.getAs[String]("title")
              val key = row.getAs[String]("key")
              val grandChildren = row.getAs[Seq[Row]]("children").map { gcRow =>
                Node(gcRow.getAs[String]("title"), gcRow.getAs[String]("key"), Seq.empty[Node])
              }
              Node(title, key, grandChildren)
            }
            Node(gender, s"$country-$gender", childNodes)
          }
          Node(country, country, children)
      }
      nodeDS
    }

    val resultDS = toHierarchy(df)
    resultDS.show(false)
  }
}
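Solution:
- Add Encoders.kryo[Node] as the encoder for the Node class. The Kryo encoder serializes arbitrary objects into Spark SQL's internal format, and it is commonly recommended for complex user-defined types.
- In the toHierarchy function, the places marked with ??? are where you need to fill in your own business logic for building the hierarchy.
- Remove the SparkContext initialization and its configuration, because creating the SparkSession through SparkSession.builder also initializes the SparkContext.
- Encoders.kryo[Node] is used here only as an example. If you want better performance, a smaller serialized size, and explicit control over serialization behavior, create a dedicated encoder for your custom type (for example, by using a case class and letting Spark generate the encoder automatically).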
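The points above can be sketched as follows. This is a simplified two-level illustration under stated assumptions, not the author's final code: the `KryoNodeSketch` object, the `toNodes` and `run` helpers, and the inline sample data are all hypothetical. It reads the aggregated rows directly as Row (so no tuple type containing Row has to be encoded), passes `Encoders.kryo[Node]` for the recursive `Node` result, and sets the master on `SparkSession.builder` instead of creating a separate SparkContext.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Encoders, Row, SparkSession}
import org.apache.spark.sql.functions.{collect_list, concat_ws, struct}

object KryoNodeSketch {
  case class Node(title: String, key: String, children: Seq[Node])

  // Map each aggregated Row to a Node, supplying the Kryo encoder explicitly.
  // spark.implicits._ is deliberately not imported here, so the only
  // Encoder[Node] in implicit scope is the Kryo one.
  def toNodes(countryDF: DataFrame): Dataset[Node] = {
    implicit val nodeEncoder: Encoder[Node] = Encoders.kryo[Node]
    countryDF.map { row =>
      val country = row.getAs[String]("Country")
      val children = row.getAs[scala.collection.Seq[Row]]("Children").map { child =>
        Node(child.getAs[String]("title"), child.getAs[String]("key"), Nil)
      }.toList
      Node(country, country, children)
    }
  }

  // Builds a tiny in-memory DataFrame (assumed sample data) and collects the nodes.
  def run(): Array[Node] = {
    // SparkSession.builder creates the SparkContext itself; no SparkConf needed.
    val spark = SparkSession.builder
      .appName("KryoNodeSketch")
      .master("local[1]")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(("US", "F"), ("US", "M"), ("CN", "F")).toDF("Country", "Gender")
    val countryDF = df.groupBy("Country")
      .agg(collect_list(struct(
        $"Gender".as("title"),
        concat_ws("-", $"Country", $"Gender").as("key"))).as("Children"))

    val nodes = toNodes(countryDF).collect()
    spark.stop()
    nodes
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Because a Kryo-encoded Dataset holds one binary column, calling show on it prints serialized bytes rather than the tree, so the sketch collects the nodes and prints them on the driver. Since Node refers to itself through children: Seq[Node], automatic case-class derivation is unlikely to work for it as written; a derived encoder would probably require non-recursive per-level types.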