Spark - 源码分析(二)
小节:Scala中的闭包
本篇开始之前,先说一下闭包的概念,一会要用到,在 Scala 中的写一个闭包是很容易的,甚至比 js 还简单,如下:
object Test {
var base = 3
val myFunc = (i:Int) => i * base
def main(args: Array[String]) {
println(myFunc(5))
}
}
如上的 myFunc 函数依赖了一个外部变量 base,也就是说 base 成为了 myFunc 函数的一部分,因为 myFunc 函数被其它地方引用,所以使得 base 变量也不会被GC回收,这样就形成了一个闭包。
提交Job
上篇中讲到了,假如我们在 RDD 对象上调用了产生宽依赖的算子时,那么该算子将最终会调用 SparkContext 类中的 runJob 方法,本篇中我们接着这个 runJob 方法继续往下看,先回顾下调用 runJob 的地方,以下是RDD类中的一个算子:
...
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
...
参数列表中的 f,就是我们在调用 foreach 的时候,自己写的匿名函数,在这里它被另一个匿名函数引用,这样就形成了一个闭包,最后将这个闭包函数传递给了 runJob 方法。以下是 runJob 方法的原型:
...
/**
* Run a function on a given set of partitions in an RDD and pass the results to the given
* handler function. This is the main entry point for all actions in Spark.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @param partitions set of partitions to run on; some jobs may not want to compute on all
* partitions of the target RDD, e.g. for operations like `first()`
* @param resultHandler callback to pass each result to
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
...
小节:clean()函数的作用
刚开始看到这里的时候,不太明白 val cleanedFunc = clean(func)
这一步是在干嘛,而且很多地方都用到了这个方法,后来上网查找一翻才大概明白,前面说过,这里的 func
是一个闭包,而闭包在执行时会自动捕获它依赖的外部变量,在将闭包函数分发到其它机器上时,会将这个依赖的变量也一并发过去,这一步是在清除一些多余的外部变量,以免浪费集群带宽。
上面的 runJob()
在最后又调用的 DAGScheduler 类中的 runJob 方法,然后又调用 DAGScheduler 的 submitJob 方法,submitJob 会把我们的 RDD 与闭包函数等封装成JobSubmitted 对象,这个对象是 DAGSchedulerEvent 的子类,并将其加入到 eventProcessLoop 对象的 eventQueue 集合中,这个 DAGScheduler 在实例化的时候调用了这样一句调代码:
private[spark] class DAGScheduler(
...
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
...
eventProcessLoop.start()
}
Stage的生成及划分
这个 eventProcessLoop 对象是 EventLoop 的子类,如上 start()
方法会调用父类的 start()
方法,这时会创建一个新线程,循环取出 eventQueue
集合中的 DAGSchedulerEvent 对象,然后交由子类 eventProcessLoop
的 doOnReceive()
方法,根据 DAGSchedulerEvent 不同子类型调用 DAGScheduler 的不同方法,来做出不同的动作,其中包括 Stage 的划分与依赖计算,如下是 doOnReceive()
方法的定义:
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
case StageCancelled(stageId, reason) =>
dagScheduler.handleStageCancellation(stageId, reason)
case JobCancelled(jobId, reason) =>
dagScheduler.handleJobCancellation(jobId, reason)
case JobGroupCancelled(groupId) =>
dagScheduler.handleJobGroupCancelled(groupId)
...
handleJobSubmitted()
和 handleMapStageSubmitted()
两个方法会将取出来的Job根据所属的 DAGSchedulerEvent 不同子类,来创建不同的 Stage,handleJobSubmitted()
创建的是 ResultStage,而 handleMapStageSubmitted()
创建 的是 ShuffleMapStage,它们其实都是 Stage 的子类,handleMapStageSubmitted()
创建的是 ShuffleMapStage 类型的的 Stage,这种 Stage 对象中会包含父 Stage 的信息,创建好 Stage 后又会创建一个相应的 ActiveJob,再调用 DAGScheduler 的 submitStage()
方法,此方法定义如下:
private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing.isEmpty) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
submitMissingTasks(stage, jobId.get)
} else {
for (parent <- missing) {
submitStage(parent)
}
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)
}
}
看出来了吗,这是一个递归方法,不管是哪种 RDD 传进来以后,都会先检查其有没有依赖的父 Stage,如果有父 Stage 且还没有被计算,那么先提交父 Stage,如此递归下去,那么所有被依赖的 Stage 中的最顶层的 Stage 将最先被执行,这时会调起 submitMissingTasks(stage, jobId.get)
这个方法。
这一篇就先分析到这里,下篇继续。