所有文章

Spark on Yarn - 性能调优

自从领导让我接手了 Docker 方面的工作,虽然每天还是会跟大数据接触,但主要精力已转到了 Docker 上,感觉自己离大数据越来越远了,今天写一篇关于 Spark 调优的文章,也算是复习一下。

环境描述

Spark 有多种运行模式,本次我们以 yarn-cluster 模式来运行,环境如下: Spark 版本:2.0.0 Spark 运行模式:yarn-cluster 计算节点数:10 单节点内存大小:64g 单节点 CPU 核数:16


Yarn配置

即然用 Yarn 来管理资源,那么 Spark 最终运行性能也会受 Yarn 的影向,我们先来说明 Yanr 中几个关键配置参数。

yarn.nodemanager.resource.memory-mb = 61440: 这个参数指每个(假设我们所有节点配置都是一样的)物理机要供献出多少内存给 Yarn,每个节点有 64G,减去给系统预留的 2G,NodeManager 和 DataNode 各占用 1G,如果没有其它服务的话,还剩下 60G,最终得出 60 * 1024 = 61440 , 这只是个参考,实际上在系统开机以后已经没有 64G 了,而且每个节点上还不止这两个服务,如 Hbase、impala、logstash 等等,所以还要考虑这些服务所占用的资源。

yarn.nodemanager.resource.cpu-vcores = 15: 这个值指单个节点要供献出多少CPU给Ynar来调度,同样要减去系统与其它服务所占用的核数,我们就以15为例了。 另外关于这个值还有个说法,比如物理机是12核,减去其它服务占用的2个,那么在配置里面写的时候就是15 * 1.5 = 22 或者 10 * 2 = 30,而我试过以后,觉得性能并没有明显变化。 这时,Yarn的总资源: 总内存:61440M * 10个节点 = 614400M 总核数:15核 * 10个节点 = 150核

yarn.scheduler.maximum-allocation-mb = 61440 = 60G: 这个值用来限制Yanr中的每个container可使用的最大内存,应该设为小于或等于单个节点的内存大小,也就是 60G yarn.scheduler.minimum-allocation-mb = 512 这个值用来限制 Yanr 中的每个 container 可使用的最小内存,一般为 512 或 1G,太大会造成资源浪费 yarn.scheduler.maximum-allocation-vcores = 15 这个值用来限制 Yanr 中的每个 container 可使用的最大 CPU 核数,应该设为小于或等于单个节点的核数,也就是15。

yarn.scheduler.minimum-allocation-vcores = 1: 这个值用来限制 Yanr 中的每个 container 可使用的最小 CPU 核数,就给1个吧。

yarn.nodemanager.resource.percentage-physical-cpu-limit = 100: 这个值是用来限制 Yarn 容器可使用的所有 CPU 的百分比,上面写的是 physical,但官没说这个百分比是针对物理机还是我们给 Yarn 分配的 10 核,总之,如果你的这个值不是 100 的话,就把它设为 100 吧。

Spark配置

提交任务时,Spark默认会读取 conf/spark-defaults.conf 内的参数,下面一个示例配置:

[user@node1 spark]$ vi conf/spark-defaults.conf 
spark.master yarn-cluster
spark.shuffle.service.enabled true
spark.eventLog.enabled true
spark.eventLog.dir hdfs:///spark-history/
spark.executor.instances 26
spark.executor.memory 11G 
spark.executor.cores 2  
spark.default.parallelism 60
spark.driver.memory 25g 
spark.driver.cores 5
spark.dynamicAllocation.enabled false

我们先来说明一下它的运行原理,下面是一个示例任务:

bin/spark-submit \
--class com.clean.MyClass \
lib/clean-data.jar \
/input/data.txt /output/
...

当我们提交一个 Spark 任务后,它会在各个节点上分配多个 Executor,Executor 的数量可以由用户指定,然后用户提交的任务被分解成多个 Job,每全 Job 划分成多个 Stage,每个 Stage 中包括很多 Task,如果你启动了 Spark-History-Server,可以通这 WEB UI 看到它们的执行过程。 上面配置文件的 spark.dynamicAllocation.enabled 这项参数很关键,如果指定为 true,用户可以省去 spark.executor.instances 这项配置,Spark 将自动分配 Executor 数量,大多数情况下它可以较好地执行,但它会用最保守的方案把任务执行完,性能并不是最优的,而一般我们在写一个 Spark 程序时是知道数据量有多大的,因此有时候我们需要自己指定 Executor 数据与它们可使用的资源。

参数说明

如果我们要处理一个100g的文件,可以怎样规划计算资源?上面配置中有6个重要的参数,我们一个一个说明。

spark.executor.instances: 首先共有10台机器,所以它的值不能小于10,这样才能让所有机器并行跑起来,其次每台机器有15个核,那么每台机器上的 Executor 数量不能超过 15。

spark.executor.cores:每个 Executor 会执行很多 Task,如果只给每个 Executor 分配一个核,那这个 Executor 内所有的 Task 就只能一个一个执行了,所以这个值决定了每个 Executor 的计算能力,每个节点有 15 个核,如果这节点上有 2 个 Executor,那每个 Executor 最多只能给 7 个核了。

spark.executor.memory:这个值跟上面同理,根据 Executor 数量决定,与 spark.executor.cores 也有关系,假设我们给每个 Executor 分配了两个核,而内存却给了 100G,也就是说两个 Task 每人可以分到 50G 的内存!那结果只有少部分内存会被利用起来。

以个人经验,每个 Task 平均分到的内存应控制在 3G 到 10G 之间为好,太小不能发挥集群性能,也有可能 OOM,而太大无疑会降低任务整体的并行度,我们就先设为 5G。

参数取值

如上,我们设每个 Task 平均到的内存为 5G,所以单节点上可以同时跑的 Task 数量为 60G / 5G = 12 个,这又意味着单节点有 12 个核被利用起来,而单节点核数 15 个,看起来已经接近最优了,因为每个节点的内存跟 CPU 都用到了大部分,上面说过每个节点上的 Executor 不能超过 15,且每个 Executor 的核至少为 2,如果每个节点 3 个 Executor,每个 Executor 分配 4 个核,3 * 4 = 12,跟上面的结论接近。

spark.executor.instances:的值就等于 3 * 10节点 - 1 = 29,为什么减掉一个呢,因为还有一个 SparkDriver,它也是一个 Executor,所以 29 加上一个 Driver 刚好为 30 个 Executor,可以均匀分布在 10 个节点上。

spark.executor.cores:的值就等于上面说的 4。

spark.executor.memory:的值等于 60G / 3个Executor = 20G

spark.driver.memory:上面算出来每个 Executor 的内存为 20G,因为已经没有多余的内存,所以它最多也只有 20G 了。

spark.driver.cores:同理,它也是 4,现在我们还有 150 - 4 * 30 = 30 个核,也可以多给它一些。

spark.default.parallelism:这个指所有任务的并行度,它的值应该给到所有 Executor 的两到三倍,30 * 2 = 60

当然上面这 6 项参数也可以在自己的 Spark 程序中指定,还可以在提交任务时指定,这样当执行不同的任务就可以使用不同的配置,现在用上面计算出来的值来提交任务:

bin/spark-submit \
--num-executors 29 \
--executor-cores 4 \
--executor-memory 20G \
--driver-memory 20 \
--driver-cores 4 \
--class com.clean.MyClass \
lib/clean-data.jar \
/input/data.txt /output

-End-


编写日期:2017-03-25