所有文章

Spark - 源码分析(一)

最近抽时间读了下 Spark 源码,最初只是想看看它是怎样跟 HDFS 交互的,每个 task 怎样在它所在的机器上找到及读取数据块的,这一看就是好几天,罢了,那就从头到尾研究一下吧,准备每周更新一篇。在这里记录的只是我所理解的东西,这样一个庞大的软件,如果没有当初的设计图纸是很难梳理清楚其全部原理的,我就看到哪写到哪吧。

Spark简介

Spark 是一个基于内存的大数据处理框架,官方称 Spark 比M apReduce 的处理速度快 10 到 100 倍。它最初在 2009 年由加州大学伯克利分校的 AMPLab 开发,并于 2010 年成为 Apache 的开源项目之一。

Spark版本:2.1.0

部署模式:Spark on Yarn

我们就从提交一个任开始,看看Spark都做了些什么,先写一个简单的Spark任务,如下:

package com.example
 
import org.apache.spark.{SparkContext,SparkConf}
 
object SparkCode {
   
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("Spark Pi")
      .getOrCreate()
     
    spark.sparkContext.textFile("hdfs:///spark/input/file.txt", 3)
      .map { x => (x, 1)}
      .reduceByKey(_ + _)
      .saveAsTextFile("hdfs:///spark/output")
     
    spark.stop()
  }
   
}

将上面代码打成Jar包,然后通过 spark-submit 命令提交这个任务,提交以后它在干什么?

解析命令行

首先,从这个 spark-submit 命令说起,它是一个 shell 脚本,它里面最终调用了 spark-class 这个脚本,并且把所有参数也带过去,这个 spark-class 做了一些环境变量设置,如:JAVA_HOME CLASS-PATH SPARK_HOME 等等,设置好环境变量以后调起了一个Java类:org.apache.spark.launcher.Main,这个类的作用是分析所有参数合法性,并根据用户提供的指定的集群部署方式决定这个任务的提交方式,拼接出最终执行命令并打印到标准输出流中,这时又回到 spark-class 这个脚本中,它会从标准输出流中捕获到刚才拼接好的最终提交任务命令,这时又调起一个Java类:org.apache.spark.deploy.SparkSubmit,它又会分析一次用户提供的所有参数,用反射封装用户指定的类,也就是我们上面写的 com.example.SparkCode 类,并找到类中的 main 方法,然后执行它。

初始化SparkSession

进入我们的 main 方法后,先创建了一个 SparkSession 对象,在 Spark2.0 之前是直接创建 SparkContext 对象,在 Spark2.0 之后有了 SparkSession,这个对象中包含了 SparkContext 与 SparkSQL,并提供一些创建 DataFrame 的方法,DataFrame 在 Spark2.0 中也是个比较突出的概念,它又是一个更高级的抽象,可以同时用 RDD 算子和 SQL 语法对其进行处理,好了,接下来介绍重量级的人物:SparkContext

初始化SparkContext

SparkContext 是 Spark 功能的主要入口,即然这么重要,先来看看它包含了什么,下面列出它的几个重要成员:

package org.apache.spark
 
class SparkContext(config: SparkConf) extends Logging {
  private var _conf: SparkConf = _
  private var _env: SparkEnv = _
  private var _jobProgressListener: JobProgressListener = _
  private var _executorMemory: Int = _
  private var _taskScheduler: TaskScheduler = _
  private var _schedulerBackend: SchedulerBackend = _
  private var _applicationId: String = _
  @volatile private var _dagScheduler: DAGScheduler = _
  ...
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int]): Array[U] = {
    val results = new Array[U](partitions.size)
    runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
    results
  }
...

除此之外,它还包含各种创建 RDD 的方法,比如我们常用的 testFile() 等,看到这些就知道它的重要性了,当 SparkContext 被创建时,这些成员也会被创建出来,来简单说说这些成员的作用:

_dagScheduler:它的作用是根据各 RDD 间的依赖情况整个 Job 切分成不同的 Stage,它包含了两个集合:

class DAGScheduler(
...
  val jobIdToStageIds = new HashMap[Int, HashSet[Int]]  //这个用来存放每个job对应哪些Stage
  val stageIdToStage = new HashMap[Int, Stage]          //这个存放具体的Stage对象
...

_applicationId:看到这个成员,应该知道一个 SparkContext 对象就对应用户提交的一个 Application

_taskScheduler:它来决定哪些 tash 跑在哪个 Executor 上

_schedulerBackend:它于 _taskScheduler 配合,一起完成 task 分发工作。

其中 _taskScheduler_schedulerBackend 的初始化与用户指定的 masterUrl 有关,见如下代码:

private def createTaskScheduler(
      sc: SparkContext,
      master: String,
      deployMode: String): (SchedulerBackend, TaskScheduler) = {
    import SparkMasterRegex._
 
    // When running locally, don't try to re-execute tasks on failure.
    val MAX_LOCAL_TASK_FAILURES = 1
 
    master match {
      case "local" =>
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)
...
      case masterUrl =>
        val cm = getClusterManager(masterUrl) match {
          case Some(clusterMgr) => clusterMgr
          case None => throw new SparkException("Could not parse Master URL: '" + master + "'")

提交Job

SparkContext 初始化完成后就到了我们加载的 hdfs 文件这部分,textFile()` 方法是在 SparkContext 内定义的,它最终创建了一个 HadoopFile 对象:

def hadoopFile[K, V](
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
...
    new HadoopRDD(
      this,
      confBroadcast,
      Some(setInputPathsFunc),
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions).setName(path)
  }

这个HadoopFile类继承了 RDD 类,也就是说我们调用的 textFile() 这个方法最终返回了一个 RDD, 下一步我们又调用了这个 RDD 的几个算子方法,我们看看 RDD 里有哪些方法:

abstract class RDD[T: ClassTag](
..
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }
...
  def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }
...
  private[spark] def collectPartitions(): Array[Array[T]] = withScope {
    sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  }
...

看到 sc.runJob() 了吗,是的,它就是之前在初始化 SparkContext 小节所提到的 runJob 方法,至于为什么有的算子调用了 sc.runJob() 而有的算子却没调用,这就和 Spark 中的宽依赖和窄依赖有关了,窄依赖的算子不会触发实际的计算任务而只会被记录下来。

今天就先写到这里,下一篇接着 runJob() 这个方法继续分析。


编写日期:2017-04-16