楼主: 我的素质低
1592 0

〖移花接木〗Spark技术内幕:Stage划分及提交源码分析 [推广有奖]

已卖:2774份资源

学术权威

83%

还不是VIP/贵宾

-

TA的文库  其他...

〖素质文库〗

结构方程模型

考研资料库

威望
8
论坛币
23391 个
通用积分
28308.7907
学术水平
2705 点
热心指数
2881 点
信用等级
2398 点
经验
229176 点
帖子
2968
精华
52
在线时间
2175 小时
注册时间
2012-11-24
最后登录
2024-1-13

一级伯乐勋章 初级学术勋章 初级热心勋章 初级信用勋章 中级热心勋章 中级学术勋章 中级信用勋章 高级学术勋章 高级热心勋章 高级信用勋章 特级学术勋章

楼主
我的素质低 学生认证  发表于 2015-3-26 22:57:39 |AI写论文

+2 论坛币
k人 参与回答

经管之家送您一份

应届毕业生专属福利!

求职就业群
赵安豆老师微信:zhaoandou666

经管之家联合CDA

送您一个全额奖学金名额~ !

感谢您参与论坛问题回答

经管之家送您两个论坛币!

+2 论坛币

当触发一个RDD的action后,以count为例,调用关系如下:


    1. org.apache.spark.rdd.RDD#count
    2. org.apache.spark.SparkContext#runJob
    3. org.apache.spark.scheduler.DAGScheduler#runJob
    4. org.apache.spark.scheduler.DAGScheduler#submitJob
    5. org.apache.spark.scheduler.DAGSchedulerEventProcessActor#receive(JobSubmitted)
    6. org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted
    复制代码

其中步骤五的DAGSchedulerEventProcessActor是DAGScheduler 的与外部交互的接口代理,DAGScheduler在创建时会创建名字为eventProcessActor的actor。这个actor的作用看它的实现就一目了然了:



    1. /**
    2. * The main event loop of the DAG scheduler.
    3. */
    4. def receive = {
    5.   case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
    6.     dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
    7.       listener, properties) // 提交job,来自与RDD->SparkContext->DAGScheduler的消息。之所以在这需要在这里中转一下,是为了模块功能的一致性。

    8.   case StageCancelled(stageId) => // 消息源org.apache.spark.ui.jobs.JobProgressTab,在GUI上显示一个SparkContext的Job的执行状态。
    9.     // 用户可以cancel一个Stage,会通过SparkContext->DAGScheduler 传递到这里。
    10.     dagScheduler.handleStageCancellation(stageId)

    11.   case JobCancelled(jobId) => // 来自于org.apache.spark.scheduler.JobWaiter的消息。取消一个Job
    12.     dagScheduler.handleJobCancellation(jobId)

    13.   case JobGroupCancelled(groupId) => // 取消整个Job Group
    14.     dagScheduler.handleJobGroupCancelled(groupId)

    15.   case AllJobsCancelled => //取消所有Job
    16.     dagScheduler.doCancelAllJobs()

    17.   case ExecutorAdded(execId, host) => // TaskScheduler得到一个Executor被添加的消息。具体来自org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers
    18.     dagScheduler.handleExecutorAdded(execId, host)

    19.   case ExecutorLost(execId) => //来自TaskScheduler
    20.     dagScheduler.handleExecutorLost(execId)

    21.   case BeginEvent(task, taskInfo) => // 来自TaskScheduler
    22.     dagScheduler.handleBeginEvent(task, taskInfo)

    23.   case GettingResultEvent(taskInfo) => //处理获得TaskResult信息的消息
    24.     dagScheduler.handleGetTaskResult(taskInfo)

    25.   case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) => //来自TaskScheduler,报告task是完成或者失败
    26.     dagScheduler.handleTaskCompletion(completion)

    27.   case TaskSetFailed(taskSet, reason) => //来自TaskScheduler,要么TaskSet失败次数超过阈值或者由于Job Cancel。
    28.     dagScheduler.handleTaskSetFailed(taskSet, reason)

    29.   case ResubmitFailedStages => //当一个Stage处理失败时,重试。来自org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion
    30.     dagScheduler.resubmitFailedStages()
    31. }
    复制代码




总结一下org.apache.spark.scheduler.DAGSchedulerEventProcessActor的作用:可以把他理解成DAGScheduler的对外的功能接口。它对外隐藏了自己内部实现的细节,也更易于理解其逻辑;也降低了维护成本,将DAGScheduler的比较复杂功能接口化。



handleJobSubmitted



    org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted首先会根据RDD创建finalStage。

finalStage,顾名思义,就是最后的那个Stage。然后创建job,最后提交。提交的job如果满足一下条件,那么它将以本地模式运行:


     1)spark.localExecution.enabled设置为true  并且 2)用户程序显式指定可以本地运行 并且 3)finalStage的没有父Stage 并且 4)仅有一个partition


     3)和 4)的话主要为了任务可以快速执行;如果有多个stage或者多个partition的话,本地运行可能会因为本机的计算资源的问题而影响任务的计算速度。


      要理解什么是Stage,首先要搞明白什么是Task。Task是在集群上运行的基本单位。一个Task负责处理RDD的一个partition。RDD的多个patition会分别由不同的Task去处理。当然了这些Task的处理逻辑完全是一致的。这一组Task就组成了一个Stage。有两种Task:



  • org.apache.spark.scheduler.ShuffleMapTask
  • org.apache.spark.scheduler.ResultTask


      ShuffleMapTask根据Task的partitioner将计算结果放到不同的bucket中。而ResultTask将计算结果发送回Driver Application。一个Job包含了多个Stage,而Stage是由一组完全相同的Task组成的。最后的Stage包含了一组ResultTask。



      在用户触发了一个action后,比如count,collect,SparkContext会通过runJob的函数开始进行任务提交。最后会通过DAG的event processor 传递到DAGScheduler本身的handleJobSubmitted,它首先会划分Stage,提交Stage,提交Task。至此,Task就开始在运行在集群上了。


     一个Stage的开始就是从外部存储或者shuffle结果中读取数据;一个Stage的结束就是由于发生shuffle或者生成结果时。


创建finalStage



handleJobSubmitted 通过调用newStage来创建finalStage:



  • finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)



创建一个result stage,或者说finalStage,是通过调用org.apache.spark.scheduler.DAGScheduler#newStage完成的;而创建一个shuffle stage,需要通过调用org.apache.spark.scheduler.DAGScheduler#newOrUsedStage。

  1. private def newStage(
  2.       rdd: RDD[_],
  3.       numTasks: Int,
  4.       shuffleDep: Option[ShuffleDependency[_, _, _]],
  5.       jobId: Int,
  6.       callSite: CallSite)
  7.     : Stage =
  8.   {
  9.     val id = nextStageId.getAndIncrement()
  10.     val stage =
  11.       new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
  12.     stageIdToStage(id) = stage
  13.     updateJobIdStageIdMaps(jobId, stage)
  14.     stage
  15.   }
复制代码




对于result 的final stage来说,传入的shuffleDep是None。

我们知道,RDD通过org.apache.spark.rdd.RDD#getDependencies可以获得它依赖的parent RDD。而Stage也可能会有parent Stage。看一个RDD论文的Stage划分吧:

      一个stage的边界,输入是外部的存储或者一个stage shuffle的结果;输入则是Job的结果(result task对应的stage)或者shuffle的结果。


      上图的话stage3的输入则是RDD A和RDD F shuffle的结果。而A和F由于到B和G需要shuffle,因此需要划分到不同的stage。


从源码实现的角度来看,通过触发action也就是最后一个RDD创建final stage(上图的stage 3),我们注意到new Stage的第五个参数就是该Stage的parent Stage:通过rdd和job id获取:




    1. // 生成rdd的parent Stage。没遇到一个ShuffleDependency,就会生成一个Stage
    2.   private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
    3.     val parents = new HashSet[Stage] //存储parent stage
    4.     val visited = new HashSet[RDD[_]] //存储已经被访问到得RDD
    5.     // We are manually maintaining a stack here to prevent StackOverflowError
    6.     // caused by recursively visiting // 存储需要被处理的RDD。Stack中得RDD都需要被处理。
    7.     val waitingForVisit = new Stack[RDD[_]]
    8.     def visit(r: RDD[_]) {
    9.       if (!visited(r)) {
    10.         visited += r
    11.         // Kind of ugly: need to register RDDs with the cache here since
    12.         // we can’t do it in its constructor because # of partitions is unknown
    13.         for (dep <- r.dependencies) {
    14.           dep match {
    15.             case shufDep: ShuffleDependency[_, _, _] => // 在ShuffleDependency时需要生成新的stage
    16.               parents += getShuffleMapStage(shufDep, jobId)
    17.             case _ =>
    18.               waitingForVisit.push(dep.rdd) //不是ShuffleDependency,那么就属于同一个Stage
    19.           }
    20.         }
    21.       }
    22.     }
    23.     waitingForVisit.push(rdd) // 输入的rdd作为第一个需要处理的RDD。然后从该rdd开始,顺序访问其parent rdd
    24.     while (!waitingForVisit.isEmpty) { //只要stack不为空,则一直处理。
    25.       visit(waitingForVisit.pop()) //每次visit如果遇到了ShuffleDependency,那么就会形成一个Stage,否则这些RDD属于同一个Stage
    26.     }
    27.     parents.toList
    28.   }
    复制代码




生成了finalStage后,就需要提交Stage了。



    1. // 提交Stage,如果有parent Stage没有提交,那么递归提交它。
    2. private def submitStage(stage: Stage) {
    3.   val jobId = activeJobForStage(stage)
    4.   if (jobId.isDefined) {
    5.     logDebug(“submitStage(“ + stage + “)”)
    6.     // 如果当前stage不在等待其parent stage的返回,并且 不在运行的状态, 并且 没有已经失败(失败会有重试机制,不会通过这里再次提交)
    7.     if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
    8.       val missing = getMissingParentStages(stage).sortBy(_.id)
    9.       logDebug(“missing: “ + missing)
    10.       if (missing == Nil) { // 如果所有的parent stage都已经完成,那么提交该stage所包含的task
    11.         logInfo(“Submitting “ + stage + ” (“ + stage.rdd + “), which has no missing parents”)
    12.         submitMissingTasks(stage, jobId.get)
    13.       } else {
    14.         for (parent <- missing) { // 有parent stage为完成,则递归提交它
    15.           submitStage(parent)
    16.         }
    17.         waitingStages += stage
    18.       }
    19.     }
    20.   } else {
    21.     abortStage(stage, “No active job for stage “ + stage.id)
    22.   }
    23. }
    复制代码



DAGScheduler将Stage划分完成后,提交实际上是通过把Stage转换为TaskSet,然后通过TaskScheduler将计算任务最终提交到集群。其所在的位置如下图所示。

接下来,将分析Stage是如何转换为TaskSet,并最终提交到Executor去运行的。

二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

关键词:Spark stage 移花接木 Park SPAR 技术

已有 1 人评分论坛币 学术水平 收起 理由
daazx + 10 + 3 精彩帖子

总评分: 论坛币 + 10  学术水平 + 3   查看全部评分

本帖被以下文库推荐

心晴的时候,雨也是晴;心雨的时候,晴也是雨!
扣扣:407117636,欢迎一块儿吐槽!!

您需要登录后才可以回帖 登录 | 我要注册

本版微信群
加好友,备注cda
拉您进交流群
GMT+8, 2026-1-28 04:34