[Big Data with Spark] Getting Started with Actions Operators by Example


reduce(func)

This operator takes a function of two arguments and combines them into a single result; that result is then combined with the next element, and so on, until the whole dataset has been reduced to one value.

For example, computing the sum of a dataset of numbers:




// create the dataset
scala> var data = sc.parallelize(1 to 3,1)
scala> data.collect
res6: Array[Int] = Array(1, 2, 3)

// reduce the dataset to its sum
scala> data.reduce((x,y)=>x+y)
res5: Int = 6
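Since reduce combines partial results from different partitions, the function passed to it should be commutative and associative. As a rough sketch (reusing the data RDD defined above), any such binary function works, for example the maximum:

// maximum via reduce; math.max is commutative and associative
data.reduce((x, y) => math.max(x, y))   // expected result: 3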

collect()

Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.


Returns all the elements of the dataset to the driver; it is usually used after a filter or another operation that leaves only a small amount of data.

For example, displaying the contents of the dataset defined above:


// create the dataset
scala> var data = sc.parallelize(1 to 3,1)
scala> data.collect
res6: Array[Int] = Array(1, 2, 3)
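As a small illustrative sketch of the "filter first, then collect" pattern mentioned above (the nums RDD is just an example name, not part of the original session):

// narrow the data on the cluster first, then bring the small result to the driver
val nums = sc.parallelize(1 to 100, 2)
nums.filter(_ % 10 == 0).collect
// expected: Array(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)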

count()

Return the number of elements in the dataset.


Returns the number of elements in the dataset.


// create the dataset
scala> var data = sc.parallelize(1 to 3,1)
// count the elements
scala> data.count
res7: Long = 3

scala> var data = sc.parallelize(List(("A",1),("B",1)))
scala> data.count
res8: Long = 2

first()

Return the first element of the dataset (similar to take(1)).


Returns the first element of the dataset, similar to take(1).


// create the dataset
scala> var data = sc.parallelize(List(("A",1),("B",1)))
// get the first element
scala> data.first
res9: (String, Int) = (A,1)

take(n)

Return an array with the first n elements of the dataset.


Returns an array with the first n elements of the dataset.


// create the dataset
scala> var data = sc.parallelize(List(("A",1),("B",1)))
scala> data.take(1)
res10: Array[(String, Int)] = Array((A,1))

// if n is larger than the total number of elements, all of the data is returned
scala> data.take(8)
res12: Array[(String, Int)] = Array((A,1), (B,1))

// if n is less than or equal to 0, an empty array is returned
scala> data.take(-1)
res13: Array[(String, Int)] = Array()
scala> data.take(0)
res14: Array[(String, Int)] = Array()

takeSample(withReplacement, num, [seed])

Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.


This method differs from sample in a few ways (see the contrast sketch after the example below):

  • It returns an exact number of elements (given by the second parameter)
  • It returns an Array directly rather than an RDD
  • The returned elements are shuffled into random order


// create the dataset
scala> var data = sc.parallelize(List(1,3,5,7))
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21

// sample 2 elements at random
scala> data.takeSample(true,2,1)
res0: Array[Int] = Array(7, 1)

// sample 4 elements at random; note that the sampled data may contain duplicates
scala> data.takeSample(true,4,1)
res1: Array[Int] = Array(7, 7, 3, 7)

// the first parameter controls whether sampling is done with replacement
scala> data.takeSample(false,4,1)
res2: Array[Int] = Array(3, 5, 7, 1)
scala> data.takeSample(false,5,1)
res3: Array[Int] = Array(3, 5, 7, 1)
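To make the contrast with sample concrete, here is a rough sketch reusing the data RDD above: sample is a transformation that takes a fraction and returns an RDD, while takeSample is an action that returns an Array of an exact size.

// sample: transformation, fraction-based, returns an RDD that is evaluated lazily
val sampled = data.sample(false, 0.5, 1)   // RDD[Int], roughly half of the elements
sampled.collect                            // materialized only when an action runs

// takeSample: action, exact size, returns an Array immediately
data.takeSample(false, 2, 1)               // Array[Int] with exactly 2 elements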

takeOrdered(n, [ordering])

Return the first n elements of the RDD using either their natural order or a custom comparator.


Sorts the elements using either the built-in ordering or a custom comparator and returns the first n of them.


// create the dataset
scala> var data = sc.parallelize(List("b","a","e","f","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:21

// return the first 3 elements in sorted order
scala> data.takeOrdered(3)
res4: Array[String] = Array(a, b, c)
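The example above uses the natural ordering; as a small sketch of the custom-comparator variant, passing a reversed Ordering returns the largest elements instead:

// custom comparator: the 3 largest strings instead of the smallest
data.takeOrdered(3)(Ordering[String].reverse)
// expected: Array(f, e, c)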

saveAsTextFile(path)

Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.


Saves the dataset as a text file (or a set of text files) to the local filesystem, HDFS, or any other Hadoop-supported file system.


// create the dataset
scala> var data = sc.parallelize(List("b","a","e","f","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:21

// save to the test_data_save directory
scala> data.saveAsTextFile("test_data_save")

scala> data.saveAsTextFile("test_data_save2",classOf[GzipCodec])
<console>:24: error: not found: type GzipCodec
              data.saveAsTextFile("test_data_save2",classOf[GzipCodec])
                                                            ^
// import the required class
scala> import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.io.compress.GzipCodec

// save as compressed files
scala> data.saveAsTextFile("test_data_save2",classOf[GzipCodec])


Inspecting the output files:


[xingoo@localhost bin]$ ll
drwxrwxr-x. 2 xingoo xingoo 4096 Oct 10 23:07 test_data_save
drwxrwxr-x. 2 xingoo xingoo 4096 Oct 10 23:07 test_data_save2
[xingoo@localhost bin]$ cd test_data_save2
[xingoo@localhost test_data_save2]$ ll
total 4
-rw-r--r--. 1 xingoo xingoo 30 Oct 10 23:07 part-00000.gz
-rw-r--r--. 1 xingoo xingoo  0 Oct 10 23:07 _SUCCESS
[xingoo@localhost test_data_save2]$ cd ..
[xingoo@localhost bin]$ cd test_data_save
[xingoo@localhost test_data_save]$ ll
total 4
-rw-r--r--. 1 xingoo xingoo 10 Oct 10 23:07 part-00000
-rw-r--r--. 1 xingoo xingoo  0 Oct 10 23:07 _SUCCESS
[xingoo@localhost test_data_save]$ cat part-00000
b
a
e
f
c
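As a small follow-up sketch, the saved text can be read back with sc.textFile, which also decompresses the gzip output transparently:

// read the plain-text output back as an RDD[String]
val reloaded = sc.textFile("test_data_save")
reloaded.collect   // expected: Array(b, a, e, f, c)

// the compressed directory can be read the same way
sc.textFile("test_data_save2").collect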

saveAsSequenceFile(path)

Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).


Saves the dataset as a Hadoop SequenceFile:


scala> var data = sc.parallelize(List(("A",1),("A",2),("B",1)),3)
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[21] at parallelize at <console>:22
scala> data.saveAsSequenceFile("kv_test")

[xingoo@localhost bin]$ cd kv_test/
[xingoo@localhost kv_test]$ ll
total 12
-rw-r--r--. 1 xingoo xingoo 99 Oct 10 23:25 part-00000
-rw-r--r--. 1 xingoo xingoo 99 Oct 10 23:25 part-00001
-rw-r--r--. 1 xingoo xingoo 99 Oct 10 23:25 part-00002
-rw-r--r--. 1 xingoo xingoo  0 Oct 10 23:25 _SUCCESS
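As a follow-up sketch, the SequenceFile can be read back with sc.sequenceFile, which converts the Writable keys and values back to Scala types (the kv name is just illustrative):

// read the SequenceFile back as an RDD of (String, Int) pairs
val kv = sc.sequenceFile[String, Int]("kv_test")
kv.collect   // expected: Array((A,1), (A,2), (B,1)); order may vary by partition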

saveAsObjectFile(path)

Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().


Saves the dataset using Java serialization; it can later be loaded with SparkContext.objectFile():


scala> var data = sc.parallelize(List("a","b","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[16] at parallelize at <console>:22
scala> data.saveAsObjectFile("str_test")

scala> var data2 = sc.objectFile[Array[String]]("str_test")
data2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[20] at objectFile at <console>:22
scala> data2.collect

countByKey()

Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.


For a key-value dataset, counts the number of values associated with each distinct key.


// create the dataset
scala> var data = sc.parallelize(List(("A",1),("A",2),("B",1)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:22
// count occurrences per key
scala> data.countByKey
res9: scala.collection.Map[String,Long] = Map(B -> 1, A -> 2)
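Note that countByKey brings the entire result map back to the driver, so it only suits datasets with a small number of distinct keys. As a rough sketch of a distributed alternative that keeps the counts in an RDD:

// map every value to 1 and reduce by key; the counts stay distributed until collect
data.mapValues(_ => 1L).reduceByKey(_ + _).collect
// expected: Array((A,2), (B,1)); order may vary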

foreach(func)

Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.


Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.


Runs the given function on each element; this is typically used for side effects such as updating an Accumulator or interacting with an external storage system.


// create the dataset
scala> var data = sc.parallelize(List("b","a","e","f","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:22

// iterate over the elements
scala> data.foreach(x=>println(x+" hello"))
b hello
a hello
e hello
f hello
c hello
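As a final sketch of the accumulator use case from the note above (assuming the Spark 1.x sc.accumulator API; Spark 2.x would use sc.longAccumulator instead):

// count elements as a side effect of foreach via an accumulator
val acc = sc.accumulator(0)
data.foreach(x => acc += 1)
acc.value   // expected: 5, one increment per element of data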

