作者简介
谭政,Hulu 网大数据基础平台研发。曾在新浪微博平台工作过。专注于大数据存储和处理,对 Hadoop、HBase 以及 Spark 等等均有深入的了解。
Spark 最新的特性以及功能
2015 年中 Spark 版本从 1.2.1 升级到当前最新的 1.5.2,1.6.0 版本也马上要进行发布,每个版本都包含了许多的新特性以及重要的性能改进,我会按照时间顺序列举部分改进出来,希望大家对 Spark 版本的演化有一个稍微直观的认识。
由于篇幅关系,这次不能给大家一一讲解其中每一项改进,因此挑选了一些我认为比较重要的特性来给大家讲解。如有遗漏和错误,还请帮忙指正。
Spark 版本演化
首先还是先来看一下 Spark 对应版本的变化:
先来一个整体的介绍:1.2 版本主要集中于 Shuffle 优化, 1.3 版本主要的贡献是 DataFrame API, 1.4 版本引入 R API 并启动 Tungsten 项目阶段,1.5 版本完成了 Tungsten 项目的第一阶段,1.6 版本将会继续进行 Tungsten 项目的第二个阶段。而我下面则重点介绍 DataFrame API 以及 Tungsten 项目。
DataFrame 介绍
DataFrame API 是在 1.3.0 中引入的,其目的是为了统一 Spark 中对结构化数据的处理。在引入 DataFrame 之前,Spark 之有上针对结构化数据的 SQL 查询以及 Hive 查询。
这些查询的处理流程基本类似:查询串先需要经过解析器生成逻辑查询计划,然后经过优化器生成物理查询计划,最终被执行器调度执行。
而不同的查询引擎有不同的优化器和执行器实现,并且使用了不同的中间数据结构,这就导致很难将不同的引擎的优化合并到一起,新增一个查询语言也是非常艰难。
为了解决这个问题,Spark 对结构化数据表示进行了高层抽象,产生了 DataFrame API。简单来说 DataFrame 可以看做是带有 Schema 的 RDD(在1.3之前DataFrame 就叫做 SchemaRDD,受到 R 以及 Python 的启发改为 DataFrame这个名字)。
在 DataFrame 上可以应用一系列的表达式,最终生成一个树形的逻辑计划。这个逻辑计划将会经历 Analysis, Logical Optimization, Physical Planning 以及 Code Generation 阶段最终变成可执行的 RDD,如下图所示:
在上图中,除了最开始解析 SQL/HQL 查询串不一样之外,剩下的部分都是同一套执行流程,在这套流程上 Spark 实现了对上层 Spark SQL, Hive SQL, DataFrame 以及 R 语言的支持。
下面我们来看看这些语言的简单示例:
Spark SQL : val count = sqlContext.sql("SELECT COUNT(*) FROM records").collect().head.getLong(0)
各个语言的使用方式都很类似。除了类 SQL 的表达式操作之外,DataFrame 也提供普通的类似于 RDD 的转换,例如可以写如下代码:
另外还值得一提的是,和 DataFrame API 紧密相关的 API -- DataSource API。如果说 DataFrame API 提供的是对结构化数据的高层抽象,那么 DataSource API 提供的则是对于结构化数据统一的读写接口。
DataSource API 支持从 JSON, JDBC, ORC, parquet 中加载结构化数据 (SQLContext 类中的诸多读取方法,均会返回一个 DataFrame 对象),也同时支持将 DataFrame 的数据写入到上述数据源中 (DataFrame 中的 save 系列方法 )。
这两个 API 再加上层多种语言的支持,使得 Spark 对结构化数据拥有强大的处理能力,极大简化了用户编程工作。
Tungsten 项目介绍
在官方介绍中 Tungsten 将会是对 Spark 执行引擎所做的最大的修改,其主要目标是改进 Spark 内存和 CPU 的使用效率,尽可能发挥出机器硬件的最大性能。
之所以将优化的重点集中在内存和 CPU 而不是 IO 之上是社区实践发现现在很多的大数据应用的瓶颈在 CPU 。例如目前很多网络 IO 链路的速度达到 10Gbps,SSD 硬盘和 Striped HDD 阵列的使用也使得磁盘 IO 也有较大提升。而 CPU 的主频却没有多少提升,CPU 核数的增长也不如前两者迅速。
此外在 Spark 已经对 IO 做过很多的优化(如列存储以及 IO 剪枝可以减少 IO的数据量,优化的 Shuffle 改善了 IO 和网络的传输效率),再继续进行优化提升空间并不大。
而随着序列化以及 Hash 的广泛使用,现在 CPU 反而成为了一个瓶颈。
内存方面,使用 Java 原生的堆内存管理方式很容易产生 OOM 问题,并伴随着较大的 GC 负担,进一步降低了 CPU 的利用率。
基于上述观察 Spark 在 1.4 中启动了 Tungsten 项目,并在 1.5 中完成第一阶段的优化
这些优化包括下面三个方面:
内存管理和二进制格式处理
缓存友好的计算
代码生成
内存管理和二进制格式处理
避免以原生格式存储 Java 对象(使用二进制的存储格式),减少 GC 负担
压缩内存数据格式,减少内存占用以及可能的溢写。使用更准确的内存的统计而不是依赖启发规则管理内存。
对于那些已知数据格式运算( DataFrame 和 SQL ),直接使用二进制的运算,避免序列化和反序列化开销。
缓存友好的计算
更加快的排序以及 Hash,优化 Aggregation, Join 以及 Shuffle 操作。
代码生成
更快的表达式计算以及 DataFrame/SQL 运算(这是代码生成的主要应用场景,主要是为了降低进行表达式评估中 JVM 的各种开销,如虚函数调用,分支预测,原始类型的对象装箱开销以及内存消耗)更快的序列化。
相关的每个版本所做的优化如下:
Tungsten 项目并不是完全是一个通用的优化技术,其中很多优化利用了 DataFrame 模型所提供的丰富的语义信息(因此 DataFrame 和 Spark SQL 查询能够享受该项目所来的大量的好处),同样未来也会改进 RDD API 来为底层优化提供更多的信息支持。
Spark 在 Hulu 的实践
Hulu 是一家在线付费视频网站,每天都有大量的用户观看行为数据产生,这些数据会由 Hulu 的大数据平台进行存储以及处理。推荐团队需要从这些数据中挖掘出单个用户感兴趣的内容并推荐给对应的观众,广告团队需要根据用户的观看记录以及行为给其推荐的最合适广告,而数据团队则需要分析所有数据的各个维度并为公司的策略制定提供有效支持。
他们的所有工作都是在 Hulu 的大数据平台上完成的,该平台由 HDFS/Yarn, HBase, Hive, Cassandera 以及的 Presto,Spark 等组成。Spark 是运行在 Yarn上,由 Yarn 来管理资源并进行任务调度。
Spark 则主要有两类应用:Streaming 应用以及短时 Job。
Streaming 应用中各个设备前端将用户的行为日志输入到 Kafka 中,然后由 Spark Streaming 来进行处理,输出结果到 Cassandera, HBase 以及 HDFS 中。短时 Job 并不像 Streaming 应用一样一直运行,而是由用户或者定时脚本触发,一般运行时间从几分钟到十几个小时不等。
此外为了方便 PM 类型的用户更便捷的使用 Spark,我们也搭建了 Apache Zeppelin 这种交互式可视化执行环境。对于非 Python/Scala/Java/R 用户(例如某些用户想在 NodeJS 中提交 Spark 任务),我们也提供 REST 的 Spark-JobServer 来方便用户提交作业。
Hulu 从 0.9 版本就开始将 Spark 应用于线上作业,内部经历了 1.1.1, 1.2.0, 1.4.0 等诸多版本,目前内部使用的最新版本是基于社区 1.5.1 进行改造的。
在之前的版本中我们遇到的很多的问题也添加了不少新功能,大部分修改都已经包含在最新版本里面,我就不再这里赘述了。这节里我主要想讲的是社区里所没有的,但是我们认为还比较重要的一些修改。
较多的迭代触发 StackOverflow 的问题
在某些机器学习算法里面需要进行比较多轮的迭代,当迭代的次数超过一定次数时候应用程序就会发生 StackOverflow 而崩溃。这个次数限制并不会很大,几百次迭代就可能发生栈溢出。大家可以利用一小段代码来进行一个简单的测试:
产生上述错误的原因在于 Driver 将 RDD 任务发送给 Executor 执行的时候需要将 RDD 的信息序列化后广播到对应的 Executor 上。而 RDD 在序列的时候需要递归将其依赖的 RDD 序列化,这样在出现长 lineage 的 RDD 的时候就可能因为线程的栈帧内存不够,抛出 StackOverflow 异常。
解决方法也比较直接,就是将递归改为迭代,把原来需要递归保存在线程栈帧的序列化 RDD 挪到堆区进行保存。具体的做法是将 RDD 的依赖关系分离出来,变成两个映射表: rddId->List of depId 以及depId -> Dependency。Driver 端然后将 RDD 以及这些映射序列化为字节数组广播出去,Executor 端接收到广播消息后重新将映射组装成为原始的依赖。
这个过程中要改动 RDD 核心 Task 接口,需要经过严格的测试。但是在做这种优化之后,迭代个一两千次都没有什么问题。
Streaming 延迟数据接收机制( Receive-Base )
在 Receive-Base 的 Spark Streaming 的架构中, 主要有两个角色 Driver 和 Executor。
在 Executor 中运行着 Receiver, Receiver 的主要作用是从外部接收数据并缓存到本地内存中,同时 Receiver 回向 Driver 汇报自己所接收的数据块,Driver 定期产生新的任务并分发到各个 Executor 去处理这些数据。
在应用启动的时候,Driver 会首先将 Receiver 处理程序调度到各个 Executor 上让其初始化。一旦 Receiver 初始化完毕,它就开始源源不断的接收数据,并且需要 Driver 定期调度任务来消耗这些数据。
但是在某些场景下, Executor 处理端还并没有准备好,无法开始处理数据。
这时候在 Receiver 端就会发生内存积压,随着积压的数据越来越多,大部分数据都会撑过新生代回收年龄进入老年代,进一步给 GC 带来严重的压力,这个时候也就离应用程序崩溃不远了。
在 Hulu 的 Spark Streaming 处理中,需要加载并初始化很多机器学习的模型,这些模型的初始化非常费时间,长的可能需要半个小时才能初始化完毕。在此期间 Receiver 不能接收数据,否者内存将会被消耗殆尽。
Hulu 中的解决方法是在每个 Executor 接收任何任务之前先进行执行一个用户定义的初始化任务,初始化任务中可以执行一些独立的用户代码。我们在新增了一个接口,让用户可以设置自定义的初始化任务。
如下代码所示:
实现上需要更改 Spark 的任务调度器,先将每个 Executor 设置为未初始化状态,除了初始化任务之外调度器不会给未初始化状态的 Executor 分配其他任务。等 Executor 运行完初始化任务,调度器更新 Executor 的状态为已初始化,这样的 Executor 就可以分配给其他正常任务了,包括初始化 Receiver 的任务。
其他注意事项
Spark 允许用户设置 spark.executor.userClassPathFirst,这可以部分缓解用户代码库和 Spark 系统代码库冲突的问题。
但是在实践过程中我们发现,大并发情况加载相同的类有可能发生死锁的情况(我们的一个场景下有 1/10 几率复现该问题)。