所有文章

Spark - 源码分析(二)

小节:Scala中的闭包

本篇开始之前,先说一下闭包的概念,一会要用到,在 Scala 中的写一个闭包是很容易的,甚至比 js 还简单,如下:

object Test {  
   var base = 3 
   val myFunc = (i:Int) => i * base  
   def main(args: Array[String]) {  
      println(myFunc(5))  
   }  
}

如上的 myFunc 函数依赖了一个外部变量 base,也就是说 base 成为了 myFunc 函数的一部分,因为 myFunc 函数被其它地方引用,所以使得 base 变量也不会被GC回收,这样就形成了一个闭包。

提交Job

上篇中讲到了,假如我们在 RDD 对象上调用了产生宽依赖的算子时,那么该算子将最终会调用 SparkContext 类中的 runJob 方法,本篇中我们接着这个 runJob 方法继续往下看,先回顾下调用 runJob 的地方,以下是RDD类中的一个算子:

...
  def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }
...

参数列表中的 f,就是我们在调用 foreach 的时候,自己写的匿名函数,在这里它被另一个匿名函数引用,这样就形成了一个闭包,最后将这个闭包函数传递给了 runJob 方法。以下是 runJob 方法的原型:

...
  /**
   * Run a function on a given set of partitions in an RDD and pass the results to the given
   * handler function. This is the main entry point for all actions in Spark.
   *
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   * partitions of the target RDD, e.g. for operations like `first()`
   * @param resultHandler callback to pass each result to
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }
 
...

小节:clean()函数的作用

刚开始看到这里的时候,不太明白 val cleanedFunc = clean(func) 这一步是在干嘛,而且很多地方都用到了这个方法,后来上网查找一翻才大概明白,前面说过,这里的 func 是一个闭包,而闭包在执行时会自动捕获它依赖的外部变量,在将闭包函数分发到其它机器上时,会将这个依赖的变量也一并发过去,这一步是在清除一些多余的外部变量,以免浪费集群带宽。

上面的 runJob() 在最后又调用的 DAGScheduler 类中的 runJob 方法,然后又调用 DAGScheduler 的 submitJob 方法,submitJob 会把我们的 RDD 与闭包函数等封装成JobSubmitted 对象,这个对象是 DAGSchedulerEvent 的子类,并将其加入到 eventProcessLoop 对象的 eventQueue 集合中,这个 DAGScheduler 在实例化的时候调用了这样一句调代码:

private[spark] class DAGScheduler(
...
  private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
...
  eventProcessLoop.start()
}

Stage的生成及划分

这个 eventProcessLoop 对象是 EventLoop 的子类,如上 start() 方法会调用父类的 start() 方法,这时会创建一个新线程,循环取出 eventQueue 集合中的 DAGSchedulerEvent 对象,然后交由子类 eventProcessLoopdoOnReceive() 方法,根据 DAGSchedulerEvent 不同子类型调用 DAGScheduler 的不同方法,来做出不同的动作,其中包括 Stage 的划分与依赖计算,如下是 doOnReceive() 方法的定义:

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
 
    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
 
    case StageCancelled(stageId, reason) =>
      dagScheduler.handleStageCancellation(stageId, reason)
 
    case JobCancelled(jobId, reason) =>
      dagScheduler.handleJobCancellation(jobId, reason)
 
    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)
...

handleJobSubmitted()handleMapStageSubmitted() 两个方法会将取出来的Job根据所属的 DAGSchedulerEvent 不同子类,来创建不同的 Stage,handleJobSubmitted() 创建的是 ResultStage,而 handleMapStageSubmitted() 创建 的是 ShuffleMapStage,它们其实都是 Stage 的子类,handleMapStageSubmitted() 创建的是 ShuffleMapStage 类型的的 Stage,这种 Stage 对象中会包含父 Stage 的信息,创建好 Stage 后又会创建一个相应的 ActiveJob,再调用 DAGScheduler 的 submitStage() 方法,此方法定义如下:

private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}

看出来了吗,这是一个递归方法,不管是哪种 RDD 传进来以后,都会先检查其有没有依赖的父 Stage,如果有父 Stage 且还没有被计算,那么先提交父 Stage,如此递归下去,那么所有被依赖的 Stage 中的最顶层的 Stage 将最先被执行,这时会调起 submitMissingTasks(stage, jobId.get) 这个方法。

这一篇就先分析到这里,下篇继续。


编写日期:2017-04-22