使用Akka的Actor模拟Spark的Master和Worker工作机制

这篇具有很好参考价值的文章主要介绍了使用Akka的Actor模拟Spark的Master和Worker工作机制。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

使用Akka的Actor模拟Spark的Master和Worker工作机制

Spark的Master和Worker协调工作原理

在 Apache Spark 中,Master 和 Worker 之间通过心跳机制进行通信和保持活动状态。下面是 Master 和 Worker 之间心跳机制的工作流程:

  1. Worker 启动后,会向预先配置的 Master 节点发送注册请求。
  2. Master 接收到注册请求后,会为该 Worker 创建一个唯一的标识符(Worker ID)并将其信息保存在内存中。
  3. Master 向 Worker 发送包含 Master URL、Worker ID 等信息的注册响应。
  4. Worker 收到注册响应后,会启动一个定时器并开始周期性地向 Master 发送心跳消息。
  5. Worker 的心跳消息中包含当前的负载状况、可用资源等信息。
  6. Master 接收到心跳消息后,更新该 Worker 的最近心跳时间,并根据需要对集群进行动态调整,例如添加新的任务或删除故障的 Worker。
  7. 如果 Master 在一段时间内没有收到某个 Worker 的心跳消息,它将把该 Worker 标记为失效,并将其相应的资源标记为可用以供后续使用。

具体原理如下:

  • Worker 通过网络向 Master 发送心跳消息,通常使用基于 TCP 的长连接。这些心跳消息可以包含有关 Worker 健康状况、资源利用情况等的信息。
  • Master 使用一个内部的心跳管理组件来处理接收到的心跳消息,并维护每个 Worker 的状态。它根据心跳消息的频率和时间戳来判断 Worker 是否正常运行。
  • 如果 Master 在预定的时间内没有收到 Worker 的心跳消息,它会将该 Worker 标记为失效并触发一系列的故障处理机制,例如重新分配任务给其他可用的 Worker。
  • Worker 定期发送心跳消息,以确保在网络故障、Worker 故障或其他问题发生时能够及时通知 Master。

通过心跳机制,Master 能够实时监控 Worker 的状态,并根据需要进行集群的动态管理和资源调度,从而实现高可用性和容错性。

使用Akka的Actor模拟Spark的Master和Worker工作机制

  1. worker注册到Master, Master完成注册,并回复worker注册成功。
  2. worker定时发送心跳,并在Master接收到。
  3. Master接收到worker心跳后,要更新该worker的最近一次发送心跳的时间。
  4. 给Master启动定时任务,定时检测注册的worker有哪些没有更新心跳,并将其从hashmap中删除。
  5. master worker 进行分布式部署(Linux系统)-》如何给maven项目打包->上传linux。
  • 创建SparkMaster类继承Actor特质,实现Receive方法,并定义对应的伴生对象,在伴生对象中创建SparkMaster-actor引用,并启动Actor发送消息。服务端Master对worker进行心跳监测,发现6秒内无法获取worker心跳,将异常的Worker的实例从HashMap中移除。若能正常获取到心跳,则获取心跳信息后更新心跳时间。定时保持心跳机制。

代码实现:

class SparkMaster extends Actor {
  //定义个hashMap,管理workers(所有worker的实例)
  val workers = mutable.Map[String, WorkerInfo]()

  override def receive: Receive = {

    case "start" => {
      println("master服务器启动了...")
      //这里开始。。
      self ! StartTimeOutWorker
    }
    case RegisterWorkerInfo(id, cpu, ram) => {
      //接收到worker注册信息
      if (!workers.contains(id)) {
        //创建WorkerInfo 对象
        val workerInfo = new WorkerInfo(id, cpu, ram)
        //加入到workers
        workers += ((id, workerInfo))
        println("服务器的workers=" + workers)
        //回复一个消息,说注册成功
        sender() ! RegisteredWorkerInfo
      }
    }
    case HeartBeat(id) => {
      //更新对应的worker的心跳时间
      //1.从workers对应的HashMap中取出WorkerInfo,然后更新worker心跳时间
      val workerInfo = workers(id)
      workerInfo.lastHeartBeat = System.currentTimeMillis()
      println("master更新了 " + id + " 心跳时间...")
    }
    case StartTimeOutWorker => {
      println("开始了定时检测worker心跳的任务")
      import context.dispatcher
      //说明
      //1. 0 millis 不延时,立即执行定时器
      //2. 9000 millis 表示每隔3秒执行一次
      //3. self:表示发给自己
      //4. RemoveTimeOutWorker 发送的内容
      context.system.scheduler.schedule(0 millis, 9000 millis, self, RemoveTimeOutWorker)
    }
    //对RemoveTimeOutWorker消息处理
    //这里需求检测哪些worker心跳超时(now - lastHeartBeat > 6000),并从map中删除
    case RemoveTimeOutWorker => {
      //首先将所有的 workers 的 所有WorkerInfo
      val workerInfos = workers.values
      val nowTime = System.currentTimeMillis()
      //先把超时的所有workerInfo,删除即可
      workerInfos.filter(workerInfo => (nowTime - workerInfo.lastHeartBeat) > 6000)
        .foreach(workerInfo => workers.remove(workerInfo.id))
      println("当前有 " + workers.size + " 个worker存活的")
    }
  }
}

object SparkMaster {
  def main(args: Array[String]): Unit = {

    //这里我们分析出有3个host,port,sparkMasterActor
    if (args.length != 3) {
      println("请输入参数 host port sparkMasterActor名字")
      sys.exit()
    }

    val host = args(0)
    val port = args(1)
    val name = args(2)

    //先创建ActorSystem
    val config = ConfigFactory.parseString(
      s"""
         |akka.actor.provider="akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname=${host}
         |akka.remote.netty.tcp.port=${port}
            """.stripMargin)
    val sparkMasterSystem = ActorSystem("SparkMaster", config)
    //创建SparkMaster -actor
    val sparkMasterRef = sparkMasterSystem.actorOf(Props[SparkMaster], s"${name}")
    //启动SparkMaster
    sparkMasterRef ! "start"
  }
}

  • 定义SparkWorker类继承Actor特质,实现Receive方法,在方法中实现向master发送注册信息的请求,获取到服务端Master注册成功的消息后,定义定时任务发送心跳包给Master。
class SparkWorker(masterHost:String,masterPort:Int,masterName:String) extends Actor{
  //masterProxy是Master的代理/引用ref
  var masterPorxy :ActorSelection = _
  val id = java.util.UUID.randomUUID().toString

  override def preStart(): Unit = {
    println("preStart()调用")
    //初始化masterPorxy
    masterPorxy = context.actorSelection(s"akka.tcp://SparkMaster@${masterHost}:${masterPort}/user/${masterName}")
    println("masterProxy=" + masterPorxy)
  }
  override def receive:Receive = {
    case "start" => {
      println("worker启动了")
      //发出一个注册消息
      masterPorxy ! RegisterWorkerInfo(id, 16, 16 * 1024)
    }
    case RegisteredWorkerInfo => {
      println("workerid= " + id + " 注册成功~")
      //当注册成功后,就定义一个定时器,每隔一定时间,发送SendHeartBeat给自己
      import context.dispatcher
      //说明
      //1. 0 millis 不延时,立即执行定时器
      //2. 3000 millis 表示每隔3秒执行一次
      //3. self:表示发给自己
      //4. SendHeartBeat 发送的内容
      context.system.scheduler.schedule(0 millis, 3000 millis, self, SendHeartBeat)

    }
    case SendHeartBeat =>{
      println("worker = " + id + "给master发送心跳")
      masterPorxy ! HeartBeat(id)
    }
  }
}

object SparkWorker {
  def main(args: Array[String]): Unit = {

    if (args.length != 6) {
      println("请输入参数 workerHost workerPort workerName masterHost masterPort masterName")
      sys.exit()
    }

    val workerHost = args(0)
    val workerPort = args(1)
    val workerName = args(2)
    val masterHost = args(3)
    val masterPort = args(4)
    val masterName = args(5)
    val config = ConfigFactory.parseString(
      s"""
         |akka.actor.provider="akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname=${workerHost}
         |akka.remote.netty.tcp.port=${workerPort}
            """.stripMargin)

    //创建ActorSystem
    val sparkWorkerSystem = ActorSystem("SparkWorker",config)

    //创建SparkWorker 的引用/代理
    val sparkWorkerRef = sparkWorkerSystem.actorOf(Props(new SparkWorker(masterHost, masterPort.toInt,masterName)), s"${workerName}")

    //启动actor
    sparkWorkerRef ! "start"
  }
}

  • 分别定义发送注册信息的RegisterWorkerInfo的样例类,WorkerInfo消息类,定义注册成功的消息样例对象RegisteredWorkerInfo,心跳信息样例类HeartBeat,以及确认发送心跳信息样例对象SendHeartBeat,触发超时work的样例对象StartTimeOutWorker,移除超时worker的样例对象RemoveTimeOutWorker。

代码如下:

// worker注册信息 //MessageProtocol.scala
case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)


// 这个是WorkerInfo, 这个信息将来是保存到master的 hm(该hashmap是用于管理worker)
// 将来这个WorkerInfo会扩展(比如增加worker上一次的心跳时间)
class WorkerInfo(val id: String, val cpu: Int, val ram: Int) {
  var lastHeartBeat : Long = System.currentTimeMillis()
}

// 当worker注册成功,服务器返回一个RegisteredWorkerInfo 对象
case object RegisteredWorkerInfo

//worker每隔一定时间由定时器发给自己的一个消息
case object SendHeartBeat
//worker每隔一定时间由定时器触发,而向master发现的协议消息
case class HeartBeat(id: String)

//master给自己发送一个触发检查超时worker的信息
case object StartTimeOutWorker
// master给自己发消息,检测worker,对于心跳超时的.
case object RemoveTimeOutWorker


运行效果:
使用Akka的Actor模拟Spark的Master和Worker工作机制,大数据,# Scala,spark,大数据,分布式
通过这个案例我们可以深入理解Spark的Master和Worker的通讯机制,为了方便以后对Spark的底层源码的学习,命名的方式和源码保持一致.(如: 通讯消息类命名就是一样的);同时也加深了我们对主从服务心跳检测机制(HeartBeat)的理解,方便以后spark源码二次开发。

Spark的wordCount图解

  • 通过图解可以理解RDD对数据和计算逻辑进行封装,RDD的链式操作,数据流转过程。

使用Akka的Actor模拟Spark的Master和Worker工作机制,大数据,# Scala,spark,大数据,分布式

RDD理解

  • RDD的数据处理方式类似于IO流,也有装饰者设计模式。
  • RDD的数据只有在调用collect方法时,才会真正执行业务逻辑操作。之前的封装全部都是功能的扩展。
  • RDD是不保存数据的,但是IO可以临时保存一部分数据。

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据
处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行
计算的集合。
➢ 弹性
1)存储的弹性:内存与磁盘的自动切换;
2)错的弹性:数据丢失可以自动恢复;
3)计算的弹性:计算出错重试机制;
4)分片的弹性:可根据需要重新分片。(分片可以理解为分区)

➢ 分布式:数据存储在大数据集群不同节点上
➢ 数据集:RDD 封装了计算逻辑,并不保存数据
➢ 数据抽象:RDD 是一个抽象类,需要子类具体实现
➢ 不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在新的 RDD 里面封装计算逻辑。
➢ 可分区、并行计算


假设基于Yarn-Client模式,Driver运行在client客户端主机,这里图解Driver与每个计算节点Executor的协作工作的基本原理:这里假设只有一个RDD。
总结:
RDD是一种封装了计算逻辑的弹性数据集,它是基于内存的多分区并行计算的抽象数据模型。
使用Akka的Actor模拟Spark的Master和Worker工作机制,大数据,# Scala,spark,大数据,分布式
Spark On Yarn-Client 模式
用于监控和调度的 Driver 模块会在客户端本地主机启动,在客户端执行,而不是在 Yarn 中,所以一般用于测试。

➢ Driver 在任务提交的本地机器上运行

➢ Driver 启动后会和 ResourceManager 通讯申请启动 ApplicationMaster

➢ ResourceManager 分配 container,在合适的 NodeManager 上启动 ApplicationMaster,负责向 ResourceManager 申请 Executor 内存。

➢ ResourceManager 接到 ApplicationMaster 的资源申请后会分配container,然后ApplicationMaster 在资源分配指定的 NodeManager 上启动 Executor 进程。

➢ Executor 进程启动后会向 Driver 反向注册,Executor 全部注册完成后 Driver 开始执行main 函数。

➢ 之后执行到 Action 算子时,触发一个 Job,并根据宽依赖开始划分 stage,每个 stage 生成对应的 TaskSet,然后的个数是有每个阶段的最后一个RDD的分区数决定的;之后将 task 分发到各个 Executor 上执行,最终将执行结果输出;如何需要将每个Executor的结果返回到Driver聚合,就需要使用累加器完成了。文章来源地址https://www.toymoban.com/news/detail-701285.html

到了这里,关于使用Akka的Actor模拟Spark的Master和Worker工作机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kubernetes(K8S)学习(一):K8S集群搭建(1 master 2 worker)

    kubernetes官网 :https://kubernetes.io/docs/setup/production-environment/tools/kubeadm/install-kubeadm/#installing-kubeadm-kubelet-and-kubectl GitHub :https://github.com/kubernetes/kubeadm 本文 :使用kubeadm搭建一个3台机器组成的k8s集群,1台master节点,2台worker节点。 由于k8s安装较麻烦,为防止出现其他异常,特此

    2024年04月09日
    浏览(38)
  • 解决Worker 1 failed executing transaction ‘ANONYMOUS‘ at master log mall-mysql-bin.000001, end_log_pos

    在Docker中配置MySQL主从服务器时遇到的问题。 如下错误解决: Coordinator stopped because there were error(s) in the worker(s). The most recent failure being: Worker 1 failed executing transaction ‘ANONYMOUS’ at master log mall-mysql-bin.000001, end_log_pos 2251. See error log and/or performance_schema.replication_applier_status_by_wo

    2023年04月12日
    浏览(61)
  • spark master组件分析

    master只有在local-cluster和standalone部署模式下存在。 Master的职责包括Worker的管理、Application的管理、Driver的管理等。Master负责对整个集群中所有资源的统一管理和分配,它接收各个Worker的注册、更新状态、心跳等消息,也接收Driver和Application的注册。 Worker向Master注册时会携带自

    2024年01月16日
    浏览(22)
  • akka 简单使用

    由于AKka的核心是Actor,而Actor是按照Actor模型进行实现的,所以在使用Akka之前,有必要弄清楚什么是Actor模型。 Actor模型最早是1973年Carl Hewitt、Peter Bishop和Richard Seiger的论文中出现的,受物理学中的广义相对论(general relativity)和量子力学(quantum mechanics)所启发,为解决并发计算的

    2024年02月15日
    浏览(20)
  • Spark---Master启动及Submit任务提交

    Spark集群启动之后,首先调用$SPARK_HOME/sbin/start-all.sh,start-all.sh脚本中调用了“start-master.sh”脚本和“start-slaves.sh”脚本,在start-master.sh脚本中可以看到启动Master角色的主类:“org.apache.spark.deploy.master.Master”。在对应的start-slaves.sh脚本中又调用了start-slave.sh脚本,在star-slave.

    2024年01月20日
    浏览(32)
  • spark grpc 在master运行报错 exitcode13 User did not initialize spark context

    ApplicationMaster: Final app status: FAILED, exitCode: 13, (reason: Uncaught exception: java.lang.IllegalStateException: User did not initialize spark context! 1.使用了不具备权限的用户,spark运行环境有缺失 2.protobuf 需要使用指定操作系统进行编译 未使用 os.detected.classifier=windows-x86_64 或 linux-x86_64,或者把windo

    2024年02月10日
    浏览(23)
  • 【Elasticsearch专栏 16】深入探索:Elasticsearch的Master选举机制及其影响因素分析

    Elasticsearch,作为当今最流行的开源搜索和分析引擎,以其分布式、可扩展和高可用的特性赢得了广大开发者的青睐。在Elasticsearch的分布式架构中,集群的稳健性和高可用性很大程度上依赖于其Master节点的选举机制。本文将深入剖析Elasticsearch的Master选举过程,帮助读者更好地

    2024年04月17日
    浏览(25)
  • uboot下UCLASS框架详解---结合项目工作中spi master和flash驱动开发

    本文通过如何通过编写特定板子的spi master驱动从而识别到spi norflash设备,完成norflash设备的读写。 2.1 uclass uclass可以理解为一些具有相同属性的udevice对外操作的接口,uclass的驱动是uclass_driver,主要为上层提供接口。 udevice的是指具体设备的抽象,对应驱动是driver,driver主要负

    2024年02月07日
    浏览(28)
  • Spark RDD 缓存机制

    Spark RDD 缓存是在内存存储RDD计算结果的一种优化技术。把中间结果缓存起来以便在需要的时候重复使用,这样才能有效减轻计算压力,提升运算性能。 当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接

    2024年03月25日
    浏览(43)
  • Spark on Hive及 Spark SQL的运行机制

    代码中集成Hive: Spark SQL底层依然运行的是Spark RDD的程序,所以说Spark RDD程序的运行的流程,在Spark SQL中依然是存在的,只不过在这个流程的基础上增加了从SQL翻译为RDD的过程 Spark SQL的运行机制,其实就是在描述如何将Spark SQL翻译为RDD程序 Catalyst内部具体的执行流程: 专业术

    2024年01月23日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包