Spark - 源码分析(一)
最近抽时间读了下 Spark 源码,最初只是想看看它是怎样跟 HDFS 交互的,每个 task 怎样在它所在的机器上找到及读取数据块的,这一看就是好几天,罢了,那就从头到尾研究一下吧,准备每周更新一篇。在这里记录的只是我所理解的东西,这样一个庞大的软件,如果没有当初的设计图纸是很难梳理清楚其全部原理的,我就看到哪写到哪吧。
Spark简介
Spark 是一个基于内存的大数据处理框架,官方称 Spark 比M apReduce 的处理速度快 10 到 100 倍。它最初在 2009 年由加州大学伯克利分校的 AMPLab 开发,并于 2010 年成为 Apache 的开源项目之一。
Spark版本:2.1.0
部署模式:Spark on Yarn
我们就从提交一个任开始,看看Spark都做了些什么,先写一个简单的Spark任务,如下:
package com.example
import org.apache.spark.{SparkContext,SparkConf}
object SparkCode {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("Spark Pi")
.getOrCreate()
spark.sparkContext.textFile("hdfs:///spark/input/file.txt", 3)
.map { x => (x, 1)}
.reduceByKey(_ + _)
.saveAsTextFile("hdfs:///spark/output")
spark.stop()
}
}
将上面代码打成Jar包,然后通过 spark-submit 命令提交这个任务,提交以后它在干什么?
解析命令行
首先,从这个 spark-submit 命令说起,它是一个 shell 脚本,它里面最终调用了 spark-class 这个脚本,并且把所有参数也带过去,这个 spark-class 做了一些环境变量设置,如:JAVA_HOME CLASS-PATH SPARK_HOME 等等,设置好环境变量以后调起了一个Java类:org.apache.spark.launcher.Main
,这个类的作用是分析所有参数合法性,并根据用户提供的指定的集群部署方式决定这个任务的提交方式,拼接出最终执行命令并打印到标准输出流中,这时又回到 spark-class 这个脚本中,它会从标准输出流中捕获到刚才拼接好的最终提交任务命令,这时又调起一个Java类:org.apache.spark.deploy.SparkSubmit
,它又会分析一次用户提供的所有参数,用反射封装用户指定的类,也就是我们上面写的 com.example.SparkCode
类,并找到类中的 main 方法,然后执行它。
初始化SparkSession
进入我们的 main 方法后,先创建了一个 SparkSession 对象,在 Spark2.0 之前是直接创建 SparkContext 对象,在 Spark2.0 之后有了 SparkSession,这个对象中包含了 SparkContext 与 SparkSQL,并提供一些创建 DataFrame 的方法,DataFrame 在 Spark2.0 中也是个比较突出的概念,它又是一个更高级的抽象,可以同时用 RDD 算子和 SQL 语法对其进行处理,好了,接下来介绍重量级的人物:SparkContext
初始化SparkContext
SparkContext 是 Spark 功能的主要入口,即然这么重要,先来看看它包含了什么,下面列出它的几个重要成员:
package org.apache.spark
class SparkContext(config: SparkConf) extends Logging {
private var _conf: SparkConf = _
private var _env: SparkEnv = _
private var _jobProgressListener: JobProgressListener = _
private var _executorMemory: Int = _
private var _taskScheduler: TaskScheduler = _
private var _schedulerBackend: SchedulerBackend = _
private var _applicationId: String = _
@volatile private var _dagScheduler: DAGScheduler = _
...
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int]): Array[U] = {
val results = new Array[U](partitions.size)
runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
results
}
...
除此之外,它还包含各种创建 RDD 的方法,比如我们常用的 testFile()
等,看到这些就知道它的重要性了,当 SparkContext 被创建时,这些成员也会被创建出来,来简单说说这些成员的作用:
_dagScheduler
:它的作用是根据各 RDD 间的依赖情况整个 Job 切分成不同的 Stage,它包含了两个集合:
class DAGScheduler(
...
val jobIdToStageIds = new HashMap[Int, HashSet[Int]] //这个用来存放每个job对应哪些Stage
val stageIdToStage = new HashMap[Int, Stage] //这个存放具体的Stage对象
...
_applicationId
:看到这个成员,应该知道一个 SparkContext 对象就对应用户提交的一个 Application
_taskScheduler
:它来决定哪些 tash 跑在哪个 Executor 上
_schedulerBackend
:它于 _taskScheduler
配合,一起完成 task 分发工作。
其中 _taskScheduler
和 _schedulerBackend
的初始化与用户指定的 masterUrl 有关,见如下代码:
private def createTaskScheduler(
sc: SparkContext,
master: String,
deployMode: String): (SchedulerBackend, TaskScheduler) = {
import SparkMasterRegex._
// When running locally, don't try to re-execute tasks on failure.
val MAX_LOCAL_TASK_FAILURES = 1
master match {
case "local" =>
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
scheduler.initialize(backend)
(backend, scheduler)
...
case masterUrl =>
val cm = getClusterManager(masterUrl) match {
case Some(clusterMgr) => clusterMgr
case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
提交Job
SparkContext 初始化完成后就到了我们加载的 hdfs 文件这部分,textFile()` 方法是在 SparkContext 内定义的,它最终创建了一个 HadoopFile 对象:
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
...
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
}
这个HadoopFile类继承了 RDD 类,也就是说我们调用的 textFile()
这个方法最终返回了一个 RDD, 下一步我们又调用了这个 RDD 的几个算子方法,我们看看 RDD 里有哪些方法:
abstract class RDD[T: ClassTag](
..
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
...
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
...
private[spark] def collectPartitions(): Array[Array[T]] = withScope {
sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
}
...
看到 sc.runJob()
了吗,是的,它就是之前在初始化 SparkContext 小节所提到的 runJob 方法,至于为什么有的算子调用了 sc.runJob()
而有的算子却没调用,这就和 Spark 中的宽依赖和窄依赖有关了,窄依赖的算子不会触发实际的计算任务而只会被记录下来。
今天就先写到这里,下一篇接着 runJob()
这个方法继续分析。