3377 0

第31课:Spark资源调度分配内幕天机彻底解密 [推广有奖]

  • 1关注
  • 8粉丝

硕士生

34%

还不是VIP/贵宾

-

威望
0
论坛币
305 个
通用积分
0
学术水平
5 点
热心指数
14 点
信用等级
2 点
经验
23032 点
帖子
73
精华
0
在线时间
135 小时
注册时间
2016-2-27
最后登录
2016-9-11

楼主
无量天尊Spark 发表于 2016-6-2 15:59:13 |AI写论文

+2 论坛币
k人 参与回答

经管之家送您一份

应届毕业生专属福利!

求职就业群
赵安豆老师微信:zhaoandou666

经管之家联合CDA

送您一个全额奖学金名额~ !

感谢您参与论坛问题回答

经管之家送您两个论坛币!

+2 论坛币

一、任务调度与资源调度的区别

1、任务调度是通过DAGScheduler、TaskScheduler、SchedulerBackend等进行的作业调度;

2、资源调度是指应用程序如何获得资源;

3、任务调度是在资源调度的基础上进行的,没有资源调度那么任务调度就成为了无源之水无本之木!


二、资源调度内幕天机解密

1)因为Master负责资源管理和调度,所以资源调度的方法shedule位于Master.scala这个类中,当注册程序或者资源发生改变的时候都会导致schedule的调用,例如注册程序的时候:

  1. case RegisterApplication(description, driver) => {
  2.   // TODO Prevent repeated registrations from some driver
  3.   if (state == RecoveryState.STANDBY) {
  4.     // ignore, don't send response
  5.   } else {
  6.     logInfo("Registering app " + description.name)
  7.     val app = createApplication(description, driver)
  8.     registerApplication(app)
  9.     logInfo("Registered app " + description.name + " with ID " + app.id)
  10.     persistenceEngine.addApplication(app)
  11.     driver.send(RegisteredApplication(app.id, self))
  12.     schedule()
  13.   }
  14. }
复制代码

2)Schedule调用的时机:每次有新的应用程序提交或者集群资源状况发生改变的时候(包括Executor增加或者减少、Worker增加或者减少等);

3)当前Master必须是Alive的方式采用进行资源的调度,如果不是ALIVE的状态会直接返回,也就是Standby Master不会进行Application的资源调用!

  1. if (state != RecoveryState.ALIVE) { return }
复制代码

4)使用Random.shuffle把Master中保留的集群中所有Worker的信息随机打乱;

  1. val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
复制代码

其算法内部是循环随机交换所有Worker在Master缓存数据结构中的位置:

  1. def shuffle[T, CC[X] <: TraversableOnce[X]](xs: CC[T])(implicit bf: CanBuildFrom[CC[T], T, CC[T]]): CC[T] = {
  2.   val buf = new ArrayBuffer[T] ++= xs

  3.   def swap(i1: Int, i2: Int) {
  4.     val tmp = buf(i1)
  5.     buf(i1) = buf(i2)
  6.     buf(i2) = tmp
  7.   }

  8.   for (n <- buf.length to 2 by -1) {
  9.     val k = nextInt(n)
  10.     swap(n - 1, k)
  11.   }

  12.   (bf(xs) ++= buf).result
  13. }
复制代码

5)接下来要判断所有Worker中哪些是ALIVE级别的Worker,ALIVE才能够参与资源的分配工作:

  1. for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
复制代码

6)当SparkSubmit指定Driver在Cluster模式的情况下,此时Driver会加入waitingDrivers等待列表中,在每个DriverInfo的DriverDescription中有要启动Driver时候对Worker的内存及Cores的要求等内容才能launch driver,如果内存和cores没有,worker不会launch driver:如果是client模式,不会有等待提交driver,因为application提交driver就启动了。下面参数中如果有supervise,则driver挂掉后可以自动重启,前提是driver是在集群中的,重启次数好像是5次。

  1. private[deploy] case class DriverDescription(
  2.     jarUrl: String,
  3.     mem: Int,
  4.     cores: Int,
  5.     supervise: Boolean,
  6.     command: Command) {

  7.   override def toString: String = s"DriverDescription (${command.mainClass})"
  8. }
复制代码

在符合资源要求的情况下然后采用随机打乱后的一个Worker来启动Driver:

  1. private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  2.   logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  3.   worker.addDriver(driver)
  4.   driver.worker = Some(worker)
  5.   worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  6.   driver.state = DriverState.RUNNING
  7. }
复制代码

Master发指令给Worker,让远程的Worker启动Driver:

  1. worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
复制代码

7) 先启动Driver才会发生后续的一切的资源调度的模式;

8)Spark默认为应用程序启动Executor的方式是FIFO的方式,也就是所有提交的应用程序都是放在调度的等待队列中的,先进先出,只有满足了前面应用程序的资源分配的基础上才能够满足下一个应用程序资源的分配;

9)为应用程序具体分配Executor之前要判断应用程序是否还需要分配Core,如果不需要则不会为应用程序分配Executor;

10)具体分配Executor之前要对要求Worker必须是ALIVE的状态且必须满足Application对每个Executor的内存和Cores的要求,并且在此基础上进行排序产生计算资源由大到小的usableWorkers数据结构:

  1. val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
  2.   .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
  3.     worker.coresFree >= coresPerExecutor.getOrElse(1))
  4.   .sortBy(_.coresFree).reverse
复制代码

在FIFO的情况下默认是spreadOutApps来让应用程序尽可能多的运行在所有的Node上:

  1. private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
复制代码

11)为应用程序分配Executors有两种方式,第一种方式是尽可能在集群的所有Worker上分配Executor,因为这样是更好的响应并发处理能力的,更好的利用机器的并发资源,这种方式往往会带来潜在的更好的数据本地性;

12)具体在集群上分配Cores的时候会尽可能的满足我们的要求,所以下面求了一个最小值:

  1. var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
复制代码

13)如果是每个Worker下面只能够为当前的应用程序分配一个Executor的话,每次是分配一个Core!

  1. // If we are launching one executor per worker, then every iteration assigns 1 core
  2. // to the executor. Otherwise, every iteration assigns cores to a new executor.
  3. if (oneExecutorPerWorker) {
  4.   assignedExecutors(pos) = 1
  5. } else {
  6.   assignedExecutors(pos) += 1
  7. }
复制代码

14)准备具体要为当前应用程序分配的Executor信息后,Master要通过远程通信发指令给Worker来具体启动ExecutorBackend进程:

  1. worker.endpoint.send(LaunchExecutor(masterUrl,exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
复制代码

15)紧接着给我们应用程序的Driver发送一个ExecutorAdded的信息:

  1. exec.application.driver.send(ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
复制代码


注:本学习笔记来自DT大数据梦工厂


二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

关键词:Spark SPAR Park SPA registration Spark scala DT_Spark 大数据

已有 1 人评分论坛币 收起 理由
daazx + 5 精彩帖子

总评分: 论坛币 + 5   查看全部评分

本帖被以下文库推荐

您需要登录后才可以回帖 登录 | 我要注册

本版微信群
加好友,备注cda
拉您进交流群
GMT+8, 2026-1-3 19:42