Lesson 32: Spark Worker Internals and Source Code Analysis: the Worker Workflow and the Driver Launch Path


OP: 无量天尊Spark, posted 2016-6-2 16:49:49


First, let's look at how Drivers and Executors are registered and launched inside the Worker:


The Worker's core job is to manage the resources of the machine it runs on (memory, CPU cores, and so on) and to carry out the Master's instructions to launch a Driver or an Executor. The two questions to answer are:

How is a Driver launched?

How is an Executor launched?

If a Driver or an Executor dies, the Master can reallocate resources by calling schedule() again.

At runtime the Worker is a process of its own and communicates over RPC; it is declared as an RPC endpoint:

extends ThreadSafeRpcEndpoint with Logging {  // fragment of the Worker class declaration
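The Master's instructions arrive at this endpoint as messages that are pattern-matched in a receive method. Below is a minimal, self-contained sketch of that dispatch style; it is a toy stand-in, not Spark's actual RpcEndpoint API, and the message fields are simplified.

object ReceiveSketch {
  sealed trait Message
  case class LaunchDriver(driverId: String, mem: Int) extends Message
  case class LaunchExecutor(appId: String, execId: Int, cores: Int) extends Message

  class ToyWorkerEndpoint {
    // A partial function plays the role of Worker.receive: one case per message type.
    val receive: PartialFunction[Message, Unit] = {
      case LaunchDriver(id, mem) =>
        println(s"Asked to launch driver $id with ${mem} MB")
      case LaunchExecutor(appId, execId, cores) =>
        println(s"Asked to launch executor $appId/$execId with $cores cores")
    }
  }

  def main(args: Array[String]): Unit = {
    val worker = new ToyWorkerEndpoint
    // The Master side would send these over RPC; here we apply them directly.
    Seq(LaunchDriver("driver-20160602-0001", 1024),
        LaunchExecutor("app-20160602-0001", 0, 2)).foreach(worker.receive)
  }
}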

The Master sends messages to the Worker over RPC, and the Worker picks them up in its receive method.

case LaunchDriver(driverId, driverDesc) => {
  logInfo(s"Asked to launch driver $driverId")
  val driver = new DriverRunner(
    conf,
    driverId,
    workDir, // working directory
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    workerUri,
    securityMgr)
  drivers(driverId) = driver
  // start the DriverRunner
  driver.start()

  coresUsed += driverDesc.cores
  memoryUsed += driverDesc.mem
}

The Worker tracks each DriverRunner by its driverId. Internally, the DriverRunner spawns a separate thread that launches and then manages the Driver's process.

// driverId -> DriverRunner
val drivers = new HashMap[String, DriverRunner]

DriverRunner:

It manages the execution of a single Driver, including automatically restarting it on failure; this is used only in standalone cluster deploy mode. When a Driver in the cluster fails and supervise is true, the Worker that launched that Driver is responsible for restarting it.

/**
 * Manages the execution of one driver, including automatically restarting the driver on failure.
 * This is currently only used in standalone cluster deploy mode.
 */

Create the Driver's working directory:

/** Starts a thread to run and manage the driver. */
private[worker] def start() = {
  new Thread("DriverRunner for " + driverId) {
    override def run() {
      try {
        val driverDir = createWorkingDirectory()

createWorkingDirectory() creates the Driver's working directory:

/**
 * Creates the working directory for this driver.
 * Will throw an exception if there are errors preparing the directory.
 */
private def createWorkingDirectory(): File = {
  // create the Driver's working directory
  val driverDir = new File(workDir, driverId)
  if (!driverDir.exists() && !driverDir.mkdirs()) {
    throw new IOException("Failed to create directory " + driverDir)
  }
  driverDir
}

The user's own application code is packaged into a JAR, which the DriverRunner now downloads:

val localJarFilename = downloadUserJar(driverDir)

downloadUserJar downloads the JAR into the Driver's working directory and returns its local path. The application JAR is typically uploaded to HDFS first, so that every machine can fetch it from there.

/**
 * Download the user jar into the supplied directory and return its local path.
 * Will throw an exception if there are errors downloading the jar.
 */
private def downloadUserJar(driverDir: File): String = {
  val jarPath = new Path(driverDesc.jarUrl)
  // Hadoop configuration used to fetch the JAR (e.g. from HDFS)
  val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
  val destPath = new File(driverDir.getAbsolutePath, jarPath.getName)
  val jarFileName = jarPath.getName
  val localJarFile = new File(driverDir, jarFileName)
  val localJarFilename = localJarFile.getAbsolutePath

  if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
    logInfo(s"Copying user jar $jarPath to $destPath")
    Utils.fetchFile(
      driverDesc.jarUrl,
      driverDir,
      conf,
      securityManager,
      hadoopConf,
      System.currentTimeMillis(),
      useCache = false)
  }

  if (!localJarFile.exists()) { // Verify copy succeeded
    throw new Exception(s"Did not see expected jar $jarFileName in $driverDir")
  }

  localJarFilename
}
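For comparison, here is a minimal sketch of the same "copy the JAR from a Hadoop-compatible URI into the driver directory and return its local path" idea written directly against the Hadoop FileSystem API. It is an illustration only: it assumes hadoop-client is on the classpath and the URI is reachable, whereas Spark itself goes through Utils.fetchFile as shown above.

import java.io.File
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object FetchJarSketch {
  /** Copy a jar from a Hadoop-compatible URI (e.g. hdfs://...) into driverDir
    * and return its local path. Illustrative only. */
  def fetchJar(jarUrl: String, driverDir: File): String = {
    val conf = new Configuration()
    val src  = new Path(jarUrl)
    val fs   = FileSystem.get(new URI(jarUrl), conf)
    val dst  = new Path(new File(driverDir, src.getName).getAbsolutePath)
    fs.copyToLocalFile(src, dst)   // remote JAR -> local working directory
    dst.toString
  }
}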

Some of the command's arguments start out as placeholders, because their real values are not known until launch time; they are substituted when the process is actually started.

def substituteVariables(argument: String): String = argument match {
  case "{{WORKER_URL}}" => workerUrl
  case "{{USER_JAR}}" => localJarFilename // already downloaded above
  case other => other
}
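As a standalone illustration of this substitution step (all values below are made up), the command template is simply mapped through the substitution function before the process is built:

object SubstituteSketch {
  // Illustrative values; in the real DriverRunner these come from the Worker.
  val workerUrl        = "spark://Worker@host1:43769"
  val localJarFilename = "/opt/spark/work/driver-20160602-0001/app.jar"

  def substituteVariables(argument: String): String = argument match {
    case "{{WORKER_URL}}" => workerUrl
    case "{{USER_JAR}}"   => localJarFilename
    case other            => other
  }

  def main(args: Array[String]): Unit = {
    val template = Seq("--worker-url", "{{WORKER_URL}}", "--jar", "{{USER_JAR}}", "--class", "Main")
    // Replace each placeholder with its concrete value.
    println(template.map(substituteVariables).mkString(" "))
    // --worker-url spark://Worker@host1:43769 --jar /opt/spark/work/.../app.jar --class Main
  }
}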

The command essentially specifies the entry class that the launched process will run.

// TODO: If we add ability to submit multiple jars they should also be added here
// driverDesc.command specifies which class to run at startup.
val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
  driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
// launch the Driver process
launchDriver(builder, driverDir, driverDesc.supervise)
}

The source of launchDriver is shown below: stdout and stderr are redirected to files under baseDir, so the launch command and any output can later be inspected through the logs.

private def launchDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean) {
  builder.directory(baseDir)
  def initialize(process: Process): Unit = {
    // Redirect stdout and stderr to files
    val stdout = new File(baseDir, "stdout")
    CommandUtils.redirectStream(process.getInputStream, stdout)

    val stderr = new File(baseDir, "stderr")
    // format the command for logging
    val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
    val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
    Files.append(header, stderr, UTF_8)
    CommandUtils.redirectStream(process.getErrorStream, stderr)
  }
  runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
}
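The same "run the child process in baseDir and send its stdout/stderr to files" idea can be sketched with the plain JDK ProcessBuilder API. The command and paths below are illustrative, and Spark itself pipes the streams via CommandUtils.redirectStream rather than redirectOutput/redirectError:

import java.io.File

object RedirectSketch {
  def main(args: Array[String]): Unit = {
    val baseDir = new File("/tmp/driver-demo")     // illustrative working directory
    baseDir.mkdirs()
    val builder = new ProcessBuilder("echo", "hello driver") // illustrative command
    builder.directory(baseDir)
    builder.redirectOutput(new File(baseDir, "stdout")) // like DriverRunner's stdout file
    builder.redirectError(new File(baseDir, "stderr"))  // like DriverRunner's stderr file
    val exitCode = builder.start().waitFor()
    println(s"child exited with $exitCode")
  }
}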

The ProcessBuilderLike companion object:

private[deploy] object ProcessBuilderLike {
  // apply wraps a real ProcessBuilder in an anonymous ProcessBuilderLike,
  // delegating start() and command to it
  def apply(processBuilder: ProcessBuilder): ProcessBuilderLike = new ProcessBuilderLike {
    override def start(): Process = processBuilder.start()
    override def command: Seq[String] = processBuilder.command().asScala
  }
}

The ProcessBuilderLike trait itself:

// Needed because ProcessBuilder is a final class and cannot be mocked
private[deploy] trait ProcessBuilderLike {
  def start(): Process
  def command: Seq[String]
}
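The comment explains the trait's purpose: java.lang.ProcessBuilder is final and cannot be mocked, whereas this small trait can. Below is a minimal sketch of how a test might exploit that; the fake classes are illustrative and not part of Spark (the trait is redeclared locally so the sketch compiles on its own).

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}

object MockableProcessSketch {
  // Local copy of the trait so the sketch is self-contained
  // (the real one is private[deploy] inside Spark).
  trait ProcessBuilderLike {
    def start(): Process
    def command: Seq[String]
  }

  // A stub java.lang.Process that "exits" immediately with the given code;
  // Process is abstract, so (unlike the final ProcessBuilder) it can be faked.
  class FakeProcess(code: Int) extends java.lang.Process {
    override def getOutputStream: OutputStream = new ByteArrayOutputStream()
    override def getInputStream: InputStream   = new ByteArrayInputStream(new Array[Byte](0))
    override def getErrorStream: InputStream   = new ByteArrayInputStream(new Array[Byte](0))
    override def waitFor(): Int                = code
    override def exitValue(): Int              = code
    override def destroy(): Unit               = ()
  }

  // What a test might hand to runCommandWithRetry instead of a real ProcessBuilder.
  val alwaysSucceeds: ProcessBuilderLike = new ProcessBuilderLike {
    override def start(): Process     = new FakeProcess(0)
    override def command: Seq[String] = Seq("fake", "driver", "command")
  }
}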

And in the runCommandWithRetry method:

// takes the ProcessBuilderLike interface rather than a concrete ProcessBuilder
def runCommandWithRetry(
    command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Unit = {
  // Time to wait between submission retries.
  var waitSeconds = 1
  // A run of this many seconds resets the exponential back-off.
  val successfulRunDuration = 5

  var keepTrying = !killed

  while (keepTrying) {
    logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))

    synchronized {
      if (killed) { return }
      // call ProcessBuilderLike's start() method
      process = Some(command.start())
      initialize(process.get)
    }

    val processStart = clock.getTimeMillis()
    // block until the Driver process exits
    val exitCode = process.get.waitFor()
    if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
      waitSeconds = 1
    }

    if (supervise && exitCode != 0 && !killed) {
      logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
      sleeper.sleep(waitSeconds)
      waitSeconds = waitSeconds * 2 // exponential back-off
    }

    keepTrying = supervise && exitCode != 0 && !killed
    finalExitCode = Some(exitCode)
  }
}
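To see the supervise/back-off policy in isolation, here is a self-contained sketch of the same retry loop with the process handling replaced by a plain function. It is illustrative only; the real code additionally checks the killed flag and records finalExitCode.

object BackoffSketch {
  /** Re-run `runOnce` until it returns 0 or `supervise` is off, doubling the wait
    * after each failed run and resetting it after a run that lasted longer than
    * `successfulRunDuration` seconds. Illustrative only. */
  def runWithRetry(runOnce: () => Int, supervise: Boolean): Int = {
    var waitSeconds = 1
    val successfulRunDuration = 5
    var exitCode = -1
    var keepTrying = true
    while (keepTrying) {
      val start = System.currentTimeMillis()
      exitCode = runOnce()
      if (System.currentTimeMillis() - start > successfulRunDuration * 1000) {
        waitSeconds = 1                       // a long run resets the back-off
      }
      if (supervise && exitCode != 0) {
        Thread.sleep(waitSeconds * 1000L)     // wait 1s, 2s, 4s, 8s, ...
        waitSeconds *= 2
      }
      keepTrying = supervise && exitCode != 0 // loops until a clean exit (no kill flag here)
    }
    exitCode
  }
}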

Finally, when the Driver's state changes, the DriverRunner sends a DriverStateChanged message back to its Worker:

worker.send(DriverStateChanged(driverId, state, finalException))

On the Worker side:

case driverStateChanged @ DriverStateChanged(driverId, state, exception) => {
  // handle the state change
  handleDriverStateChanged(driverStateChanged)
}

The Worker in turn notifies the Master:

private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
  val driverId = driverStateChanged.driverId
  val exception = driverStateChanged.exception
  val state = driverStateChanged.state
  state match {
    case DriverState.ERROR =>
      logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
    case DriverState.FAILED =>
      logWarning(s"Driver $driverId exited with failure")
    case DriverState.FINISHED =>
      logInfo(s"Driver $driverId exited successfully")
    case DriverState.KILLED =>
      logInfo(s"Driver $driverId was killed by user")
    case _ =>
      logDebug(s"Driver $driverId changed state to $state")
  }
  // tell the Master that the Driver's state has changed
  sendToMaster(driverStateChanged)

The Master's receive method then handles the message sent by the Worker and acts according to the Driver's state:

case DriverStateChanged(driverId, state, exception) => {
  state match {
    case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
      removeDriver(driverId, state, exception)
    case _ =>
      throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }
}

The removeDriver method removes the Driver from the Master's own data structures:

private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]) {
  drivers.find(d => d.id == driverId) match {
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      drivers -= driver
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      completedDrivers += driver
      // remove the driver from the persistence engine, e.g. state persisted in ZooKeeper
      persistenceEngine.removeDriver(driver)
      driver.state = finalState
      driver.exception = exception
      driver.worker.foreach(w => w.removeDriver(driver))
      // resources have changed, so run schedule() again
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}
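The completedDrivers bookkeeping behaves like a capped history: once RETAINED_DRIVERS entries (200 by default, configurable via spark.deploy.retainedDrivers) have accumulated, the oldest 10% (at least one) are dropped before the new entry is appended. A small worked sketch of just that trimming rule:

import scala.collection.mutable.ArrayBuffer

object RetainedDriversSketch {
  def main(args: Array[String]): Unit = {
    val retained = 200                              // illustrative capacity
    val completed = ArrayBuffer.tabulate(200)(i => s"driver-$i")

    // Same trimming rule as Master.removeDriver:
    if (completed.size >= retained) {
      val toRemove = math.max(retained / 10, 1)     // drop the oldest 10%, at least 1
      completed.remove(0, toRemove)                 // equivalent to trimStart(toRemove)
    }
    completed += "driver-new"
    println(s"kept ${completed.size} entries, oldest is ${completed.head}")
    // kept 181 entries, oldest is driver-20
  }
}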

LaunchExecutor: first check that the request really comes from the activeMasterUrl:

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")

Create the Executor's working directory:

// Create the executor's working directory
val executorDir = new File(workDir, appId + "/" + execId)

Launch the ExecutorRunner:

val manager = new ExecutorRunner(
  appId,
  execId,
  appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
  cores_,
  memory_,
  self,
  workerId,
  host,
  webUi.boundPort,
  publicAddress,
  sparkHome,
  executorDir,
  workerUri,
  conf,
  appLocalDirs, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager
manager.start()

The start() method launches the Executor via fetchAndRunExecutor():

private[worker] def start() {
  workerThread = new Thread("ExecutorRunner for " + fullId) {
    override def run() { fetchAndRunExecutor() }
  }

The source of fetchAndRunExecutor is as follows:

/**
 * Download and run the executor described in our ApplicationDescription
 */
private def fetchAndRunExecutor() {
  try {
    // Launch the process
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
      memory, sparkHome.getAbsolutePath, substituteVariables)
    val command = builder.command()
    val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
    logInfo(s"Launch command: $formattedCommand")

    builder.directory(executorDir)
    builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
    // In case we are running this from within the Spark Shell, avoid creating a "scala"
    // parent process for the executor command
    builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

    // Add webUI log urls
    val baseUrl =
      s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

    process = builder.start()
    val header = "Spark Executor Command: %s\n%s\n\n".format(
      formattedCommand, "=" * 40)

    // Redirect its stdout and stderr to files
    val stdout = new File(executorDir, "stdout")
    stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

    val stderr = new File(executorDir, "stderr")
    Files.write(header, stderr, UTF_8)
    stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

    // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
    // or with nonzero exit code
    val exitCode = process.waitFor()
    state = ExecutorState.EXITED
    val message = "Command exited with code " + exitCode
    // notify the Worker that the executor's state has changed
    worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
The Worker then forwards the state change to the Master:
sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))

When the Master handles it, it also notifies the Driver:
case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
  execOption match {
    case Some(exec) => {
      val appInfo = idToApp(appId)
      val oldState = exec.state
      exec.state = state

      if (state == ExecutorState.RUNNING) {
        assert(oldState == ExecutorState.LAUNCHING,
          s"executor $execId state transfer from $oldState to RUNNING is illegal")
        appInfo.resetRetryCount()
      }
      // tell the Driver that the Executor's state has changed
      exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus))

The Worker's internals and overall flow control are summarized in the workflow diagram:

[Figure: Worker workflow diagram]

Note: these study notes are based on material from DT大数据梦工厂 (DT Big Data Dream Factory).



yuan161 posted on 2016-6-17 11:55:29
The code is the highlight.


无量天尊Spark posted on 2016-6-18 17:27:18, replying to yuan161 ("The code is the highlight"):
Your reply is a highlight too.

