Analyzing Flight Data: A Gentle Introduction to GraphX in Spark

What is a graph?

Graphs are probably one of the coolest representations of information. And they're not just your typical line chart (although those do qualify in a sense), these are networks of inter-connected vertices. Now what's so cool about networks is that they represent a lot of both tangible and abstract things. Take a social network (this is a common example of a graph). Each person in a social network is connected by some sort of relationship, be it a friendship or a marriage.

That's a pretty simple one, now let's think a bit more abstractly. As we learned in a previous article, /transformations/ in Spark just apply an operation to an RDD and output another RDD (lazily); eventually you may call an /action/ which will yield a result (we'll sketch this in code at the end of this section). Now take a step back and look at the below diagram, does it seem like it might be a graph?

[Diagram: an RDD transformation pipeline as a directed acyclic graph]

That's because it absolutely is one! It's what we call a directed acyclic graph, which we will define in just a moment. Right now you can just think of it as a graph that only goes one direction. Pretty awesome huh?

Now start thinking about all the locations that you visit on a daily basis. That's a graph too. Now finally we're going to go abstract. Think about when you're talking with a friend and can almost predict what they're going to say. That can be approximated by a Markov model, where there's an attached probability to the word you expect to come next. This is also a graph, but the direction that you go is based on a probability.
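To make that transformation-then-action flow concrete, here's a minimal sketch (assuming a SparkContext named sc, as you'd have in the Spark shell):

val numbers = sc.parallelize(1 to 10) // source RDD
val doubled = numbers.map(_ * 2)      // transformation: lazy, just extends the DAG
val bigOnes = doubled.filter(_ > 10)  // transformation: still lazy
val total = bigOnes.reduce(_ + _)     // action: the lineage above actually runs here

Each RDD is a vertex in the DAG and each transformation is a directed edge between them.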

Graph Theory

All of this falls under a branch of mathematics called graph theory. For the most part we're going to avoid the theory itself because the rabbit hole goes very deep, but we will be covering some basic terminology.

Terminology

We're going to keep things pretty simple because there are a lot of different kinds of graphs, and we'll just focus on a couple of different models. Before we go any further we've got to introduce some terminology that we're going to be using.

[Diagram: nodes and edges in a graph]

  • *Nodes:* Nodes, or vertices, are the points in the graph. They're the "friends" in our social network example or the RDDs in our Spark example. Vertices are typically things, like people or places.
  • *Edges:* Edges are the lines that connect different vertices together. They're the relationships between things, like a friendship, a marriage, or a transformation.
  • *Weights:* Weights allow you to attach some sort of strength to an edge. For example, you may be closer friends with one person than with another, or a city on a map may be closer than another one, weighted by distance.
  • *Directed v. Undirected:* Directed graphs are graphs that, as you might have guessed, have a direction. Our RDD image is a directed graph because a transformation only happens in one direction. Undirected graphs are more like the friendships we mentioned above: when you're friends with someone, that friendship exists between you and your friend; you can't be friends with someone who is not friends with you.
  • *Cyclic v. Acyclic:* A cyclic graph is one where you can start at a certain point and move back to that same point. A perfect example is a social network: you can start at one friendship and typically circle through several different friends to get back to that original person. Acyclic graphs are ones where if you start at a given vertex, no matter what edges you follow, you cannot get back to that original vertex. These can be combined with the above to create directed acyclic graphs or other variations.
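If it helps to see that terminology as data, here's a rough sketch of a tiny directed, weighted graph in plain Scala. Note that Vertex and Edge here are just illustrative case classes I'm making up for this example, not the GraphX types we'll use later:

// Vertices are things; edges are directed, weighted relationships between them.
case class Vertex(id: Long, name: String)
case class Edge(src: Long, dst: Long, weight: Int)

val vertices = Seq(Vertex(1L, "Alice"), Vertex(2L, "Bob"), Vertex(3L, "Carol"))
// Directed: 1 -> 2 exists but 2 -> 1 does not. The weight expresses strength.
val edges = Seq(Edge(1L, 2L, 5), Edge(2L, 3L, 1), Edge(3L, 1L, 2))
// Following 1 -> 2 -> 3 -> 1 gets you back where you started, so this graph is cyclic.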

Graphs Conclusion

Graphs are a much simpler way for us to think about some problems, however they can introduce some complexity. At the end of the day, they're all about describing relationships between things. Semantic relationships are simple for us humans to understand, however computers can struggle with the conceptual mapping that is many times obvious in a graph. Because they lack this semantic understanding, optimizing for what we actually want out of a graph representation can be a challenge. On top of that, take a look at this algorithmic complexity cheatsheet: you'll see that a fair amount of graph algorithms operate with "horrible" level complexity. That's because we typically have to traverse the entire set of edges and nodes several times to calculate our answer. Even given this complexity, their semantic ability to represent relationships is likely one of the core motivations for GraphX: give us (as humans) a simple interface to compute large scale graph problems.

Basics of GraphX

GraphX, as you might have guessed, is built upon this basic paradigm of graph theory. It's awesome.

Requirements

Now this tutorial is only going to cover Scala because, at this time, Apache Spark only provides a Scala API for GraphX.

The Setup & Data

Let's go ahead and get set up with a dataset that I've introduced before: the 2008 flight data.
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.util.MurmurHash
import sqlContext.implicits._ // needed for the $"column" syntax we use below

// Load the 2008 flight data CSV using the spark-csv package
val df_1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("../Downloads/2008.csv")
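If you want a quick peek at what we just loaded before going further, the standard DataFrame inspection methods work here (note that with spark-csv every column comes in as a string unless you turn on schema inference):

df_1.printSchema() // every column is a string out of the box
df_1.select("Origin", "Dest").show(5) // a first look at the two columns we care about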

Creating a Graph from our Data

Now what we've got to do is get this into a format that GraphX will expect. This means that we should make it into a more optimized format using primitive datatypes, as GraphX has some optimizations under the hood that allow for faster computation. /If you're ever confused about the values that we are working with at a given step of the process, just call .take(5) and you'll see exactly what we're working with./
val flightsFromTo = df_1.select($"Origin",$"Dest")
val airportCodes = df_1.select($"Origin", $"Dest").flatMap(x => Iterable(x(0).toString, x(1).toString))
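Following that .take(5) advice, a quick sanity check on both of these might look like the lines below (what rows you get back will depend on the ordering of the file):

flightsFromTo.take(5) // five Rows of (Origin, Dest) pairs
airportCodes.take(5)  // five airport code strings, duplicates still present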
Those basically prep our edges and our vertices. The next step is to convert our airport codes to primitive data types, make them distinct, and turn them into a list of vertices. That involves two conceptual maps: the distinct values of what we have now get mapped to a unique integer id, and those pairs then become our list of vertices. You'll see below that we have to specify the types for Scala to perform this calculation correctly.
val airportVertices: RDD[(VertexId, String)] = airportCodes.distinct().map(x => (MurmurHash.stringHash(x), x)) // (hashed id, airport code)
val defaultAirport = "Missing" // default vertex attribute for any edge endpoint not in our vertex set
Now in order to give these edges some information, we're going to count the number of flights to and from airports. Naturally there are a ton of ways to do this, but I'm going to stick to the simple MapReduce style right now. What we need to do is convert them into edges from airport ID to airport ID (of the primitive type), then validate and convert them with the Edge case class.
val flightEdges = flightsFromTo.map(x =>
    ((MurmurHash.stringHash(x(0).toString), MurmurHash.stringHash(x(1).toString)), 1)) // key each flight by (srcId, dstId) with a count of 1
  .reduceByKey(_ + _)                     // sum the counts per route
  .map(x => Edge(x._1._1, x._1._2, x._2)) // Edge(srcId, dstId, flightCount)
Now we can create our graph!
val graph = Graph(airportVertices, flightEdges, defaultAirport)
graph.persist() // we're going to be using it a lot

Basic Statistics

Great, now we've created our graph and we've also used a ton of Spark core types. We started with a DataFrame, converted to RDDs for vertices and edges, then created a graph. Now this graph is actually a pretty nice representation. This version doesn't have a ton of attached data, but we can get the basics. First let's see which airport has the most flights to and from it. It's worth stating that our edges are flights from one airport to another - this makes it so that we have a directed graph. That will be important in a moment; for now though, let's just answer some basic questions.
  • How many airports are there?
  • How many unique flights from airport A to airport B are there?

graph.numVertices // 305
graph.numEdges // 5366

  • What are the top 10 routes from airport to airport?

graph.triplets.sortBy(_.attr, ascending=false).map(triplet =>
    "There were " + triplet.attr.toString + " flights from " + triplet.srcAttr + " to " + triplet.dstAttr + ".").take(10)

res60: Array[String] = Array(There were 13788 flights from SFO to LAX., There were 13390 flights from LAX to SFO., There were 12383 flights from OGG to HNL., There were 12035 flights from LGA to BOS., There were 12029 flights from BOS to LGA., There were 12014 flights from HNL to OGG., There were 11773 flights from LAX to LAS., There were 11729 flights from LAS to LAX., There were 11257 flights from LAX to SAN., There were 11224 flights from SAN to LAX.)

  • What are the 10 least-traveled routes from airport to airport?

graph.triplets.sortBy(_.attr).map(triplet =>
    "There were " + triplet.attr.toString + " flights from " + triplet.srcAttr + " to " + triplet.dstAttr + ".").take(10)

res62: Array[String] = Array(There were 1 flights from RNO to PIH., There were 1 flights from PHL to ICT., There were 1 flights from FSD to PIA., There were 1 flights from RIC to JAX., There were 1 flights from MOD to BFL., There were 1 flights from ASE to MSN., There were 1 flights from JFK to HPN., There were 1 flights from MCO to LIT., There were 1 flights from ROA to BWI., There were 1 flights from OMA to ABQ.)
So awesome. Graphs are great because they align more closely with the semantics of what we're trying to do here - explaining the relationships between airports. We can see in this last example that there are a lot of airports that just get these weird one-off flights - these would probably be interesting to explore later on. Why are there so few flights? Why did they exist at all? Was there an in-flight problem?

Now let's think about some other statistics. Because this is a directed graph we can get data on which airports are flown to the most and flown out of the most. This is known as degree information. For example, what airport has the highest in-degree, or the most unique flights into it?
graph.inDegrees.join(airportVertices).sortBy(_._2._1, ascending=false).take(1)
Array[(org.apache.spark.graphx.VertexId, (Int, String))] = Array((2042033420,(173,ATL)))
And out of it?
graph.outDegrees.join(airportVertices).sortBy(_._2._1, ascending=false).take(1)
Array[(org.apache.spark.graphx.VertexId, (Int, String))] = Array((2042033420,(173,ATL)))
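Unsurprisingly the two line up for ATL. As an aside, if you don't need in and out separately, GraphX also exposes the combined in-plus-out count through degrees - a quick sketch of the same query:

graph.degrees.join(airportVertices).sortBy(_._2._1, ascending=false).take(1) // total (in + out) degree per vertex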

PageRank

We've covered some basic statistics, so let's go on to an algorithm that helped build Google: PageRank. PageRank was created by Larry Page and Sergey Brin while they were PhD students at Stanford and was originally intended to measure the importance of web pages (remember that graph structure we were talking about? It applies to web pages too!), however it works with anything that can be represented as a graph. Our airports make a perfect example for this. The parameter that we have to specify is the tolerance, which is a measure of convergence. There are two versions included in Spark - static and dynamic. We'll give an example of the dynamic one.
val ranks = graph.pageRank(0.0001).vertices
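For comparison, the static variant takes a fixed number of iterations instead of a tolerance; it would look something like the line below (20 iterations is just an arbitrary choice for illustration):

val staticRanks = graph.staticPageRank(20).vertices // runs exactly 20 iterations, no convergence check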
So what are our most important airports!?
val ranksAndAirports = ranks.join(airportVertices).sortBy(_._2._1, ascending=false).map(_._2._2)
ranksAndAirports.take(10)
res81: Array[String] = Array(ATL, DFW, ORD, MSP, SLC, DEN, DTW, IAH, CVG, LAX)
And there you have it! The most important airports according to PageRank for flights in 2008. We saw before that Atlanta had the most inbound and outbound links, so this isn't a complete surprise; however it's interesting that even though SFO and LAX had the most flights between them, that doesn't completely determine their importance in the greater network.

Conclusion

Well there we have it! We've covered the basics of GraphX in Spark and run some interesting statistics and algorithms to get to know 2008 air traffic a little bit better. In some later articles we're going to work with larger sets of data to compute across different years, so subscribe on the right-hand side!
