OP: Lisrelchen

[Rindra Ramamonjison] Apache Spark Graph Processing



Apache Spark Graph Processing
By: Rindra Ramamonjison
Publisher: Packt Publishing
Pub. Date: September 10, 2015
Print ISBN-13: 978-1-78439-180-5
Web ISBN-13: 978-1-78439-895-8
Pages in Print Edition: 148
Subscriber Rating: 0 out of 5 [0 Ratings]




#2 · Lisrelchen · 2017-3-6 05:10:27
Loading the data

In this example, we will work with two CSV files, people.csv and links.csv, which are contained in the directory $SPARKHOME/data/. Let's type the following commands to load these files into Spark:

scala> val people = sc.textFile("./data/people.csv")
people: org.apache.spark.rdd.RDD[String] = ./data/people.csv MappedRDD[81] at textFile at <console>:33

scala> val links = sc.textFile("./data/links.csv")
links: org.apache.spark.rdd.RDD[String] = ./data/links.csv MappedRDD[83] at textFile at <console>:33
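To sanity-check what was loaded, we can peek at the first few raw lines. The exact file contents are an assumption here: judging from the parsing code in the next reply, people.csv should hold id,name,age rows and links.csv should hold srcId,dstId,relationship rows:

scala> people.take(2).foreach(println)
scala> links.take(2).foreach(println)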

#3 · Lisrelchen · 2017-3-6 05:13:56
Transforming RDDs to VertexRDD and EdgeRDD

Going back to our example, let's construct the graph in three steps, as follows:

1. We define a case class Person, which has name and age as class parameters. Case classes are very useful when we need to do pattern matching on a Person object later on:

scala> case class Person(name: String, age: Int)

2. Next, we parse each line of the CSV text inside people and links into new objects of type Person and Edge respectively, and collect the results in RDD[(VertexId, Person)] and RDD[Edge[String]]:

scala> val peopleRDD: RDD[(VertexId, Person)] = people map { line =>
         val row = line split ','
         (row(0).toInt, Person(row(1), row(2).toInt))
       }

scala> type Connection = String

scala> val linksRDD: RDD[Edge[Connection]] = links map { line =>
         val row = line split ','
         Edge(row(0).toInt, row(1).toInt, row(2))
       }

3. Now, we can create our social graph and name it tinySocial using the factory method Graph(...):

scala> val tinySocial: Graph[Person, Connection] = Graph(peopleRDD, linksRDD)
tinySocial: org.apache.spark.graphx.Graph[Person,Connection] = org.apache.spark.graphx.impl.GraphImpl@128cd92a
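With the graph built, we can already query it; a quick sketch (the output depends on the actual CSV data, so none is shown):

scala> tinySocial.vertices.map(_._2.name).collect()
scala> tinySocial.triplets.
         map(t => t.srcAttr.name + " --" + t.attr + "--> " + t.dstAttr.name).
         collect().foreach(println)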

#4 · Lisrelchen · 2017-3-6 05:17:01
The Graph factory method

The first one is the Graph factory method that we have already seen in the previous chapter. It is defined in the apply method of the companion object called Graph, as follows:

def apply[VD, ED](
      vertices: RDD[(VertexId, VD)],
      edges: RDD[Edge[ED]],
      defaultVertexAttr: VD = null)
    : Graph[VD, ED]

As we have seen before, this function takes two RDD collections, RDD[(VertexId, VD)] and RDD[Edge[ED]], as parameters for the vertices and edges respectively, and constructs a Graph[VD, ED]. The defaultVertexAttr argument assigns a default attribute to the vertices that are present in the edge RDD but not in the vertex RDD. The Graph factory method is convenient when the RDD collections of edges and vertices are readily available.
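A minimal sketch of that default-attribute behavior, reusing the Person and Connection types defined earlier in this thread (the sample vertices and edge are made up for illustration); vertex 4 appears only in the edge RDD, so it receives the default attribute:

scala> val partialPeople: RDD[(VertexId, Person)] =
         sc.parallelize(Seq((1L, Person("Alice", 30))))
scala> val someLinks: RDD[Edge[Connection]] =
         sc.parallelize(Seq(Edge(1L, 4L, "friend")))
scala> val g = Graph(partialPeople, someLinks, Person("Unknown", 0))
scala> g.vertices.collect()   // vertex 4 carries Person("Unknown", 0)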

#5 · Lisrelchen · 2017-3-6 05:17:59
edgeListFile

A more common situation is that your original dataset only represents the edges. In this case, GraphX provides the following GraphLoader.edgeListFile function, which is defined in GraphLoader:

def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      minEdgePartitions: Int = 1)
    : Graph[Int, Int]

It takes as an argument a path to the file that contains a list of edges. Each line of the file represents an edge of the graph as two integers of the form sourceID destinationID. When reading the list, it ignores any comment line starting with #. Then, it constructs a graph from the specified edges, with the corresponding vertices.

The minEdgePartitions argument is the minimum number of edge partitions to generate. If the adjacency list is partitioned into more blocks than minEdgePartitions, more partitions will be created.
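For illustration, a tiny edge list file (the file name is hypothetical) could look like this:

# this comment line is ignored
1 2
2 3
3 1

scala> val g = GraphLoader.edgeListFile(sc, "./data/tiny.txt", canonicalOrientation = true)

Setting canonicalOrientation to true reorients each edge so that srcId < dstId, which algorithms such as triangle counting require.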

#6 · Lisrelchen · 2017-3-6 05:18:24
fromEdges

Similar to GraphLoader.edgeListFile, the third function, named Graph.fromEdges, enables you to create a graph from an RDD[Edge[ED]] collection. Moreover, it automatically creates the vertices using the VertexId parameters specified by the edge RDD, with the defaultValue argument as the default vertex attribute:

def fromEdges[VD, ED](
      edges: RDD[Edge[ED]],
      defaultValue: VD)
    : Graph[VD, ED]
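A one-line sketch, reusing the linksRDD from the earlier reply; every vertex mentioned in an edge is created with the default attribute "person":

scala> val g: Graph[String, Connection] = Graph.fromEdges(linksRDD, "person")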

#7 · Lisrelchen · 2017-3-6 05:18:54
fromEdgeTuples

The last graph builder function is Graph.fromEdgeTuples, which creates a graph from only an RDD of edge tuples, that is, a collection of the RDD[(VertexId, VertexId)] type. It assigns each edge the attribute value 1 by default:

def fromEdgeTuples[VD](
      rawEdges: RDD[(VertexId, VertexId)],
      defaultValue: VD,
      uniqueEdges: Option[PartitionStrategy] = None)
    : Graph[VD, Int]
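A minimal sketch with a made-up rawEdges collection; when uniqueEdges is set to a partition strategy, duplicate pairs are merged and each edge attribute becomes the number of duplicates:

scala> val rawEdges: RDD[(VertexId, VertexId)] =
         sc.parallelize(Seq((1L, 2L), (1L, 2L), (2L, 3L)))
scala> val g = Graph.fromEdgeTuples(rawEdges, "node",
         uniqueEdges = Some(PartitionStrategy.RandomVertexCut))
scala> g.edges.collect()   // Edge(1,2,2), Edge(2,3,1)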

#8 · Lisrelchen · 2017-3-6 05:22:22
Building directed graphs

The first graph that we will build is the Enron email communication network. If you have restarted your Spark shell, you need to import the GraphX library again. First, create a new folder called data inside $SPARKHOME and copy the dataset into it. This file contains the adjacency list of the email communications between the employees. Assuming that the current directory is $SPARKHOME, we can pass the file path to the GraphLoader.edgeListFile method:

scala> import org.apache.spark.graphx._
scala> import org.apache.spark.rdd._
scala> val emailGraph = GraphLoader.edgeListFile(sc, "./data/emailEnron.txt")
scala> emailGraph.vertices.take(5)
scala> emailGraph.edges.take(5)

In GraphX, all the edges must be directed. To express non-directed or bidirectional graphs, we can link each connected pair in both directions. In our email network, we can verify, for instance, that node 19021 has both incoming and outgoing links. First, we collect the destination nodes that node 19021 sends to, and then the source nodes that send to it:

scala> emailGraph.edges.filter(_.srcId == 19021).map(_.dstId).collect()
scala> emailGraph.edges.filter(_.dstId == 19021).map(_.srcId).collect()
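The same check can be done more directly with the graph's degree operators; a quick sketch, reusing emailGraph:

scala> emailGraph.inDegrees.filter(_._1 == 19021).collect()    // incoming link count
scala> emailGraph.outDegrees.filter(_._1 == 19021).collect()   // outgoing link count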

#9 · Lisrelchen · 2017-3-6 05:30:57

Building a bipartite graph

First, create the case classes named Ingredient and Compound, and use Scala's inheritance so that these two classes are children of an FNNode class:

scala> class FNNode(val name: String)

scala> case class Ingredient(override val name: String, category: String) extends FNNode(name)

scala> case class Compound(override val name: String, cas: String) extends FNNode(name)

Next, wrangle the data into the two node RDDs. The compound IDs are shifted by 10000L so that they do not collide with the ingredient IDs:

val ingredients: RDD[(VertexId, FNNode)] =
  sc.textFile("./data/ingr_info.tsv").
     filter(! _.startsWith("#")).
     map { line =>
       val row = line split '\t'
       (row(0).toInt, Ingredient(row(1), row(2)))
     }

val compounds: RDD[(VertexId, FNNode)] =
  sc.textFile("./data/comp_info.tsv").
     filter(! _.startsWith("#")).
     map { line =>
       val row = line split '\t'
       (10000L + row(0).toInt, Compound(row(1), row(2)))
     }

Create an RDD[Edge[Int]] collection from the dataset named ingr_comp.tsv:

val links: RDD[Edge[Int]] =
  sc.textFile("./data/ingr_comp.tsv").
     filter(! _.startsWith("#")).
     map { line =>
       val row = line split '\t'
       Edge(row(0).toInt, 10000L + row(1).toInt, 1)
     }

Concatenate the two sets of nodes into a single RDD, and pass it to the Graph() factory method along with the links RDD:

scala> val nodes = ingredients ++ compounds

scala> val foodNetwork = Graph(nodes, links)

Finally, define a helper that renders a triplet as a sentence, and print the first five triplets:

scala> def showTriplet(t: EdgeTriplet[FNNode,Int]): String = "The ingredient " ++ t.srcAttr.name ++ " contains " ++ t.dstAttr.name
showTriplet: (t: EdgeTriplet[FNNode,Int])String

scala> foodNetwork.triplets.take(5).foreach(showTriplet _ andThen println _)
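Because Ingredient and Compound are case classes, we can pattern-match on the vertex attributes; a short sketch that counts the two node types in foodNetwork:

scala> foodNetwork.vertices.map {
         case (_, Ingredient(_, _)) => "ingredient"
         case (_, Compound(_, _))   => "compound"
         case _                     => "other"
       }.countByValue()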

#10 · Lisrelchen · 2017-3-6 05:33:36

Building a weighted social ego network

1. Import the absolute value function and the SparseVector class from the Breeze library (scala.io.Source is also needed, for the file reading done below):

import scala.math.abs
import scala.io.Source
import breeze.linalg.SparseVector

2. Define a type synonym called Feature for SparseVector[Int]:

type Feature = breeze.linalg.SparseVector[Int]

3. Read the features inside the ego.feat file and collect them in a map whose keys and values are of the Long and Feature types, respectively:

val featureMap: Map[Long, Feature] =
  Source.fromFile("./data/ego.feat").
     getLines().
     map { line =>
       val row = line split ' '
       val key = abs(row.head.hashCode.toLong)
       val feat = SparseVector(row.tail.map(_.toInt))
       (key, feat)
     }.toMap

4. The string that corresponds to the node ID is hashed into a Long key, as follows:

val key = abs(row.head.hashCode.toLong)

5. Read the ego.edges file to create an RDD[Edge[Int]] collection of the links in the ego network. The weight of each edge is the number of features that its two endpoints have in common:

val edges: RDD[Edge[Int]] =
  sc.textFile("./data/ego.edges").
     map { line =>
       val row = line split ' '
       val srcId = abs(row(0).hashCode.toLong)
       val dstId = abs(row(1).hashCode.toLong)
       val srcFeat = featureMap(srcId)
       val dstFeat = featureMap(dstId)
       val numCommonFeats = srcFeat dot dstFeat
       Edge(srcId, dstId, numCommonFeats)
     }
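With the weighted edges in hand, the graph itself can be assembled using Graph.fromEdges from reply #6; a one-line sketch, where the default vertex attribute 1 is an arbitrary placeholder:

scala> val egoNetwork: Graph[Int, Int] = Graph.fromEdges(edges, 1)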
