Lesson 32: Spark Worker Internals and Source Code Walkthrough: Worker Workflow Diagram and Driver Launch Source Code


Posted by: 无量天尊Spark | Category: 考研 (graduate entrance exams)



First, a look at how Drivers and Executors are registered with and launched by the Worker:


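At its core, the Worker does two things: it manages the resources of its machine, such as memory and CPU cores, and it follows the Master's instructions to launch Drivers or Executors.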

How a Driver is launched

How an Executor is launched

If a Driver or an Executor dies, the Master can reallocate resources by running schedule() again.

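At run time the Worker is a process of its own, and it communicates over RPC: the Worker class is an RPC endpoint.

```scala
// Worker's class declaration (constructor parameters omitted)
private[deploy] class Worker(/* ... */)
  extends ThreadSafeRpcEndpoint with Logging {
```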

The Master sends messages to the Worker over RPC, and the Worker handles them in its receive method. This is how it handles LaunchDriver:

```scala
case LaunchDriver(driverId, driverDesc) => {
  logInfo(s"Asked to launch driver $driverId")
  val driver = new DriverRunner(
    conf,
    driverId,
    workDir, // the Worker's working directory
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    workerUri,
    securityMgr)
  drivers(driverId) = driver
  // Start the DriverRunner
  driver.start()

  // Count the driver's cores and memory as used on this Worker
  coresUsed += driverDesc.cores
  memoryUsed += driverDesc.mem
}
```
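The handler above follows a common pattern: each message is a case class, receive pattern-matches on it, and the Worker updates its resource counters as drivers come and go. Below is a minimal, self-contained sketch of that pattern. MiniWorker, DriverFinished and the flattened LaunchDriver fields are hypothetical stand-ins, not Spark's classes, and releasing resources on DriverFinished is this sketch's own simplification.

```scala
import scala.collection.mutable

object WorkerReceiveSketch {
  // Hypothetical, simplified messages (Spark's LaunchDriver carries a DriverDescription instead)
  sealed trait Message
  final case class LaunchDriver(driverId: String, cores: Int, memMb: Int) extends Message
  final case class DriverFinished(driverId: String) extends Message

  final class MiniWorker {
    // driverId -> (cores, memory in MB) held by that driver
    private val drivers = mutable.HashMap.empty[String, (Int, Int)]
    var coresUsed = 0
    var memoryUsed = 0

    // Like an RPC endpoint's receive: one case per message type
    def receive: PartialFunction[Message, Unit] = {
      case LaunchDriver(id, cores, mem) =>
        drivers(id) = (cores, mem)
        coresUsed += cores
        memoryUsed += mem
      case DriverFinished(id) =>
        // Give the driver's resources back, if this worker knows the driver
        drivers.remove(id).foreach { case (cores, mem) =>
          coresUsed -= cores
          memoryUsed -= mem
        }
    }

    def driverIds: Set[String] = drivers.keySet.toSet
  }
}
```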

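The Worker tracks each DriverRunner by its driverId. DriverRunner starts a separate thread, and that thread launches the Driver as a separate process; DriverRunner keeps the handle to that process (its process field).

```scala
// driverId -> DriverRunner
val drivers = new HashMap[String, DriverRunner]
```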

DriverRunner:

DriverRunner manages the execution of a Driver, including restarting it automatically when it fails. It is currently used only in standalone cluster deploy mode: if a Driver submitted in cluster mode fails and supervise is true, the Worker that launched the Driver is responsible for restarting it.

```scala
/**
 * Manages the execution of one driver, including automatically restarting the driver on failure.
 * This is currently only used in standalone cluster deploy mode.
 */
```

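DriverRunner.start() launches a thread to run and manage the driver. The first step on that thread is to create the Driver's working directory:

```scala
/** Starts a thread to run and manage the driver. */
private[worker] def start() = {
  new Thread("DriverRunner for " + driverId) {
    override def run() {
      try {
        val driverDir = createWorkingDirectory()
        // ... (continued in the snippets below)
```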
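start() returns as soon as the thread is started, so a slow JAR download or a long-running driver never blocks the Worker's message loop. A minimal sketch of that pattern under simplifying assumptions: body (hypothetical) stands in for the create-directory, download and launch steps, and report (hypothetical) stands in for sending DriverStateChanged back to the Worker.

```scala
object RunnerThreadSketch {
  // Runs `body` on its own named thread (the caller is not blocked)
  // and hands either the exit code or the exception to `report`.
  def start(name: String)(body: () => Int)(report: Either[Throwable, Int] => Unit): Thread = {
    val t = new Thread(name) {
      override def run(): Unit = {
        val outcome: Either[Throwable, Int] =
          try Right(body())
          catch { case e: Exception => Left(e) }
        report(outcome)
      }
    }
    t.start()
    t
  }
}
```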

createWorkingDirectory() creates the Driver's working directory, workDir/driverId:

```scala
/**
 * Creates the working directory for this driver.
 * Will throw an exception if there are errors preparing the directory.
 */
private def createWorkingDirectory(): File = {
  // The Driver's working directory: <workDir>/<driverId>
  val driverDir = new File(workDir, driverId)
  if (!driverDir.exists() && !driverDir.mkdirs()) {
    throw new IOException("Failed to create directory " + driverDir)
  }
  driverDir
}
```
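Because the check is !exists() && !mkdirs(), calling it for a directory that already exists is harmless, and because mkdirs() also creates missing parents, the same logic works for a nested path such as the executor directory appId/execId created later. A runnable version that takes workDir and the ID as parameters (WorkDirSketch is a hypothetical name, and the IDs in the test are made up):

```scala
import java.io.{File, IOException}

object WorkDirSketch {
  // Create <workDir>/<id> unless it already exists; fail loudly if that is impossible
  def createWorkingDirectory(workDir: File, id: String): File = {
    val dir = new File(workDir, id)
    if (!dir.exists() && !dir.mkdirs()) {
      throw new IOException("Failed to create directory " + dir)
    }
    dir
  }
}
```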

Next, the user's own code, packaged as a JAR, is downloaded into that directory:

```scala
val localJarFilename = downloadUserJar(driverDir)
```

downloadUserJar downloads the JAR and returns its local path. In practice the application is packaged as a JAR and uploaded to HDFS, so that every machine can download it from there.

```scala
/**
 * Download the user jar into the supplied directory and return its local path.
 * Will throw an exception if there are errors downloading the jar.
 */
private def downloadUserJar(driverDir: File): String = {
  val jarPath = new Path(driverDesc.jarUrl)
  // Hadoop configuration, needed to fetch the JAR from HDFS
  val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
  val destPath = new File(driverDir.getAbsolutePath, jarPath.getName)
  val jarFileName = jarPath.getName
  val localJarFile = new File(driverDir, jarFileName)
  val localJarFilename = localJarFile.getAbsolutePath

  if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
    logInfo(s"Copying user jar $jarPath to $destPath")
    Utils.fetchFile(
      driverDesc.jarUrl,
      driverDir,
      conf,
      securityManager,
      hadoopConf,
      System.currentTimeMillis(),
      useCache = false)
  }

  if (!localJarFile.exists()) { // Verify copy succeeded
    throw new Exception(s"Did not see expected jar $jarFileName in $driverDir")
  }

  localJarFilename
}
```
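The method follows a "fetch only if missing, then verify" shape. The sketch below keeps that control flow but replaces Utils.fetchFile with a caller-supplied fetch function (hypothetical) and uses a plain local file as the "remote" JAR, so it runs without Hadoop; it is an illustration, not Spark's implementation.

```scala
import java.io.File
import java.nio.file.Files

object FetchJarSketch {
  // Mirrors downloadUserJar's structure: fetch only if missing, then verify.
  // `fetch` is a hypothetical stand-in for Spark's Utils.fetchFile.
  def downloadUserJar(jarUrl: String, driverDir: File)(fetch: (String, File) => Unit): String = {
    val jarFileName = new File(jarUrl).getName
    val localJarFile = new File(driverDir, jarFileName)
    if (!localJarFile.exists()) { // may already exist if several workers share a node
      fetch(jarUrl, driverDir)
    }
    if (!localJarFile.exists()) { // verify the copy succeeded
      throw new Exception(s"Did not see expected jar $jarFileName in $driverDir")
    }
    localJarFile.getAbsolutePath
  }

  // Local-filesystem "fetcher" used for the demo
  def copyLocal(src: String, dir: File): Unit = {
    val source = new File(src)
    Files.copy(source.toPath, new File(dir, source.getName).toPath)
  }
}
```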

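Some arguments of the launch command start out as placeholders, because their values are not known when the command is created; they are substituted when the command is actually run:

```scala
def substituteVariables(argument: String): String = argument match {
  case "{{WORKER_URL}}" => workerUrl
  case "{{USER_JAR}}" => localJarFilename // downloaded in the previous step
  case other => other
}
```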
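Only an argument that is exactly a placeholder is replaced; any other argument passes through unchanged. A standalone version that returns the substitution as a function so it can be mapped over an argument list (SubstituteSketch is hypothetical, and the URL, path and arguments in the test are made-up examples):

```scala
object SubstituteSketch {
  // Replaces the two placeholders and leaves every other argument as is
  def substitutor(workerUrl: String, userJar: String): String => String = {
    case "{{WORKER_URL}}" => workerUrl
    case "{{USER_JAR}}"   => userJar
    case other            => other
  }
}
```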

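driverDesc.command specifies what the new process runs, i.e. its entry-point class. CommandUtils.buildProcessBuilder turns it into a java.lang.ProcessBuilder, applying substituteVariables to the arguments, and launchDriver then starts it:

```scala
// TODO: If we add ability to submit multiple jars they should also be added here
// driverDesc.command specifies which class the new process runs
val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
  driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
// Launch the driver process (relaunching it on failure if supervise is true)
launchDriver(builder, driverDir, driverDesc.supervise)
} // closes the try block opened in start()
```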
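Conceptually, buildProcessBuilder assembles a JVM command line (java executable, classpath, heap size, Java options, main class, substituted arguments) into a ProcessBuilder. The sketch below only illustrates that idea: the Command fields, the use of the requested memory as -Xmx, and the exact argument order are assumptions of this sketch, not Spark's actual layout.

```scala
object BuildCommandSketch {
  // Hypothetical simplification of Spark's Command (main class, arguments, JVM options)
  final case class Command(mainClass: String, arguments: Seq[String], javaOpts: Seq[String])

  // Assumed layout: java -cp <cp> -Xmx<mem>M <opts> <mainClass> <substituted args>
  def buildProcessBuilder(cmd: Command, memoryMb: Int, classPath: String,
                          substitute: String => String): ProcessBuilder = {
    val argv = Seq("java", "-cp", classPath, s"-Xmx${memoryMb}M") ++ cmd.javaOpts ++
      (cmd.mainClass +: cmd.arguments.map(substitute))
    new ProcessBuilder(argv: _*)
  }
}
```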

launchDriver redirects the process's stdout and stderr to files under baseDir (the driver's working directory), so the driver's output can be inspected in these logs afterwards:

```scala
private def launchDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean) {
  builder.directory(baseDir)
  def initialize(process: Process): Unit = {
    // Redirect stdout and stderr to files
    val stdout = new File(baseDir, "stdout")
    CommandUtils.redirectStream(process.getInputStream, stdout)

    val stderr = new File(baseDir, "stderr")
    // Format the command for the header written to stderr
    val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
    val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
    Files.append(header, stderr, UTF_8)
    CommandUtils.redirectStream(process.getErrorStream, stderr)
  }
  runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
}
```
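To try the same log layout locally, the sketch below reproduces the header and the stdout/stderr files with plain JDK APIs. One substitution should be kept in mind: instead of Spark's CommandUtils.redirectStream, it uses ProcessBuilder.Redirect.appendTo, so the JVM itself writes the child's output to the files. LaunchSketch is a hypothetical name.

```scala
import java.io.File
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, StandardOpenOption}

object LaunchSketch {
  // Same quoting as launchDriver: "a" "b" "c"
  def formatCommand(cmd: Seq[String]): String = cmd.mkString("\"", "\" \"", "\"")

  // Starts `cmd` in baseDir with stdout/stderr appended to files there, then waits for it.
  def launch(cmd: Seq[String], baseDir: File): Int = {
    val builder = new ProcessBuilder(cmd: _*)
    builder.directory(baseDir)
    val stdout = new File(baseDir, "stdout")
    val stderr = new File(baseDir, "stderr")
    // Header first, so the stderr log records exactly what was launched
    val header = "Launch Command: %s\n%s\n\n".format(formatCommand(cmd), "=" * 40)
    Files.write(stderr.toPath, header.getBytes(UTF_8),
      StandardOpenOption.CREATE, StandardOpenOption.APPEND)
    builder.redirectOutput(ProcessBuilder.Redirect.appendTo(stdout))
    builder.redirectError(ProcessBuilder.Redirect.appendTo(stderr))
    builder.start().waitFor()
  }
}
```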

The ProcessBuilderLike companion object:

```scala
private[deploy] object ProcessBuilderLike {
  // apply wraps a real ProcessBuilder, delegating start() and command to it
  def apply(processBuilder: ProcessBuilder): ProcessBuilderLike = new ProcessBuilderLike {
    override def start(): Process = processBuilder.start()
    override def command: Seq[String] = processBuilder.command().asScala
  }
}
```

The ProcessBuilderLike trait:

```scala
// Needed because ProcessBuilder is a final class and cannot be mocked
private[deploy] trait ProcessBuilderLike {
  def start(): Process
  def command: Seq[String]
}
```
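The comment in the trait explains the design: java.lang.ProcessBuilder is final, so a test cannot subclass it. Code that depends only on the small trait can be handed either the production adapter or a test stub. A self-contained sketch of that seam, where describe is a hypothetical consumer of the trait:

```scala
object ProcessBuilderLikeSketch {
  // The seam: callers depend on this trait, not on the final class ProcessBuilder
  trait ProcessBuilderLike {
    def start(): Process
    def command: Seq[String]
  }

  object ProcessBuilderLike {
    // Production adapter: delegate to a real ProcessBuilder
    def apply(pb: ProcessBuilder): ProcessBuilderLike = new ProcessBuilderLike {
      override def start(): Process = pb.start()
      override def command: Seq[String] = {
        val cmd = pb.command()
        (0 until cmd.size()).map(i => cmd.get(i))
      }
    }
  }

  // A consumer that only sees the trait
  def describe(b: ProcessBuilderLike): String = b.command.mkString("\"", "\" \"", "\"")
}
```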

runCommandWithRetry starts the process and, when supervise is true, relaunches it after a non-zero exit, with exponential back-off between attempts:

```scala
// Takes the ProcessBuilderLike abstraction instead of a ProcessBuilder
def runCommandWithRetry(
    command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Unit = {
  // Time to wait between submission retries.
  var waitSeconds = 1
  // A run of this many seconds resets the exponential back-off.
  val successfulRunDuration = 5

  var keepTrying = !killed

  while (keepTrying) {
    logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))

    synchronized {
      if (killed) { return }
      // Start the process via ProcessBuilderLike.start()
      process = Some(command.start())
      initialize(process.get)
    }

    val processStart = clock.getTimeMillis()
    // Block until the driver process exits
    val exitCode = process.get.waitFor()
    if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
      waitSeconds = 1
    }
    if (supervise && exitCode != 0 && !killed) {
      logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
      sleeper.sleep(waitSeconds)
      waitSeconds = waitSeconds * 2 // exponential back-off
    }

    keepTrying = supervise && exitCode != 0 && !killed
    finalExitCode = Some(exitCode)
  }
}
```
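To see how the back-off behaves, here is a self-contained sketch of the same loop driven by a fake ProcessBuilderLike, which is exactly the kind of test double the trait makes possible. The clock and the sleep are injected, in the spirit of DriverRunner's clock and sleeper fields; the killed flag and the initialize callback are left out, and ExitedProcess and the function parameters are hypothetical.

```scala
object RetrySketch {
  trait ProcessBuilderLike {
    def start(): Process
    def command: Seq[String]
  }

  // Test double: a "process" that has already exited with the given code
  final class ExitedProcess(code: Int) extends Process {
    override def getOutputStream(): java.io.OutputStream = new java.io.ByteArrayOutputStream()
    override def getInputStream(): java.io.InputStream = new java.io.ByteArrayInputStream(Array.emptyByteArray)
    override def getErrorStream(): java.io.InputStream = new java.io.ByteArrayInputStream(Array.emptyByteArray)
    override def waitFor(): Int = code
    override def exitValue(): Int = code
    override def destroy(): Unit = ()
  }

  /** Same control flow as runCommandWithRetry (minus `killed`); returns the last exit code. */
  def runCommandWithRetry(command: ProcessBuilderLike, supervise: Boolean,
                          nowMillis: () => Long, sleepSeconds: Int => Unit): Int = {
    var waitSeconds = 1           // delay before the next relaunch
    val successfulRunDuration = 5 // a run longer than this resets the back-off
    var keepTrying = true
    var finalExitCode = 0
    while (keepTrying) {
      val processStart = nowMillis()
      val exitCode = command.start().waitFor()
      if (nowMillis() - processStart > successfulRunDuration * 1000) {
        waitSeconds = 1
      }
      if (supervise && exitCode != 0) {
        sleepSeconds(waitSeconds)
        waitSeconds *= 2          // exponential back-off: 1, 2, 4, ...
      }
      keepTrying = supervise && exitCode != 0
      finalExitCode = exitCode
    }
    finalExitCode
  }
}
```

With exit codes 1, 1, 1, 0 and runs shorter than five seconds, the sketch sleeps 1, 2 and 4 seconds before the second, third and fourth launches; a run longer than successfulRunDuration resets the delay to 1 second.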

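Finally, once the driver process has exited (or could not be started), DriverRunner reports the Driver's final state to its Worker. Because DriverRunner runs inside the Worker process, the Worker is in effect sending a message to itself:

```scala
worker.send(DriverStateChanged(driverId, state, finalException))
```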
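The state in that message is decided on DriverRunner's thread after launchDriver returns or throws. In the Spark 1.x source the precedence is roughly: killed gives KILLED, an exception caught by the runner (for example, a failed JAR download) gives ERROR, exit code 0 gives FINISHED, and anything else gives FAILED. A sketch of that decision, with a reduced, hypothetical DriverState standing in for Spark's enumeration (which has further, non-terminal states):

```scala
object DriverStateSketch {
  sealed trait DriverState
  case object FINISHED extends DriverState
  case object FAILED extends DriverState
  case object ERROR extends DriverState
  case object KILLED extends DriverState

  // Precedence: a kill wins, then an exception thrown by the runner, then the exit code
  def finalState(killed: Boolean, finalException: Option[Exception],
                 finalExitCode: Option[Int]): DriverState =
    if (killed) KILLED
    else if (finalException.isDefined) ERROR
    else finalExitCode match {
      case Some(0) => FINISHED
      case _       => FAILED
    }
}
```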

On the Worker side:

```scala
case driverStateChanged @ DriverStateChanged(driverId, state, exception) => {
  // Handle the driver's state change
  handleDriverStateChanged(driverStateChanged)
}
```

handleDriverStateChanged logs the new state and forwards the message to the Master:

```scala
private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
  val driverId = driverStateChanged.driverId
  val exception = driverStateChanged.exception
  val state = driverStateChanged.state
  state match {
    case DriverState.ERROR =>
      logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
    case DriverState.FAILED =>
      logWarning(s"Driver $driverId exited with failure")
    case DriverState.FINISHED =>
      logInfo(s"Driver $driverId exited successfully")
    case DriverState.KILLED =>
      logInfo(s"Driver $driverId was killed by user")
    case _ =>
      logDebug(s"Driver $driverId changed state to $state")
  }
  // Tell the Master that the Driver's state has changed
  sendToMaster(driverStateChanged)
  // ... (rest of the method omitted)
}
```

The Master's receive method handles the messages sent by Workers. For DriverStateChanged, it acts on the reported state:

```scala
case DriverStateChanged(driverId, state, exception) => {
  state match {
    case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
      removeDriver(driverId, state, exception)
    case _ =>
      throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }
}
```

removeDriver removes the driver from the Master's own data structures, keeps it in the bounded completedDrivers history, deletes its persisted record, and runs schedule() because resources have been freed:

```scala
private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]) {
  drivers.find(d => d.id == driverId) match {
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      drivers -= driver
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      completedDrivers += driver
      // Delete the driver's persisted record (e.g. the data stored in ZooKeeper)
      persistenceEngine.removeDriver(driver)
      driver.state = finalState
      driver.exception = exception
      driver.worker.foreach(w => w.removeDriver(driver))
      // Resources have changed, so run schedule() again
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}
```
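The completedDrivers history is bounded by RETAINED_DRIVERS (in Spark 1.x the Master reads it from spark.deploy.retainedDrivers). When the buffer is full, the oldest RETAINED_DRIVERS / 10 entries (at least one) are dropped in one batch, so the trim does not have to run on every insertion. A generic sketch of that policy; RetainedDriversSketch and addCompleted are hypothetical names:

```scala
import scala.collection.mutable.ArrayBuffer

object RetainedDriversSketch {
  // Keep roughly `retained` entries; when full, drop the oldest 10% (at least one) first
  def addCompleted[A](completed: ArrayBuffer[A], driver: A, retained: Int): Unit = {
    if (completed.size >= retained) {
      val toRemove = math.max(retained / 10, 1)
      completed.trimStart(toRemove)
    }
    completed += driver
  }
}
```

With a cap of 20, adding drivers 1 through 25 leaves drivers 7 to 25 (19 entries).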

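LaunchExecutor: the Worker first checks that the request comes from the currently active Master, i.e. that masterUrl equals activeMasterUrl:

```scala
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  } else {
    // ... create the working directory and start an ExecutorRunner (below)
```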

Create the Executor's working directory, workDir/appId/execId:

```scala
// Create the executor's working directory
val executorDir = new File(workDir, appId + "/" + execId)
```

Create and start an ExecutorRunner:

```scala
val manager = new ExecutorRunner(
  appId,
  execId,
  appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
  cores_,
  memory_,
  self,
  workerId,
  host,
  webUi.boundPort,
  publicAddress,
  sparkHome,
  executorDir,
  workerUri,
  conf,
  appLocalDirs, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager
manager.start()
```

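ExecutorRunner.start() launches the Executor by running fetchAndRunExecutor() on a dedicated thread:

```scala
private[worker] def start() {
  workerThread = new Thread("ExecutorRunner for " + fullId) {
    override def run() { fetchAndRunExecutor() }
  }
  workerThread.start()
  // ... (rest of the method omitted)
}
```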

fetchAndRunExecutor builds the executor command, sets its environment, redirects its output to files, and waits for it to exit:

```scala
/**
 * Download and run the executor described in our ApplicationDescription
 */
private def fetchAndRunExecutor() {
  try {
    // Launch the process
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
      memory, sparkHome.getAbsolutePath, substituteVariables)
    val command = builder.command()
    val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
    logInfo(s"Launch command: $formattedCommand")

    builder.directory(executorDir)
    builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
    // In case we are running this from within the Spark Shell, avoid creating a "scala"
    // parent process for the executor command
    builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

    // Add webUI log urls
    val baseUrl =
      s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

    process = builder.start()
    val header = "Spark Executor Command: %s\n%s\n\n".format(
      formattedCommand, "=" * 40)

    // Redirect its stdout and stderr to files
    val stdout = new File(executorDir, "stdout")
    stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

    val stderr = new File(executorDir, "stderr")
    Files.write(header, stderr, UTF_8)
    stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

    // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
    // or with nonzero exit code
    val exitCode = process.waitFor()
    state = ExecutorState.EXITED
    val message = "Command exited with code " + exitCode
    // Tell the Worker that the executor's state has changed
    worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
    // ... (exception handling omitted)
```

The Worker, in turn, reports executor state changes to the Master. The call below is the one the LaunchExecutor handler makes right after manager.start(); an ExecutorStateChanged message that ExecutorRunner sends later is forwarded to the Master in the same way:

```scala
sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
```
When the Master handles this message, it also notifies the Driver:

```scala
case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
  execOption match {
    case Some(exec) => {
      val appInfo = idToApp(appId)
      val oldState = exec.state
      exec.state = state

      if (state == ExecutorState.RUNNING) {
        assert(oldState == ExecutorState.LAUNCHING,
          s"executor $execId state transfer from $oldState to RUNNING is illegal")
        appInfo.resetRetryCount()
      }
      // Tell the Driver that the executor's state has changed
      exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus))
      // ... (rest of the handler omitted)
```
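The assertion encodes a small state-machine rule: an executor may only become RUNNING directly from LAUNCHING. A minimal sketch of that check, with a reduced, hypothetical ExecutorState (Spark's enumeration also has states such as KILLED, FAILED and LOST) and an ExecutorUpdated case class standing in for the message forwarded to the Driver:

```scala
object ExecutorStateSketch {
  sealed trait ExecutorState
  case object LAUNCHING extends ExecutorState
  case object RUNNING extends ExecutorState
  case object EXITED extends ExecutorState

  final case class ExecutorUpdated(execId: Int, state: ExecutorState, message: Option[String])

  // Apply a reported state; RUNNING is only legal right after LAUNCHING.
  // Returns the update that would be forwarded to the driver.
  def onStateChanged(current: ExecutorState, execId: Int, reported: ExecutorState,
                     message: Option[String]): ExecutorUpdated = {
    if (reported == RUNNING) {
      assert(current == LAUNCHING,
        s"executor $execId state transfer from $current to RUNNING is illegal")
    }
    ExecutorUpdated(execId, reported, message)
  }
}
```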

The figure below summarizes the Worker's internals and control flow:



Note: these study notes are based on the DT大数据梦工厂 (DT Big Data Dream Factory) course.



Original forum thread: https://bbs.pinggu.org/thread-4638174-1-1.html
