2633 0

第28课:Spark天堂之门解密 [推广有奖]

  • 1关注
  • 8粉丝

硕士生

34%

还不是VIP/贵宾

-

威望
0
论坛币
305 个
通用积分
0
学术水平
5 点
热心指数
14 点
信用等级
2 点
经验
22942 点
帖子
73
精华
0
在线时间
135 小时
注册时间
2016-2-27
最后登录
2016-9-11

+2 论坛币
k人 参与回答

经管之家送您一份

应届毕业生专属福利!

求职就业群
赵安豆老师微信:zhaoandou666

经管之家联合CDA

送您一个全额奖学金名额~ !

感谢您参与论坛问题回答

经管之家送您两个论坛币!

+2 论坛币

一、Spark天堂之门

1、Spark程序在执行的时候分为Driver和Executor两部分;

2、Spark程序的编写核心基础是RDD,具体包含两部分:

a)是由SparkContext来最初创建第一个RDD,一定是由SparkContext来创建的;

b)Spark程序的调度优化也是基于SparkContext。

3、Spark程序的注册要通过SparkContext实例化时候产生的对象来完成(其实是由SchedulerBackend注册程序,申请计算资源);

4、Spark程序运行的时候要通过ClusterManager获得具体的资源,计算资源的获取也是由SparkContext产生的对象来申请的(其实是 SchedulerBackend来获取资源的,SchedulerBackend是由SparkContext实例化的时候产生的,也就是说在构造SparkContext的时候产生);

从调度的层面讲调度优化也是基于SparkContext的,从程序注册的角度讲也是基于SparkContext,从程序获取计算资源的角度讲也是基于SparkContext获取计算资源,只要SparkContext关闭,程序也就结束运行。

5、SparkContext崩溃或者结束的整个Spark程序也结束啦!

总结:SparkContext开启天堂之门:Spark程序是通过SparkContext发布到Spark集群的;

          SparkContext导演天堂世界:SPark程序的运行都是在SparkContext为核心的调度器下指挥下进行的;

          SparkContext关闭天堂之门:SparkContext崩溃或者结束整个Spark程序结束了;


二、SparkContext内幕天堂揭秘

1、SparkContext构建的顶级三大核心对象:DAGScheduler、TaskScheduler、SchedulerBackend,其中:

a)SchedulerBackend是面向Stage的调度器;

b)TaskScheduler是一个接口,更加具体的Cluster Manager的不同会有不同的实现,Standalone模式下具体的实现是 TaskSchedulerlmpl;

c)SchedulerBackend是一个接口,更加具体的Cluster Manager的不同会有不同的实现,Standalone模式下具体的实现是SparkDeploySchedulerBackend;

2、从整个程序运行的角度来讲,SparkContext包含四大核心对象:DAGScheduler、TaskScheduler、SchedulerBackend、MapOutputTrackerMaster。

SparkContext默认构造器在SparkContext实例化的时候需要对,构造器中的参数赋值。

  1.    // Create and start the scheduler
  2.     val (sched, ts) = SparkContext.createTaskScheduler(this, master)
  3.     _schedulerBackend = sched
  4.     _taskScheduler = ts                    //task
  5.     _dagScheduler = new DAGScheduler(this)
  6.     _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

  7.     // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
  8.     // constructor
  9.     _taskScheduler.start()
复制代码

createTaskScheduler在SparkContext的默认构造器中,所以当实例化SparkContext的时候需要被调用,调用的时候返回了SchedulerBackend和_taskScheduler的实例,基于该内容有创建了DAGScheduler,DAGScheduler管理TaskScheduler。createTaskScheduler中采用了Local模式LOCAL_N_REGEX、Local多线程模式、Local多线程模式重试、Cluster模式、Yarn模式、Mesos模式,默认情况下作业失败了就失败了,不会去重试:

  1.   /**
  2.    * Create a task scheduler based on a given master URL.
  3.    * Return a 2-tuple of the scheduler backend and the task scheduler.
  4.     * 在SparkContext实例化的时候被调用
  5.    */
  6.   private def createTaskScheduler(
  7.       sc: SparkContext,
  8.       master: String): (SchedulerBackend, TaskScheduler) = {
  9.     import SparkMasterRegex._

  10.     // When running locally, don't try to re-execute tasks on failure.
  11.     val MAX_LOCAL_TASK_FAILURES = 1
  12.     下面为Standalone模式:

  13.       case SPARK_REGEX(sparkUrl) =>
  14.         val scheduler = new TaskSchedulerImpl(sc)
  15.         val masterUrls = sparkUrl.split(",").map("spark://" + _)
  16.         val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
  17.         scheduler.initialize(backend)
  18.         (backend, scheduler)
复制代码

从上述源码可以看出传进的sparkURL创建TaskSchedulerImpl对象,TaskSchedulerImpl是底层调度器的核心,在创建TaskSchedulerImpl后,以TaskSchedulerImpl为参数创建了一个SchedulerBackend对象,而SchedulerBackend进行initialize的时候将SchedulerBackend传入,具体传入的是SparkDeploySchedulerBackend,这里Scheduler的initialize方法如下:

  1.   //先进先出
  2.   def initialize(backend: SchedulerBackend) {
  3.     this.backend = backend
  4.     // temporarily set rootPool name to empty
  5.     rootPool = new Pool("", schedulingMode, 0, 0)
  6.     schedulableBuilder = {
  7.       schedulingMode match {
  8.         case SchedulingMode.FIFO =>  //先进先出的调度模式
  9.           new FIFOSchedulableBuilder(rootPool)
  10.         case SchedulingMode.FAIR =>
  11.           new FairSchedulableBuilder(rootPool, conf)
  12.       }
  13.     }
  14.     schedulableBuilder.buildPools()
  15.   }
复制代码

在Scheduler.initialize调用的时候会创建SchedulerPool调度池,一个任务有两种方式。

SparkDeploySchedulerBackend有三大核心功能:

1)负责与Master链接注册当前程序;

2)接收集群中为当前应用程序而分配计算资源Executor的注册并管理Executor;

3)负责发送Task到具体的Executor执行。

注:把Task发送给Executor是通过SchedulerBackend来完成的

补充说明的是:SparkDeploySchedulerBackend是被TaskSchedulerImpl来管理的。SparkContext的下一步进行TaskSchedulerImpl.start(),然后导致SparkDeploySchedulerBackend.start(),

  1. override def start() {
  2.     backend.start()

  3.     if (!isLocal && conf.getBoolean("spark.speculation", false)) {
  4.       logInfo("Starting speculative execution thread")
  5.       speculationScheduler.scheduleAtFixedRate(new Runnable {
  6.         override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
  7.           checkSpeculatableTasks()
  8.         }
  9.       }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
  10.     }
  11.   }
复制代码

start方法中主要是Backend.start(),实质是SchedulerBackend.start()。

  1. val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
复制代码

       当通过SparkDeploySchedulerBackend注册程序给Master的时候会把上述command提交给Master,Master发指令给Worker去启动Executor所在的进程的时候加载main(Executor进程所在的入口类)方法的所在的入口类就是command中的CoarseGrainedExecutorBackend,当然这里可以实现自己的ExecutorBackend,在CoarseGrainedExecutorBackend中启动Executor(Executor先注册),Executor通过线程池并发执行的方式执行Task。


      下面是CoaseGrainedExecutorBackend中的run()方法,其中有driverUrl、executorId、hostname、cores、APPId等参数:

  1. val executorConf = new SparkConf
  2.       val port = executorConf.getInt("spark.executor.port", 0)  //获得executor的port
  3.       val fetcher = RpcEnv.create(
  4.         "driverPropsFetcher",
  5.         hostname,
  6.         port,
  7.         executorConf,
  8.         new SecurityManager(executorConf),  //传入SparkConf的实例
  9.         clientMode = true)
  10.       val driver = fetcher.setupEndpointRefByURI(driverUrl)
  11.       val props = driver.askWithRetry[Seq[(String, String)]](RetrieveSparkProps) ++
  12.         Seq[(String, String)](("spark.app.id", appId))
  13.       fetcher.shutdown()
复制代码

下面代码中使用从driver上发送的属性创建了SparkEnv,然后以键值对的方式循环遍历从driver传入的参数,通过SparkConf设置该属性(Standalone模式),然后通过SparkEnv调用createExecutorEnv,主要实现了对executor的port设置,创建了CoaseGrainedExecutorBackend(new 出该main方法的对象实例)的对象进行RPC通信。

  1. val env = SparkEnv.createExecutorEnv(driverConf, executorId, hostname, port, cores, isLocal = false)

  2.       // SparkEnv will set spark.executor.port if the rpc env is listening for incoming
  3.       // connections (e.g., if it's using akka). Otherwise, the executor is running in
  4.       // client mode only, and does not accept incoming connections.
  5.       val sparkHostPort = env.conf.getOption("spark.executor.port").map { port =>
  6.           hostname + ":" + port
  7.         }.orNull
  8.       env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
  9.         env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env))
复制代码

启动CoaseGrainedExecutorBackend的时候需要注册Executor,当收到driver级别的Executor注册信息后,才会实例化Executor的对象,只有当Executor注册成功的时候才会实例化。

  1. override def receive: PartialFunction[Any, Unit] = {
  2.     case RegisteredExecutor(hostname) =>
  3.       logInfo("Successfully registered with driver")
  4.       executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
复制代码

当TaskSchedulerImpl调用start方法后导致SparkDeploySchedulerBackend调用start方法,而Executor要向SparkDeploySchedulerBackend注册,在SparkDeploySchedulerBackend的start方法中实例化了APPClient对象:

  1. val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
  2. //每个Executor中使用多少个cores可以在Spark.env中配置
  3.     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
  4.       command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
  5.     client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
  6.     client.start()
  7.     launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
  8.     waitForRegistration()
  9.     launcherBackend.setState(SparkAppHandle.State.RUNNING)
复制代码

上面new除了AppClient的实例对象,AppClient是一个接口,可以和集群进行通信,其参数包含了masterURL和应用程序信息(因为要注册应用程序给集群),并且是集群时间的监听器。

  1. /**
  2. * Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL,
  3. * an app description, and a listener for cluster events, and calls back the listener when various
  4. * events occur.
  5. *
  6. * @param masterUrls Each url should look like spark://host:port.
  7. */
  8. private[spark] class AppClient(
  9.     rpcEnv: RpcEnv,
  10.     masterUrls: Array[String],
  11.     appDescription: ApplicationDescription,
  12.     listener: AppClientListener,
  13.     conf: SparkConf)
  14.   extends Logging {
复制代码

在AppClient中有很重要的类ClientEndpoint,在ClientEndpoint实例化启动的start中主要是registerWithMaster:

  1. override def onStart(): Unit = {
  2.       try {
  3.         registerWithMaster(1)
  4.       } catch {
  5.         case e: Exception =>
  6.           logWarning("Failed to connect to master", e)
  7.           markDisconnected()
  8.           stop()
  9.       }
  10.     }
复制代码

在registerWithMaster中进行注册:

  1. private def registerWithMaster(nthRetry: Int) {
  2.       registerMasterFutures.set(tryRegisterAllMasters())
  3.       registrationRetryTimer.set(registrationRetryThread.scheduleAtFixedRate(new Runnable {
复制代码

其内部使用tryRegisterAllMasters进行注册:

  1. /**
  2.      *  Register with all masters asynchronously and returns an array `Future`s for cancellation.
  3.      */
  4.     private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  5.       for (masterAddress <- masterRpcAddresses) yield {
  6.         registerMasterThreadPool.submit(new Runnable {
复制代码

tryRegisterAllMasters注册Application的时候是通过Thread完成的。面继续跟进代码:

  1. masterRef.send(RegisterApplication(appDescription, self))
复制代码

进行跟进查看注册Application的内容:

  1. private[spark] case class ApplicationDescription(
  2.     name: String,
  3.     maxCores: Option[Int],
  4.     memoryPerExecutorMB: Int,
  5.     command: Command,
  6.     appUiUrl: String,   //在web控制台显示的Application信息
  7.     eventLogDir: Option[URI] = None,
  8.     // short name of compression codec used when writing event logs, if any (e.g. lzf)
  9.     eventLogCodec: Option[String] = None,
  10.     coresPerExecutor: Option[Int] = None,
  11.     user: String = System.getProperty("user.name", "<unknown>")) {
复制代码

      上面我们看到了tryRegisterAllMasters中是通过Thread的方式向Master注册的,那么我们在Master中可以看到发送的具体注册Application的信息。我们在Master的receive方法中看到了通过模式匹配的方式包括了RegisterApplication,接受注册,分配资源。


三、总结

上面我们通过源码的方式详细剖析了提交应用程序后,通过driver向Master注册Application的过程,当注册完成Application后Master发送指令给Worker启动Executor,并向driver中的SparkDeploySchedulerBackend注册Executor的信息。

SparkContext中还包含了TaskScheduler面向Stage的高层调度器,将action触发的job中的rdd划分为若干个Stage。

SparkContext中也包括SparkUI,背后是Jetty服务,支持通过web的方式访问程序的状态。


注:本学习笔记来自DT大数据梦工厂        微信公众号:DT_Spark        每晚8点YY永久直播频道:68917580

二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

关键词:Spark Park SPAR SPA registration Spark scala DT_Spark 大数据

已有 1 人评分论坛币 收起 理由
daazx + 5 精彩帖子

总评分: 论坛币 + 5   查看全部评分

本帖被以下文库推荐

您需要登录后才可以回帖 登录 | 我要注册

本版微信群
加好友,备注cda
拉您进交流群

京ICP备16021002-2号 京B2-20170662号 京公网安备 11010802022788号 论坛法律顾问:王进律师 知识产权保护声明   免责及隐私声明

GMT+8, 2024-4-23 20:30