spark 源码分析之十九 -- DAG的生成和Stage的划分

2021年11月25日 阅读数:4
这篇文章主要向大家介绍spark 源码分析之十九 -- DAG的生成和Stage的划分,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

上篇文章 spark 源码分析之十八 -- Spark存储体系剖析 重点剖析了 Spark的存储体系。从本篇文章开始,剖析Spark做业的调度和计算体系。html

在说DAG以前,先简单说一下RDD。node

对RDD的总体归纳

文档说明以下:apache

RDD全称Resilient Distributed Dataset,即分布式弹性数据集。它是Spark的基本抽象,表明不可变的可分区的可并行计算的数据集。app

RDD的特色:框架

1. 包含了一系列的分区异步

2. 在每个split上执行函数计算分布式

3. 依赖于其余的RDDide

4. 对于key-value对的有partitioner函数

5. 每个计算有优先计算位置oop

更多内容能够去看Spark的论文:http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf

RDD的操做

RDD支持两种类型的操做:

  • transformation:它从已存在的数据集中建立一个新的数据集。它是懒执行的,即生成RDD的全部操做都是懒执行的,也就是说不会立刻计算出结果,它们只会记住它们依赖的基础数据集(文件、MQ等等),等到一个action须要结果返回到driver端的时候,才会执行transform的计算。这种设计使得RDD计算更加高效。
  • action:它在数据集上运行计算以后给driver端返回一个值。

注意:reduce 是一个action,而 reduceByKey 则是一个transform,由于它返回的是一个分布式数据集,并无把数据返回给driver节点。

Action函数

官方提供了RDD的action函数,以下:

注意:这只是常见的函数,并无列举全部的action函数。

Action函数的特色

那么action函数有哪些特色呢?

根据上面介绍的,即action会返回一个值给driver节点。即它们的函数返回值是一个具体的非RDD类型的值或Unit,而不是RDD类型的值。

Transformation函数

官方提供了Transform 函数,以下:

Transformation函数的特色

上文提到,transformation接收一个存在的数据集,并将计算结果做为新的RDD返回。也是就说,它的返回结果是RDD。

 

总结

其实,理解了action和transformation的特色,看函数的定义就知道是action仍是transformation。

 

RDD的依赖关系

官方文档里,聊完RDD的操做,紧接着就聊了一下shuffle,咱们按照这样的顺序来作一下说明。

Shuffle

官方给出的shuffle的解释以下:

注意:shuffle是特定操做才会发生的事情,这跟action和transformation划分没有关系。

官方给出了一些常见的例子。

Operations which can cause a shuffle include repartition operations like repartition and coalesceByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.

RDD的四种依赖关系

那么shuffle跟什么有关系呢?

shuffle跟依赖有关系。在 spark 源码分析之一 -- RDD的四种依赖关系 中,说到 RDD 分为宽依赖和窄依赖,其中窄依赖有三种,一对一依赖、Range依赖、Prune 依赖。宽依赖只有一种,那就是 shuffle 依赖。

即RDD跟父RDD的依赖关系是宽依赖,那么就是父RDD在生成新的子RDD的过程当中是存在shuffle过程的。

如图:

这张图也说明了一个结论,并非全部的join都是宽依赖。

依赖关系在源码中的体现

咱们一般说的 RDD,在Spark中具体表现为一个抽象类,全部的RDD子类继承自该RDD,全称为 org.apache.spark.rdd.RDD,以下:

它有两个参数,一个参数是SparkContext,另外一个是deps,即Dependency集合,Dependency是全部依赖的公共父类,即deps保存了父类的依赖关系。

其中,窄依赖的父类是 NarrowDependency, 它的构造方法里是由父RDD这个参数的,宽依赖 ShuffleDependency ,它的构造方法里也是有父RDD这个参数的。

RDD 依赖关系的不肯定性

getDependencies 方法

获取抽象的方法是 getDependencies 方法,以下:

这只是定义在RDD抽象父类中的默认方法,不一样的子类会有不一样的实现。

它在以下类中又从新实现了这个方法,以下:

是不是shuffle依赖,跟分区的数量也有必定的关系,具体能够看下面的几个RDD的依赖的实现:

CoGroupedRDD

 

SubtractedRDD

DAG在Spark做业中的重要性

以下图,一个application的执行过程被划分为四个阶段:

阶段一:咱们编写driver程序,定义RDD的action和transformation操做。这些依赖关系造成操做的DAG。

阶段二:根据造成的DAG,DAGScheduler将其划分为不一样的stage。

阶段三:每个stage中有一个TaskSet,DAGScheduler将TaskSet交给TaskScheduler去执行,TaskScheduler将任务执行完毕以后结果返回给DAGSCheduler。

阶段四:TaskScheduler将任务分发到每个Worker节点去执行,并将结果返回给TaskScheduler。

 

本篇文章的定位就是阶段一和阶段二。后面会介绍阶段三和阶段四。

注:图片不知出处。

DAG的建立

咱们先来分析一个top N案例。

一个真实的TopN案例

需求:一个大文件里有不少的重复整数,如今求出重复次数最多的前10个数。

代码以下(为了多几个stage,特地加了几个repartition):

scala> val sourceRdd = sc.textFile("/tmp/hive/hive/result",10).repartition(5)
sourceRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at repartition at <console>:27

scala> val allTopNs = sourceRdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_).repartition(10).sortByKey(ascending = true, 100).map(tup => (tup._2, tup._1)).mapPartitions(
| iter => {
| iter.toList.sortBy(tup => tup._1).takeRight(100).iterator
| }
| ).collect()

// 结果略
scala> val finalTopN = scala.collection.SortedMap.empty[Int, String].++(allTopNs)
//结果略

scala> finalTopN.takeRight(10).foreach(tup => {println(tup._2 + " occurs times : " + tup._1)})

53 occurs times : 1070
147 occurs times : 1072
567 occurs times : 1073
931 occurs times : 1075
267 occurs times : 1077
768 occurs times : 1080
612 occurs times : 1081
877 occurs times : 1082
459 occurs times : 1084
514 occurs times : 1087

 

下面看一下生成的DAG和Stage 

任务概览

 Description描述的就是每个job的最后一个方法。

stage 0 到 3的DAG图:

stage 4 到 8的DAG图:

每个stage的Description描述的是stage的最后一个方法。

总结

能够看出,RDD的依赖关系是有driver端对RDD的操做造成的。

一个Stage中DAG的是根据RDD的依赖来构建的。

 

咱们来看一下源码。

Stage

定义

Stage是一组并行任务,它们都计算须要做为Spark做业的一部分运行的相同功能,其中全部任务具备相同的shuffle依赖。由调度程序运行的每一个DAG任务在发生shuffle的边界处被分红多个阶段,而后DAGScheduler以拓扑顺序运行这些阶段。每一个Stage均可以是一个shuffle map阶段,在这种状况下,其任务的结果是为其余阶段或结果阶段输入的,在这种状况下,其任务在RDD上运行函数直接计算Spark action(例如count(),save()等)。对于shuffle map阶段,咱们还跟踪每一个输出分区所在的节点。每一个stage还有一个firstJobId,用于识别首次提交stage的做业。使用FIFO调度时,这容许首先计算先前做业的阶段,或者在失败时更快地恢复。最后,因为故障恢复,能够在屡次尝试中从新执行单个stage。在这种状况下,Stage对象将跟踪多个StageInfo对象以传递给listener 或Web UI。最近的一个将经过latestInfo访问。

构造方法

Stage是一个抽象类,构造方法以下:

参数介绍以下:


id – Unique stage ID
rdd – RDD that this stage runs on: for a shuffle map stage, it's the RDD we run map tasks on, while for a result stage, it's the target RDD that we ran an action on
numTasks – Total number of tasks in stage; result stages in particular may not need to compute all partitions, e.g. for first(), lookup(), and take().
parents – List of stages that this stage depends on (through shuffle dependencies).
firstJobId – ID of the first job this stage was part of, for FIFO scheduling.
callSite – Location in the user program associated with this stage: either where the target RDD was created, for a shuffle map stage, or where the action for a result stage was called

callSite其实记录的就是stage用户代码的位置。

成员变量

成员方法

其实相对来讲比较简单。

Stage的子类

它有两个子类,以下:

ResultStage

类说明:

ResultStages apply a function on some partitions of an RDD to compute the result of an action. 
The ResultStage object captures the function to execute, func, which will be applied to each partition, and the set of partition IDs, partitions.
Some stages may not run on all partitions of the RDD, for actions like first() and lookup().

ResultStage在RDD的某些分区上应用函数来计算action操做的结果。 对于诸如first()和lookup()之类的操做,某些stage可能没法在RDD的全部分区上运行。

简言之,ResultStage是应用action操做在action上进而得出计算结果。

源码以下:

ShuffleMapStage

类说明

ShuffleMapStages are intermediate stages in the execution DAG that produce data for a shuffle. 
They occur right before each shuffle operation, and might contain multiple pipelined operations before that (e.g. map and filter).
When executed, they save map output files that can later be fetched by reduce tasks.
The shuffleDep field describes the shuffle each stage is part of, and variables like outputLocs and numAvailableOutputs track how many map outputs are ready.
ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage.
For such stages, the ActiveJobs that submitted them are tracked in mapStageJobs.
Note that there can be multiple ActiveJobs trying to compute the same shuffle map stage. 

ShuffleMapStage 是中间的stage,为shuffle生产数据。它们在shuffle以前出现。当执行完毕以后,结果数据被保存,以便reduce 任务能够获取到。

构造方法

 

shuffleDep记录了每个stage所属的shuffle。

Stage的划分

在上面咱们提到,每个RDD都有对父RDD的依赖关系,这样的依赖关系造成了一个有向无环图。即DAG。

当一个用户在一个RDD上运行一个action时,调度会检查RDD的血缘关系(即依赖关系)来建立一个stage中的DAG图来执行。

以下图:

在说stage划分以前先,剖析一下跟DAGScheduler相关的类。

EventLoop

类说明

An event loop to receive events from the caller and process all events in the event thread. It will start an exclusive event thread to process all events.

Note: The event queue will grow indefinitely. So subclasses should make sure onReceive can handle events in time to avoid the potential OOM.

它定义了异步消息处理机制框架。

消息队列

其内部有一个阻塞双端队列,用于存放消息:

post到消息队列

外部线程调用 post 方法将事件post到堵塞队列中:

消费线程

有一个消息的消费线程:

onReceive 方法是一个抽象方法,由子类来实现。

下面来看其实现类 -- DAGSchedulerEventProcessLoop。

其接收的是DAGSchedulerEvent类型的事件。DAGSchedulerEvent 是一个sealed trait,其实现以下:

它的每个子类事件,在doOnReceive 方法中都有体现,以下:

 

DAGScheduler

这个类的定义已经超过2k行了。因此也不打算所有介绍,本篇文章只介绍跟stage任务的生成相关的属性和方法。

类说明

The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a minimal schedule to run the job. It then submits stages as TaskSets to an underlying TaskScheduler implementation that runs them on the cluster. A TaskSet contains fully independent tasks that can run right away based on the data that's already on the cluster (e.g. map output files from previous stages), though it may fail if this data becomes unavailable.

Spark stages are created by breaking the RDD graph at shuffle boundaries. RDD operations with "narrow" dependencies, like map() and filter(), are pipelined together into one set of tasks in each stage, but operations with shuffle dependencies require multiple stages (one to write a set of map output files, and another to read those files after a barrier). In the end, every stage will have only shuffle dependencies on other stages, and may compute multiple operations inside it. The actual pipelining of these operations happens in the RDD.compute() functions of various RDDs

In addition to coming up with a DAG of stages, the DAGScheduler also determines the preferred locations to run each task on, based on the current cache status, and passes these to the low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being lost, in which case old stages may need to be resubmitted. Failures within a stage that are not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task a small number of times before cancelling the whole stage. When looking through this code, there are several key concepts:

- Jobs (represented by ActiveJob) are the top-level work items submitted to the scheduler. For example, when the user calls an action, like count(), a job will be submitted through submitJob. Each Job may require the execution of multiple stages to build intermediate data.

- Stages (Stage) are sets of tasks that compute intermediate results in jobs, where each task computes the same function on partitions of the same RDD. Stages are separated at shuffle boundaries, which introduce a barrier (where we must wait for the previous stage to finish to fetch outputs). There are two types of stages: ResultStage, for the final stage that executes an action, and ShuffleMapStage, which writes map output files for a shuffle. Stages are often shared across multiple jobs, if these jobs reuse the same RDDs.

- Tasks are individual units of work, each sent to one machine.

- Cache tracking: the DAGScheduler figures out which RDDs are cached to avoid recomputing them and likewise remembers which shuffle map stages have already produced output files to avoid redoing the map side of a shuffle.

- Preferred locations: the DAGScheduler also computes where to run each task in a stage based on the preferred locations of its underlying RDDs, or the location of cached or shuffle data.

- Cleanup: all data structures are cleared when the running jobs that depend on them finish, to prevent memory leaks in a long-running application.

To recover from failures, the same stage might need to run multiple times, which are called "attempts". If the TaskScheduler reports that a task failed because a map output file from a previous stage was lost, the DAGScheduler resubmits that lost stage. This is detected through a CompletionEvent with FetchFailed, or an ExecutorLost event. The DAGScheduler will wait a small amount of time to see whether other nodes or tasks fail, then resubmit TaskSets for any lost stage(s) that compute the missing tasks. As part of this process, we might also have to create Stage objects for old (finished) stages where we previously cleaned up the Stage object. Since tasks from the old attempt of a stage could still be running, care must be taken to map any events received in the correct Stage object.

Here's a checklist to use when making or reviewing changes to this class:

- All data structures should be cleared when the jobs involving them end to avoid indefinite accumulation of state in long-running programs.

- When adding a new data structure, update DAGSchedulerSuite.assertDataStructuresEmpty to include the new structure. This will help to catch memory leaks.

下面直接来看stage的划分

从源码看Stage的划分

从action函数到DAGScheduler

以collect函数为例。

collect 函数定义以下:

其调用了SparkContext的 runJob 方法,又调用了几回其重载方法最终调用的runJob 方法以下:

其内部调用了DAGScheduler的runJob 方法

DAGScheduler对stage的划分

DAGScheduler的runJob 方法以下:

 

思路,提交方法后返回一个JobWaiter 对象,等待任务执行完成,而后根据任务执行状态去执行对应的成功或失败的方法。

submitJob 以下:

最终任务被封装进了JobSubmitted 事件消息体中,最终该事件消息被放入了eventProcessLoop 对象中,eventProcessLoop定义以下:

即事件被放入到了上面咱们提到的 DAGSchedulerEventProcessLoop 异步消息处理模型中。

DAGSchedulerEventProcessLoop 的 doOnReceive 中,发现了 JobSubmitted 事件对应的分支为:

即会执行DAGScheduler的handleJobSubmitted方法,以下:

这个方法里面有两步:

  1. 建立ResultStage
  2. 提交Stage

本篇文章,咱们只分析第一步,第二步在下篇文章分析。

createResultStage 方法以下:

 getOrCreateParentStage 方法建立或获取该RDD的Shuffle依赖关系,而后根据shuffle依赖进而划分stage,源码以下:

获取其全部父类的shuffle依赖,getShuffleDependency 方法以下,相似于树的深度遍历。

getOrCreateShuffleMapStage方法根据shuffle依赖建立ShuffleMapStage,以下,思路,先查看当前stage是否已经记录在shuffleIdToMapStage变量中,若存在,表示已经建立过了,不然须要根据依赖的RDD去找其RDD的shuffle依赖,而后再建立shuffleMapStage。

shuffleIdToMapStage定义以下:

这个map中只包含正在运行的job的stage信息。

其中shuffle 依赖的惟一id 是:shuffleId,这个id 是 SpackContext 生成的全局shuffleId。

getMissingAncestorShuffleDependencies 方法以下,思路:深度遍历依赖关系,把全部未运行的shuffle依赖都找到。

 

到此,全部寻找shuffle依赖关系的的逻辑都已经剖析完毕,下面看建立MapShuffleStage的方法,

思路:生成ShuffleMapStage,并更新 stageIdToStage变量,更新shuffleIdToMapStage变量,若是 MapOutputTrackerMaster 中没有注册过该shuffle,须要注册,最后返回ShuffleMapStage对象。

updateJobIdStageIdMaps方法以下,思路该ResultStage依赖的全部ShuffleMapStage的jobId设定为指定的jobId,即跟ResultStage一致的jobId:

至此,stage的划分逻辑剖析完毕。

 

总结

 本篇文章对照官方文档,说明了RDD的主要操做,action和transformation,进一步引出了RDD的依赖关系,最后剖析了DAGScheduler根据shuffle依赖划分stage的逻辑。

 

注:文章中图片来源于 Spark 论文,论文地址:http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf