楼主: 我的素质低
4557 0

[Hadoop] 〖摘·spark〗安装 Scala 与Spark(二) [推广有奖]

学术权威

83%

还不是VIP/贵宾

-

TA的文库  其他...

〖素质文库〗

结构方程模型

考研资料库

威望
8
论坛币
23388 个
通用积分
28302.3504
学术水平
2705 点
热心指数
2881 点
信用等级
2398 点
经验
223623 点
帖子
2977
精华
52
在线时间
2175 小时
注册时间
2012-11-24
最后登录
2024-1-13

一级伯乐勋章 初级学术勋章 初级热心勋章 初级信用勋章 中级热心勋章 中级学术勋章 中级信用勋章 高级学术勋章 高级热心勋章 高级信用勋章 特级学术勋章

+2 论坛币
k人 参与回答

经管之家送您一份

应届毕业生专属福利!

求职就业群
赵安豆老师微信:zhaoandou666

经管之家联合CDA

送您一个全额奖学金名额~ !

感谢您参与论坛问题回答

经管之家送您两个论坛币!

+2 论坛币
安装 Scala 和 Spark



       第一步是下载和配置 Scala。清单 4 中显示的命令阐述了 Scala 安装的下载和准备工作。使用 Scala v2.8,因为这是经过证实的 Spark 所需的版本。





清单 4. 安装 Scala

  1. $ wget http://www.scala-lang.org/downlo ... ala-2.8.1.final.tgz $ sudo tar xvfz scala-2.8.1.final.tgz --directory /opt/
复制代码

       要使 Scala 可视化,请将下列行添加至您的 .bashrc 中(如果您正使用 Bash 作为 shell):

  1. export SCALA_HOME=/opt/scala-2.8.1.finalexport PATH=$SCALA_HOME/bin:$PATH
复制代码



      接着可以对您的安装进行测试,如 清单 5 所示。这组命令会将更改加载至 bashrc 文件中,接着快速测试 Scala 解释器 shell。






清单 5. 配置和运行交互式 Scala

  1. $ scala Welcome to Scala version 2.8.1.final (OpenJDK Client VM, Java 1.6.0_20).
  2. Type in expressions to have them evaluated.
  3. Type :help for more information.

  4. scala> println("Scala is installed!")
  5. Scala is installed!scala> :quit $
复制代码



      如清单中所示,现在应该看到一个 Scala 提示。您可以通过输入:quit执行退出。注意,Scala 要在 JVM 的上下文中执行操作,所以您会需要 JVM。我使用的是 Ubuntu,它在默认情况下会提供 OpenJDK。


接下来,请获取最新的 Spark 框架副本。为此,请使用 清单 6 中的脚本。






清单 6. 下载和安装 Spark 框架


  1. wget https://github.com/mesos/spark/tarball/0.3-scala-2.8/
  2. mesos-spark-0.3-scala-2.8-0-gc86af80.tar.gz
  3. $ sudo tar xvfz mesos-spark-0.3-scala-2.8-0-gc86af80.tar.gz
复制代码


接下来,使用下列行将 spark 配置设置在 Scala 的根目录 ./conf/spar-env.sh 中:



  1. export SCALA_HOME=/opt/scala-2.8.1.final
复制代码



       设置的最后一步是使用简单的构建工具 (sbt) 更新您的分布。sbt是一款针对 Scala 的构建工具,用于 Spark 分布中。您可以在 mesos-spark-c86af80 子目录中执行更新和变异步骤,如下所示:


  1. $ sbt/sbt update compile
复制代码



      注意,在执行此步骤时,需要连接至 Internet。当完成此操作后,请执行 Spark 快速检测,如 清单 7 所示。在该测试中,需要运行 SparkPi 示例,它会计算 pi 的估值(通过单位平方中的任意点采样)。所显示的格式需要样例程序 (spark.examples.SparkPi) 和主机参数,该参数定义了 Mesos 主机(在此例中,是您的本地主机,因为它是一个单节点集群)和要使用的线程数量。注意,在 清单 7 中,执行了两个任务,而且这两个任务被序列化(任务 0 开始和结束之后,任务 1 再开始)。






清单 7. 对 Spark 执行快速检测
  1. $ ./run spark.examples.SparkPi local[1] 11/08/26 19:52:33 INFO spark.CacheTrackerActor: Registered actor on port 50501
  2. 11/08/26 19:52:33 INFO spark.MapOutputTrackerActor: Registered actor on port 50501
  3. 11/08/26 19:52:33 INFO spark.SparkContext: Starting job...
  4. 11/08/26 19:52:33 INFO spark.CacheTracker: Registering RDD ID 0 with cache
  5. 11/08/26 19:52:33 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions
  6. 11/08/26 19:52:33 INFO spark.CacheTrackerActor: Asked for current cache locations
  7. 11/08/26 19:52:33 INFO spark.LocalScheduler: Final stage: Stage 0
  8. 11/08/26 19:52:33 INFO spark.LocalScheduler: Parents of final stage: List()
  9. 11/08/26 19:52:33 INFO spark.LocalScheduler: Missing parents: List()
  10. 11/08/26 19:52:33 INFO spark.LocalScheduler: Submitting Stage 0, which has no missing ...
  11. 11/08/26 19:52:33 INFO spark.LocalScheduler: Running task 0
  12. 11/08/26 19:52:33 INFO spark.LocalScheduler: Size of task 0 is 1385 bytes
  13. 11/08/26 19:52:33 INFO spark.LocalScheduler: Finished task 0
  14. 11/08/26 19:52:33 INFO spark.LocalScheduler: Running task 1
  15. 11/08/26 19:52:33 INFO spark.LocalScheduler: Completed ResultTask(0, 0)
  16. 11/08/26 19:52:33 INFO spark.LocalScheduler: Size of task 1 is 1385 bytes
  17. 11/08/26 19:52:33 INFO spark.LocalScheduler: Finished task 1
  18. 11/08/26 19:52:33 INFO spark.LocalScheduler: Completed ResultTask(0, 1)
  19. 11/08/26 19:52:33 INFO spark.SparkContext: Job finished in 0.145892763 s
  20. Pi is roughly 3.14952
  21. $
复制代码


通过增加线程数量,您不仅可以增加线程执行的并行化,还可以用更少的时间执行作业(如 清单 8 所示)。






清单 8. 对包含两个线程的 Spark 执行另一个快速检测





  1. $ ./run spark.examples.SparkPi local[2] 11/08/26 20:04:30 INFO spark.MapOutputTrackerActor: Registered actor on port 50501
  2. 11/08/26 20:04:30 INFO spark.CacheTrackerActor: Registered actor on port 50501
  3. 11/08/26 20:04:30 INFO spark.SparkContext: Starting job...
  4. 11/08/26 20:04:30 INFO spark.CacheTracker: Registering RDD ID 0 with cache
  5. 11/08/26 20:04:30 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions
  6. 11/08/26 20:04:30 INFO spark.CacheTrackerActor: Asked for current cache locations
  7. 11/08/26 20:04:30 INFO spark.LocalScheduler: Final stage: Stage 0
  8. 11/08/26 20:04:30 INFO spark.LocalScheduler: Parents of final stage: List()
  9. 11/08/26 20:04:30 INFO spark.LocalScheduler: Missing parents: List()
  10. 11/08/26 20:04:30 INFO spark.LocalScheduler: Submitting Stage 0, which has no missing ...
  11. 11/08/26 20:04:30 INFO spark.LocalScheduler: Running task 0
  12. 11/08/26 20:04:30 INFO spark.LocalScheduler: Running task 1
  13. 11/08/26 20:04:30 INFO spark.LocalScheduler: Size of task 1 is 1385 bytes
  14. 11/08/26 20:04:30 INFO spark.LocalScheduler: Size of task 0 is 1385 bytes
  15. 11/08/26 20:04:30 INFO spark.LocalScheduler: Finished task 0
  16. 11/08/26 20:04:30 INFO spark.LocalScheduler: Finished task 1
  17. 11/08/26 20:04:30 INFO spark.LocalScheduler: Completed ResultTask(0, 1)
  18. 11/08/26 20:04:30 INFO spark.LocalScheduler: Completed ResultTask(0, 0)
  19. 11/08/26 20:04:30 INFO spark.SparkContext: Job finished in 0.101287331 s
  20. Pi is roughly 3.14052
  21. $
复制代码







使用 Scala 构建一个简单的 Spark 应用程序





      要构建 Spark 应用程序,您需要单一 Java 归档 (JAR) 文件形式的 Spark 及其依赖关系。使用sbt在 Spark 的顶级目录中创建该 JAR 文件,如下所示:

  1. $ sbt/sbt assembly
复制代码



      结果产生一个文件 ./core/target/scala_2.8.1/"Spark Core-assembly-0.3.jar"。将该文件添加至您的 CLASSPATH 中,以便可以访问它。在本示例中,不会用到此 JAR 文件,因为您将会使用 Scala 解释器运行它,而不是对其进行编译。



      在本示例中,使用了标准的 MapReduce 转换(如 清单 9 所示)。该示例从执行必要的 Spark 类导入开始。接着,需要定义您的类 (SparkTest) 及其主方法,用它解析稍后使用的参数。这些参数定义了执行 Spark 的环境(在本例中,该环境是一个单节点集群)。接下来,要创建SparkContext对象,它会告知 Spark 如何对您的集群进行访问。该对象需要两个参数:Mesos 主机名称(已传入)以及您分配给作业的名称 (SparkTest)。解析命令行中的切片数量,它会告知 Spark 用于作业的线程数量。要设置的最后一项是指定用于 MapReduce 操作的文本文件。



     最后,您将了解 Spark 示例的实质,它是由一组转换组成。使用您的文件时,可调用flatMap方法返回一个 RDD(通过指定的函数将文本行分解为标记)。然后通过map方法(该方法创建了键值对)传递此 RDD ,最终通过ReduceByKey方法合并键值对。合并操作是通过将键值对传递给_ + _匿名函数来完成的。该函数只采用两个参数(密钥和值),并返回将两者合并所产生的结果(一个String和一个Int)。接着以文本文件的形式发送该值(到输出目录)。





清单 9. Scala/Spark 中的 MapReduce (SparkTest.scala)



  1. import spark.SparkContext
  2. import SparkContext._

  3. object SparkTest {

  4.   def main( args: Array[String]) {

  5.     if (args.length == 0) {
  6.       System.err.println("Usage: SparkTest <host> [<slices>]")
  7.       System.exit(1)
  8.     }

  9.     val spark = new SparkContext(args(0), "SparkTest")
  10.     val slices = if (args.length > 1) args(1).toInt else 2

  11.     val myFile = spark.textFile("test.txt")
  12.     val counts = myFile.flatMap(line => line.split(" "))
  13.                         .map(word => (word, 1))
  14.                         .reduceByKey(_ + _)

  15.     counts.saveAsTextFile("out.txt")

  16.   }

  17. }

  18. SparkTest.main(args)
复制代码


要执行您的脚本,只需要执行以下命令:




  1. $ scala SparkTest.scala local[1]
复制代码



      您可以在输出目录中找到 MapReduce 测试文件(如 output/part-00000)。


二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

关键词:Spark SCALA Park SPAR SPA 安装 Scala Spark

已有 2 人评分经验 论坛币 学术水平 热心指数 信用等级 收起 理由
niuniuyiwan + 100 + 100 + 5 + 5 + 5 精彩帖子
daazx + 4 精彩帖子

总评分: 经验 + 100  论坛币 + 100  学术水平 + 9  热心指数 + 5  信用等级 + 5   查看全部评分

本帖被以下文库推荐

心晴的时候,雨也是晴;心雨的时候,晴也是雨!
扣扣:407117636,欢迎一块儿吐槽!!
您需要登录后才可以回帖 登录 | 我要注册

本版微信群
加好友,备注jltj
拉您入交流群

京ICP备16021002-2号 京B2-20170662号 京公网安备 11010802022788号 论坛法律顾问:王进律师 知识产权保护声明   免责及隐私声明

GMT+8, 2024-4-28 19:49