请选择 进入手机版 | 继续访问电脑版
1961 0

第11课:彻底解析wordcount运行原理 [推广有奖]

  • 1关注
  • 8粉丝

硕士生

34%

还不是VIP/贵宾

-

威望
0
论坛币
305 个
通用积分
0
学术水平
5 点
热心指数
14 点
信用等级
2 点
经验
22942 点
帖子
73
精华
0
在线时间
135 小时
注册时间
2016-2-27
最后登录
2016-9-11

相似文件 换一批

+2 论坛币
k人 参与回答

经管之家送您一份

应届毕业生专属福利!

求职就业群
赵安豆老师微信:zhaoandou666

经管之家联合CDA

送您一个全额奖学金名额~ !

感谢您参与论坛问题回答

经管之家送您两个论坛币!

+2 论坛币

1)在IntelliJ IDEA中编写下面代码:

  1. package com.dt.spark
  2. /**
  3. * 使用Java的方式开发进行本地测试Spark的WordCount程序
  4. * @author DT大数据梦工厂
  5. * http://weibo.com/ilovepains
  6. */
  7. import org.apache.spark.SparkConf
  8. import org.apache.spark.SparkContext
  9. object WordCount {
  10.   def main(args: Array[String]){


  11.     val conf = new SparkConf()
  12.     conf.setAppName("Wow, My First Spark App!")
  13.     conf.setMaster("local")
  14.     val sc = new SparkContext(conf)
  15.     val lines = sc.textFile("D://tmp//helloSpark.txt", 1)
  16.     val words = lines.flatMap { line => line.split(" ")}
  17.     val pairs = words.map { word => (word,1) }
  18.     val wordCounts = pairs.reduceByKey(_+_)
  19.     wordCounts.foreach(wordNumberPair => println(wordNumberPair._1+ " : " + wordNumberPair._2))
  20.     sc.stop()
  21.   }
  22. }
复制代码

2)在D盘下的tmp文件夹下新建helloSpark.txt文件,内容如下:

HelloSpark Hello Scala
Hello Hadoop
Hello Flink
Spark is awesome

3) 在WordCount代码区域点击右键选择Run'WordCount'。可以得到如下运行结果:

Flink :1
Spark : 2
is : 1
Hello : 4
awesome : 1
Hadoop : 1

Scala :1

下面从数据流动的视角分析数据到底是怎么被处理的。

ELBINNJNOY]HLE(U%08WRYU.png


Spark有三大特点:

1. 分布式。无论数据还是计算都是分布式的。默认分片策略:Block多大,分片就多大。但这种说法不完全准确,因为分片切分时有的记录可能跨两个Block,所以一个分片不会严格地等于Block的大小,例如HDFS的Block大小是128MB的话,分片可能多几个字节或少几个字节。一般情况下,分片都不会完全与Block大小相等。

分片不一定小于Block大小,因为如果最后一条记录跨两个Block的话,分片会把最后一条记录放在前一个分片中。

2. 基于内存(部分基于磁盘)

3. 迭代


textFile源码(SparkContext中);

  1. def textFile(
  2.     path: String,
  3.     minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  4.   assertNotStopped()
  5.   hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
  6.     minPartitions).map(pair => pair._2.toString)
  7. }
复制代码

可以看出在进行了hadoopFile之后又进行了map操作。

HadoopRDD从HDFS上读取分布式文件,并且以数据分片的方式存在于集群之中。

map的源码(RDD.scala中)

  1. def map[U: ClassTag](f: T =>U): RDD[U] = withScope {
  2.   val cleanF =sc.clean(f)
  3.   new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  4. }
复制代码

读取到的一行数据(key,value的方式),对行的索引位置不感兴趣,只对其value事情兴趣。pair时有个匿名函数,是个tuple,取第二个元素。

此处又产生了MapPartitionsRDD。MapPartitionsRDD基于hadoopRDD产生的Parition去掉行的KEY。

注:可以看出一个操作可能产生一个RDD也可能产生多个RDD。如sc.textFile就产生了两个RDD:hadoopRDD和MapParititionsRDD。


下一步:val words = lines.flatMap { line => line.split("") }

对每个Partition中的每行进行单词切分,并合并成一个大的单词实例的集合。

FlatMap做的一件事就是对RDD中的每个Partition中的每一行的内容进行单词切分。

这边有4个Partition,对单词切分就变成了一个一个单词,

下面是FlatMap的源码(RDD.scala中)

  1. def flatMap[U: ClassTag](f: T =>TraversableOnce[U]): RDD[U] =withScope {
  2.   val cleanF =sc.clean(f)
  3.   new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
  4. }
复制代码

可以看出flatMap又产生了一个MapPartitionsRDD,

此时的各个Partition都是拆分后的单词。


下一步:    val pairs = words.map { word => (word,1)}

将每个单词实例变为形如word=>(word,1)

map操作就是把切分后的每个单词计数为1。

根据源码可知,map操作又会产生一个MapPartitonsRDD。此时的MapPartitionsRDD是把每个单词变成Array(""Hello",1),("Spark",1)等这样的形式。


下一步:val wordCounts = pairs.reduceByKey(_+_)

reduceByKey是进行全局单词计数统计,对相同的key的value相加,包括local和reducer同时进行reduce。所以在map之后,本地又进行了一次统计,即local级别的reduce。

shuffle前的Local Reduce操作,主要负责本地局部统计,并且把统计后的结果按照分区策略放到不同的File。

下一Stage就叫Reducer了,下一阶段假设有3个并行度的话,每个Partition进行Local Reduce后都会把数据分成三种类型。最简单的方式就是用HashCode对其取模。

至此都是stage1。

Stage内部完全基于内存迭代,不需要每次操作都有读写磁盘,所以速度非常快。

reduceByKey的源码:

  1. def reduceByKey(partitioner: Partitioner, func:(V, V) => V):RDD[(K, V)] = self.withScope {
  2.   combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  3. }
  4. /**
  5. * Merge the values for each key using anassociative reduce function. This will also perform
  6. * the merging locally on each mapper beforesending results to a reducer, similarly to a
  7. * "combiner" in MapReduce. Outputwill be hash-partitioned with numPartitions partitions.
  8. */
  9. def reduceByKey(func: (V, V) => V, numPartitions: Int):RDD[(K, V)] = self.withScope {
  10.   reduceByKey(new HashPartitioner(numPartitions), func)
  11. }
复制代码

可以看到reduceByKey内部有combineByKeyWithClassTag。combineByKeyWithClassTag的源码如下:

  1. def combineByKeyWithClassTag[C](
  2.     createCombiner: V =>C,
  3.     mergeValue: (C, V) => C,
  4.     mergeCombiners: (C, C) => C,
  5.     partitioner: Partitioner,
  6.     mapSideCombine: Boolean = true,
  7.     serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  8.   require(mergeCombiners != null, "mergeCombinersmust be defined") // required as of Spark 0.9.0
  9.   if (keyClass.isArray){
  10.     if (mapSideCombine){
  11.       throw new SparkException("Cannot usemap-side combining with array keys.")
  12.     }
  13.     if (partitioner.isInstanceOf[HashPartitioner]){
  14.       throw new SparkException("Defaultpartitioner cannot partition array keys.")
  15.     }
  16.   }
  17.   val aggregator = new Aggregator[K, V, C](
  18.     self.context.clean(createCombiner),
  19.     self.context.clean(mergeValue),
  20.     self.context.clean(mergeCombiners))
  21.   if (self.partitioner == Some(partitioner)) {
  22.     self.mapPartitions(iter => {
  23.       val context= TaskContext.get()
  24.       new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
  25.     }, preservesPartitioning= true)
  26.   } else {
  27.     new ShuffledRDD[K, V, C](self, partitioner)
  28.       .setSerializer(serializer)
  29.       .setAggregator(aggregator)
  30.       .setMapSideCombine(mapSideCombine)
  31.   }
  32. }
复制代码

可以看出在combineByKeyWithClassTag内又new 了一个ShuffledRDD。

ReduceByKey有两个作用:

1. 进行Local级别的Reduce,减少网络传输。

2. 把当前阶段的内容放到本地磁盘上供shuffle使用。


下一步是shuffledRDD,产生Shuffle数据就需要进行分类,MapPartitionsRDD时其实已经分好类了,最简单的分类策略就是Hash分类。ShuffledRDD需要从每台机上抓取同一单词。

reduceByKey发生在哪里?

Stage2全部都是reduceByKey


最后一步:保存数据到HDFS(MapPartitionsRDD)

统计完的结果:(“Hello”,4)只是一个Value,而不是Key:"Hello",value:4。但输出到文件系统时需要KV的格式,现在只有Value,所以需要造个KEY。

saveAsTextFile的源码:

  1. def saveAsTextFile(path: String){
  2. this.map(x =>(NullWritable.get())),new Text(x.toStirng))
  3. .saveAsHadoopFile[TextOutputFormat[NullWritable,Text]](path)
  4. }
复制代码

this.map把当前的值(x)变成tuple。tuple的Key是Null,Value是(“Hello”,4)。

为什么要为样?因为saveAsHadoopFile时要求以这样的格式输出。Hadoop需要KV的格式!!

map操作时把key舍去了,输出时就需要通过生成Key。

第一个Stage有哪些RDD?HadoopRDD、MapPartitionsRDD、MapPartitionsRDD、MapPartitionsRDD、MapPartitionsRDD

第二个Stage有哪些RDD?ShuffledRDD、MapPartitionsRDD


只有Collect 或saveAsTextFile会触发作业,其他的时候都没有触发作业(Lazy)



注:本学习笔记来自DT大数据梦工厂

二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

关键词:Count word Partitioning partition exception Spark scala DT_Spark 大数据

已有 1 人评分论坛币 收起 理由
daazx + 5 精彩帖子

总评分: 论坛币 + 5   查看全部评分

您需要登录后才可以回帖 登录 | 我要注册

本版微信群
加好友,备注cda
拉您进交流群

京ICP备16021002-2号 京B2-20170662号 京公网安备 11010802022788号 论坛法律顾问:王进律师 知识产权保护声明   免责及隐私声明

GMT+8, 2024-3-29 12:35