Spark中python和jvm的通信杂谈--ArrowConverter

这篇具有很好参考价值的文章主要介绍了Spark中python和jvm的通信杂谈--ArrowConverter。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景

要提起ArrowConverters,就得说起Arrow这个项目,该项目的初衷是加速进程间的数据交换,从目前的社区发展以及它的周边来看,其实是一个很不错的项目。
那为什么Spark要引入Arrow呢?其实还得从Pyspark中python和jvm的交互方式上说起,目前pyspark采用的py4j与spark jvm进行交互,而数据的交换采用的是jvmpython两个进程间的数据交换(感兴趣的同学可以参考PySpark架构),这个时候引进Arrow恰到好处。

闲说杂谈

spark具体采用的是Arrow IPC,
IPC中用到了flatbuffers这种高效获取序列化数据的组件,再加上IPC采用的是Java NIO的ByteBuffer零拷贝的方式以及RecordBatch列批的方式,大大提升了进程间的数据交换效率。关于NIO的零拷贝参考NIO效率高的原理之零拷贝与直接内存映射

具体细节

直接到ArrowConverters的类中:
主要看两个方法:toBatchIteratorfromBatchIterator

  • ArrowConverters.toBatchIterator
  private[sql] def toBatchIterator(
      rowIter: Iterator[InternalRow],
      schema: StructType,
      maxRecordsPerBatch: Long,
      timeZoneId: String,
      context: TaskContext): ArrowBatchIterator = {
    new ArrowBatchIterator(
      rowIter, schema, maxRecordsPerBatch, timeZoneId, context)
  }

这个主要是把spark内部的InternalRow转换为ArrowRecordBatches,方法直接就是返回ArrowBatchIterator类型(Iterator[Array[Byte]]类型)的迭代器:

  • ArrowConverters.fromBatchIterator
  private[sql] def fromBatchIterator(
      arrowBatchIter: Iterator[Array[Byte]],
      schema: StructType,
      timeZoneId: String,
      context: TaskContext): Iterator[InternalRow] = new InternalRowIteratorWithoutSchema(
    arrowBatchIter, schema, timeZoneId, context
  )

这个主要是把序列化的ArrowRecordBatche转换为Spark内部的InternalRow,这里也是直接返回了InternalRowIteratorWithoutSchema类型的迭代器,这里就涉及到了内存的零拷贝,具体的方法如下:

    override def nextBatch(): (Iterator[InternalRow], StructType) = {
      val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
      val root = VectorSchemaRoot.create(arrowSchema, allocator)
      resources.append(root)
      val arrowRecordBatch = ArrowConverters.loadBatch(arrowBatchIter.next(), allocator)
      val vectorLoader = new VectorLoader(root)
      vectorLoader.load(arrowRecordBatch)
      arrowRecordBatch.close()
      (vectorSchemaRootToIter(root), schema)
    }

其中涉及的调用链如下:

ArrowConverters.loadBatch
   ||
   \/
MessageSerializer.deserializeRecordBatch
   ||
   \/
readMessageBody
  ||
  \/
ReadChannel.readFully
  ||
  \/
buffer.nioBuffer
  ||
  \/
getDirectBuffer

最后的getDirectBuffer直接返回的是DirectByteBuffer直接内存,这样可以避免了JVM内存到native内存的数据拷贝,尤其是在大数据场景下,提升的效率更加明显,且减少了用户态和内核态的切换次数。

  • 怎么运用到python与spark jvm的交互中
    调用网上的Pyspark的架构图
    Spark中python和jvm的通信杂谈--ArrowConverter

    参考具体conversion.py中部分代码如下:

    jrdd = self._sc._serialize_to_jvm(arrow_data, ser, reader_func, create_RDD_server)
    jdf = self._jvm.PythonSQLUtils.toDataFrame(jrdd, schema.json(), jsqlContext)
    

    主要在self._jvm.PythonSQLUtils.toDataFrame这个方法中,python调用spark中方法,把序列化的*Iterator[Array[Byte]]*传给jvm执行,具体的细节,读者可以自行参考源代码.

其他

在最新发布的Spark-3.4.0中有一项SPIP,也是采用了Arrow IPC作为数据传输的格式。
当然Arrow Flight SQL也将是一个很好的技术点。文章来源地址https://www.toymoban.com/news/detail-500005.html

到了这里,关于Spark中python和jvm的通信杂谈--ArrowConverter的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Scala第二十章节(Akka并发编程框架、Akka入门案例、Akka定时任务代码实现、两个进程间通信的案例以及简易版spark通信框架案例)

    章节目标 理解Akka并发编程框架简介 掌握Akka入门案例 掌握Akka定时任务代码实现 掌握两个进程间通信的案例 掌握简易版spark通信框架案例 1. Akka并发编程框架简介 1.1 Akka概述 Akka是一个用于构建高并发、分布式和可扩展的基于事件驱动的应用工具包。Akka是使用scala开发的库,

    2024年04月11日
    浏览(45)
  • python连接spark报错【已解决】

    错误:raise RuntimeError(\\\"Java gateway process exited before sending its port number\\\") RuntimeError: Java gateway process exited before sending its port number 通过cmd安装的spark, 在pycharm运行的 经过尝试,找到了解决办法 下载JDK!!!也就是java 下完之后我给java在电脑配置了下环境变量,今天再用pycharm尝试就可以

    2024年02月08日
    浏览(32)
  • Spark使用Python开发和RDD

    在所有节点上按照python3,版本必须是python3.6及以上版本 修改所有节点的环境变量 在pyspark shell使用python编写wordcount RDD的全称为Resilient Distributed Dataset,是一个弹性、可复原的分布式数据集,是Spark中最基本的抽象,是一个不可变的、有多个分区的、可以并行计算的集合。RDD中

    2024年02月11日
    浏览(38)
  • 【8章】Spark编程基础(Python版)

    课程资源: (林子雨)Spark编程基础(Python版)_哔哩哔哩_bilibili 机器学习算法库 1、机器学习 机器学习可以看做是一门人工智能的科学,该领域的主要研究对象是人工智能。机器学习利用数据或以往的经验,优化计算机程序的性能标准。强调三个:算法、经验、性能 模

    2024年02月10日
    浏览(49)
  • Python黑马程序员(Spark实战)笔记

     pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark 注意:字符串返回的是[\\\'a\\\',\\\'b\\\',\\\'c\\\',\\\'d\\\',\\\'e\\\',\\\'f\\\',\\\'g\\\']   字典返回的是[\\\'key1\\\',\\\'key2\\\']   读取hello.txt的内容: 注意: 如果没有添加上行代码程序会报出错误! Caused by: org.apache.spark.SparkException: Python worker failed to connect back.  解释器的位置

    2024年02月05日
    浏览(87)
  • 【1-3章】Spark编程基础(Python版)

    课程资源: (林子雨)Spark编程基础(Python版)_哔哩哔哩_bilibili 第三次信息化浪潮:以物联网、云计算、大数据为标志 大数据时代到来的原因: 技术支撑: 存储设备(价格下降)、CPU计算能力(多核CPU)、网络带宽(单机不能够完成海量数据的存储和处理,借助网络分布式的

    2024年02月11日
    浏览(48)
  • 手机app测试杂谈

    目录   一、手机上的app分类 1、基于 HTML5 的 app 2、本地 app 二、测试

    2024年02月11日
    浏览(33)
  • 网络安全威胁杂谈

    网络安全发展到现在,安全最大的威胁是什么?答案:漏洞、恶意软件 全球发生的大多数的网络安全事件,基本都是由于漏洞、恶意软件发起攻击的,可以说如果解决这两类问题,基本上可以解决大多数的网络安全问题,然而为啥现在网络安全事件越来越多,问题越来越多,

    2024年02月21日
    浏览(52)
  • stable diffusion(杂谈)

    图像生成器# information creator 完全在图像信息空间(或潜伏空间)中工作。这一特性使它比以前在像素空间工作的扩散模型更快。在技术上,这个组件是由一个 UNet 神经网络和一个调度算法组成的。 Text Encoder# 提示词的解析由 Text Encoder/CLIP 处理 (token embedding),这里是提示词转

    2024年02月09日
    浏览(30)
  • 低代码技术杂谈

    “Low-Code”是什么?身为技术人员听到这种技术名词,咱们第一反应就是翻看维基百科 或者其他相关技术论文,咱们想看维基百科的英文介绍: A low-code development platform (LCDP) provides a development environment used to create application software through a graphical user interface inste

    2024年01月23日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包