In this example, we will work with two CSV files, people.csv and links.csv, which are located in the $SPARKHOME/data/ directory. Type the following commands to load these files into Spark:
scala> val people = sc.textFile("./data/people.csv")
people: org.apache.spark.rdd.RDD[String] = ./data/people.csv MappedRDD[81] at textFile at <console>:33
scala> val links = sc.textFile("./data/links.csv")
links: org.apache.spark.rdd.RDD[String] = ./data/links.csv MappedRDD[83] at textFile at <console>:33
Going back to our example, let's construct the graph in three steps, as follows:
1. We define a case class Person, which has name and age as class parameters. Case classes are very useful when we need to pattern match on a Person object later on:
case class Person(name: String, age: Int)
2. Next, we parse each line of the CSV text inside people and links into new objects of type Person and Edge respectively, and collect the results in an RDD[(VertexId, Person)] and an RDD[Edge[String]]:
scala> val peopleRDD: RDD[(VertexId, Person)] = people map { line =>
  val row = line split ','
  (row(0).toLong, Person(row(1), row(2).toInt))
}
scala> type Connection = String
scala> val linksRDD: RDD[Edge[Connection]] = links map { line =>
  val row = line split ','
  Edge(row(0).toLong, row(1).toLong, row(2))
}
3. Now, we can create our social graph and name it tinySocial, using the factory method Graph(…):
scala> val tinySocial: Graph[Person, Connection] = Graph(peopleRDD, linksRDD)
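As a quick check, we can print every relationship in tinySocial, pattern matching on the Person case class from step 1. This is only an illustrative sketch; the actual output depends on the contents of people.csv and links.csv:

```scala
scala> tinySocial.triplets.collect().foreach { triplet =>
  // Destructure the source vertex attribute with a case class pattern
  triplet.srcAttr match {
    case Person(name, age) =>
      println(s"$name ($age) --[${triplet.attr}]--> ${triplet.dstAttr.name}")
  }
}
```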
The first one is the Graph factory method that we have already seen in the previous chapter. It is defined as the apply method of the companion object Graph, as follows:
def apply[VD, ED](
    vertices: RDD[(VertexId, VD)],
    edges: RDD[Edge[ED]],
    defaultVertexAttr: VD = null.asInstanceOf[VD])
  : Graph[VD, ED]
As we have seen before, this function takes two RDD collections, RDD[(VertexId, VD)] and RDD[Edge[ED]], as parameters for the vertices and edges respectively, and constructs a Graph[VD, ED]. The defaultVertexAttr parameter assigns a default attribute to the vertices that are present in the edge RDD but not in the vertex RDD. The Graph factory method is convenient when the RDD collections of edges and vertices are readily available.
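For instance, if links.csv referred to a person missing from people.csv, we could supply a placeholder attribute for such vertices. The Person("Unknown", 0) default below is just an illustrative choice:

```scala
scala> val tinySocialWithDefaults: Graph[Person, Connection] =
  Graph(peopleRDD, linksRDD, defaultVertexAttr = Person("Unknown", 0))
```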
A more common situation is that your original dataset only represents the edges. In this case, GraphX provides the following GraphLoader.edgeListFile function that is defined in GraphLoader:
def edgeListFile(
    sc: SparkContext,
    path: String,
    canonicalOrientation: Boolean = false,
    minEdgePartitions: Int = 1)
  : Graph[Int, Int]
It takes as an argument the path to a file that contains a list of edges. Each line of the file represents an edge of the graph as a pair of integers in the form sourceID destinationID. While reading the list, it ignores any comment line starting with #. It then constructs a graph from the specified edges, along with the corresponding vertices.
The minEdgePartitions argument is the minimum number of edge partitions to generate. If the edge list file is stored in more blocks than minEdgePartitions, more partitions will be created.
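For example, we could request at least four edge partitions and a canonical orientation (each edge stored with srcId < dstId) when loading a file. The file path below is hypothetical:

```scala
scala> val partitionedGraph = GraphLoader.edgeListFile(sc, "./data/edges.txt",
  canonicalOrientation = true, minEdgePartitions = 4)
```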
Similar to GraphLoader.edgeListFile, a third function named Graph.fromEdges enables you to create a graph from an RDD[Edge[ED]] collection. Moreover, it automatically creates the vertices from the vertex IDs referenced by the edge RDD, assigning them the defaultValue argument as the default vertex attribute:
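Its signature (omitting the ClassTag context bounds) and a sketch of its use with the linksRDD from the earlier example follow; the Person("Unknown", 0) default is again an illustrative choice:

```scala
def fromEdges[VD, ED](
    edges: RDD[Edge[ED]],
    defaultValue: VD)
  : Graph[VD, ED]

// Every vertex ID mentioned in linksRDD gets the default attribute
scala> val fromEdgesGraph: Graph[Person, Connection] =
  Graph.fromEdges(linksRDD, Person("Unknown", 0))
```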
The last graph builder function is Graph.fromEdgeTuples, which creates a graph from only an RDD of edge tuples, that is, a collection of the RDD[(VertexId, VertexId)] type. It assigns the edges the attribute value 1 by default:
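A minimal sketch of Graph.fromEdgeTuples, using a small hand-made RDD of ID pairs (the pairs below are made up for illustration). Note that the resulting graph has type Graph[VD, Int], since every edge attribute is the integer 1:

```scala
scala> val rawEdges: RDD[(VertexId, VertexId)] =
  sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 1L)))
scala> val tupleGraph: Graph[Int, Int] =
  Graph.fromEdgeTuples(rawEdges, defaultValue = 1)
```

An optional uniqueEdges parameter of type Option[PartitionStrategy] can also be passed to merge duplicate edge tuples, counting each duplicate in the edge attribute.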
The first graph that we will build is the Enron email communication network. If you have restarted your Spark shell, you need to import the GraphX library again. First, create a new folder called data inside $SPARKHOME and copy the dataset into it. This file contains the adjacency list of the email communications between the employees. Assuming that the current directory is $SPARKHOME, we can pass the file path to the GraphLoader.edgeListFile method:
scala> import org.apache.spark.graphx._
scala> import org.apache.spark.rdd._
scala> val emailGraph = GraphLoader.edgeListFile(sc, "./data/emailEnron.txt")
scala> emailGraph.vertices.take(5)
scala> emailGraph.edges.take(5)
In GraphX, all edges must be directed. To express an undirected or bidirectional graph, we can link each connected pair in both directions. In our email network, we can verify, for instance, that node 19021 has both incoming and outgoing links. First, we collect the destination nodes that node 19021 communicates with: