Spark(9):RDD的序列化

这篇具有很好参考价值的文章主要介绍了Spark(9):RDD的序列化。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

0. 相关文章链接

1. 闭包检查

2. 序列化方法和属性

3. Kryo 序列化框架 

4. 核心点总结


0. 相关文章链接

 Spark文章汇总 

1. 闭包检查

        从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor 端执行。那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给 Executor 端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。Scala2.12 版本后闭包编译方式发生了改变。

2. 序列化方法和属性

从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor端执行:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * @ date: 2023/7/4
 * @ author: yangshibiao
 * @ desc: 项目描述
 */
object ModelTest {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("ModelTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3.创建一个RDD
        val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "12345"))

        //3.1创建一个Search对象
        val search: Search = new Search("hello")

        //3.2 函数传递,打印:ERROR Task not serializable
        search.getMatch1(rdd).collect().foreach(println)

        //3.3 属性传递,运行正常
        // search.getMatch2(rdd).collect().foreach(println)

        //4.关闭连接
        sc.stop()

    }
}


class Search(query: String) {

    def isMatch(s: String): Boolean = {
        s.contains(query)
    }

    // 函数序列化案例
    def getMatch1(rdd: RDD[String]): RDD[String] = {
        rdd.filter(isMatch)
    }

    // 属性序列化案例
    def getMatch2(rdd: RDD[String]): RDD[String] = {
        val q: String = query
        rdd.filter((x: String) => x.contains(q))
    }

}

运行第一个方法进行函数传递时,抛出 ERROR Task not serializable,如下图所示:

Spark(9):RDD的序列化,# Spark,spark,大数据,bigdata

运行第二个方法进行属性传递时,运行正常,打印出正常结果,如下图所示:

Spark(9):RDD的序列化,# Spark,spark,大数据,bigdata

3. Kryo 序列化框架 

参考地址: https://github.com/EsotericSoftware/kryo 

        Java 的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也比较大。Spark 出于性能的考虑,Spark2.0 开始支持另外一种 Kryo 序列化机制。Kryo 速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化。

注意:即使使用 Kryo 序列化,也要继承 Serializable 接口。 


import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * @ date: 2023/7/4
 * @ author: yangshibiao
 * @ desc: 项目描述
 */
object ModelTest {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf()
            .setAppName("ModelTest")
            .setMaster("local[*]")
            // 替换默认的序列化机制
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            // 注册需要使用 kryo 序列化的自定义类
            .registerKryoClasses(Array(classOf[Search]))

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3.创建一个RDD
        val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "12345"))

        //3.1创建一个Search对象
        val search: Search = new Search("hello")

        //3.2 函数传递
        search.getMatch1(rdd).collect().foreach(println)

        //4.关闭连接
        sc.stop()

    }
}


class Search(query: String) extends Serializable {

    def isMatch(s: String): Boolean = {
        s.contains(query)
    }

    // 函数序列化案例
    def getMatch1(rdd: RDD[String]): RDD[String] = {
        rdd.filter(isMatch)
    }

}

运行成功,如下图所示:

Spark(9):RDD的序列化,# Spark,spark,大数据,bigdata

4. 核心点总结

  • 在Spark中,如果有类进行序列化,该类需要继承Serializable,如下图所示:

Spark(9):RDD的序列化,# Spark,spark,大数据,bigdata

  • 在Spark中,Serializable比较重,所以可以使用更优的Kryo框架,但是注意的是即使使用 Kryo 序列化,也要继承 Serializable 接口(如果Spark包中的类均已注册,但如果是自定义的类,还需要手动注册),如下图示所示:

Spark(9):RDD的序列化,# Spark,spark,大数据,bigdata


注:其他Spark相关系列文章链接由此进 ->  Spark文章汇总 文章来源地址https://www.toymoban.com/news/detail-521226.html


到了这里,关于Spark(9):RDD的序列化的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【高危】Apache Airflow Spark Provider 反序列化漏洞 (CVE-2023-40195)

    zhi.oscs1024.com​​​​​ 漏洞类型 反序列化 发现时间 2023-08-29 漏洞等级 高危 MPS编号 MPS-qkdx-17bc CVE编号 CVE-2023-40195 漏洞影响广度 广 OSCS 描述 Apache Airflow Spark Provider是Apache Airflow项目的一个插件,用于在Airflow中管理和调度Apache Spark作业。 受影响版本中,由于没有对conn_prefix参

    2024年02月10日
    浏览(37)
  • 网络数据通信—ProtoBuf实现序列化和反序列化

    目录 前言 1.环境搭建 2. centos下编写的注意事项 3.约定双端交互接口 4.约定双端交互req/resp 5. 客户端代码实现 6.服务端代码实现 Protobuf还常用于通讯协议、服务端数据交换场景。那么在这个示例中,我们将实现一个网络版本的通讯录,模拟实现客户端与服务端的交互,通过P

    2024年02月04日
    浏览(43)
  • c++ 使用rapidjson对数据序列化和反序列化(vs2109)

      RapidJSON是腾讯开源的一个高效的C++ JSON解析器及生成器,它是只有头文件的C++库,综合性能是最好的。 1. 安装 在NuGet中为项目安装tencent.rapidjson 2. 引用头文件 #include rapidjson/document.h #include rapidjson/memorystream.h #include rapidjson/prettywriter.h 3. 头文件定义 添加测试json字符串和类型

    2024年02月07日
    浏览(36)
  • Spark大数据处理讲课笔记---Spark RDD典型案例

    利用RDD计算总分与平均分 利用RDD统计每日新增用户 利用RDD实现分组排行榜 针对成绩表,计算每个学生总分和平均分   读取成绩文件,生成lines;定义二元组成绩列表;遍历lines,填充二元组成绩列表;基于二元组成绩列表创建RDD;对rdd按键归约得到rdd1,计算总分;将rdd1映射

    2024年02月06日
    浏览(48)
  • Flink 数据序列化

    大家都知道现在大数据生态非常火,大多数技术组件都是运行在 JVM 上的, Flink 也是运行在 JVM 上,基于 JVM 的数据分析引擎都需要将大量的数据存储在内存中,这就不得不面临 JVM 的一些问题,比如 Java 对象存储密度较低等。针对这些问题,最常用的方法就是实现一个显式的

    2024年02月04日
    浏览(75)
  • Spark【Spark SQL(二)RDD转换DataFrame、Spark SQL读写数据库 】

    Saprk 提供了两种方法来实现从 RDD 转换得到 DataFrame: 利用反射机制推断 RDD 模式 使用编程方式定义 RDD 模式 下面使用到的数据 people.txt :         在利用反射机制推断 RDD 模式的过程时,需要先定义一个 case 类,因为只有 case 类才能被 Spark 隐式地转换为DataFrame对象。 注意

    2024年02月09日
    浏览(51)
  • Spark RDD编程 文件数据读写

    从本地文件系统读取数据,可以采用textFile()方法,可以为textFile()方法提供一个本地文件或目录地址,如果是一个文件地址,它会加载该文件,如果是一个目录地址,它会加载该目录下的所有文件的数据。 示例:读取一个本地文件word.txt val textFile中的textFile是变量名称,sc.t

    2024年02月05日
    浏览(41)
  • 大数据 - Spark系列《六》- RDD详解

    Spark系列文章: 大数据 - Spark系列《一》- 从Hadoop到Spark:大数据计算引擎的演进-CSDN博客 大数据 - Spark系列《二》- 关于Spark在Idea中的一些常用配置-CSDN博客 大数据 - Spark系列《三》- 加载各种数据源创建RDD-CSDN博客 大数据 - Spark系列《四》- Spark分布式运行原理-CSDN博客 大数据

    2024年02月20日
    浏览(44)
  • 【数据结构】二叉树篇|超清晰图解和详解:二叉树的序列化和反序列化

    博主简介: 努力学习的22级计算机科学与技术本科生一枚🌸 博主主页: @是瑶瑶子啦 每日一言🌼: 你不能要求一片海洋,没有风暴,那不是海洋,是泥塘——毕淑敏 🍊 序列化 :本质就是 二叉树的遍历 ,就那么几个:前序、中序、后序、层序。而序列化只不过就是 在遍历到

    2024年02月10日
    浏览(47)
  • 安卓数据存储补充:XML序列化

    序列化是将对象状态转换为可保存或传输的格式的过程。我们可以把对象序列化为不同的格式,比如说:JSon序列化、XML序列化、二进制序列化等等,不同的形式适应不同的业务需求。 把对象的成员变量转化为XML格式,需要使用Xml序列化器(XmlSerializer类),序列化之后的对象

    2024年02月10日
    浏览(83)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包