Getting Started with Apache Spark DataFrames in Python and Scala

History of DataFrames

DataFrames are a common data science abstraction that cuts across languages. They really took off in the R programming language and later made their way to Python through the pandas library. If you've already got experience with DataFrames in one of those languages, the transition shouldn't be too difficult, and you'll likely already have an idea of the kinds of things you'd hope to do with them.

Goals of DataFrames

So what is the point of having a DataFrame? Why, if you've already got RDDs (if you don't know what an RDD is, check out this Apache Spark RDD tutorial), do you need a DataFrame? Honestly, the goal of the DataFrame is to help grow the Spark ecosystem. DataFrames are just an extension of the underlying RDD core, and they allow for an easy transition from R or Python for data scientists. I think they're best thought of as a more structured RDD - one where you aren't going to be doing that much munging of data. That's one of the reasons for their tight integration with more structured formats (rather than unstructured ones): it lets them take advantage of types and optimizations under the hood that wouldn't be available with unstructured data.

Benefits of DataFrames

DataFrames also add a couple of unique benefits in and of themselves. They're extremely interoperable with other environments and work really well for structured information (i.e., tables). They make it easy to read in a lot of external data formats, including:
  • Parquet
  • Hive
  • MySQL and PostgreSQL
  • JSON
  • S3
  • HDFS
Although it's worth mentioning that a lot of these formats are supported by plain old RDDs, DataFrames also have an added integration with Python pandas DataFrames. Probably the biggest advantage is their integration with the machine learning pipeline API in MLlib, which lets you read in some data, specify the operations that should be performed, and then train your model efficiently. There are also numerous optimizations under the hood that allow for faster and more efficient code execution. In simplest terms, think of a DataFrame as an RDD with a schema.

Beyond that interface, DataFrames abide by more or less the same rules that RDDs do, mainly the transformation and action paradigm we spoke about earlier. One added benefit that isn't supported by RDDs is the SQL interface they provide, which makes it easy to query highly structured data. The goal of the Spark DataFrame is to become the de facto big data DataFrame, and I think they're well on their way to doing so.
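To make that SQL interface a little more concrete, here's a minimal PySpark sketch. It assumes a DataFrame named df (like the flights DataFrame we build later in this post), and the table and column names here are purely illustrative.

# Register the DataFrame as a temporary table so it can be queried with SQL.
df.registerTempTable("flights")

# An ordinary SQL query; the result is itself a DataFrame.
carrierCounts = sqlContext.sql("SELECT UniqueCarrier, COUNT(*) AS numFlights FROM flights GROUP BY UniqueCarrier")
carrierCounts.show()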

Coding with DataFrames

Now that we've covered the history of DataFrames, let's dive right into how to use them in practice. We'll go through using them in both Python and Scala by analyzing a real dataset. If you don't already have Apache Spark set up on your local machine, follow our tutorial on how to set up Apache Spark.

The Basics

Alright, now that we're all set up, let's pick some data to analyze. Because I'm doing this on my local machine, I'm not going to choose anything too crazy. I'm going to analyze the data from the Data Expo '09, which has a bunch of data on airline flights. I've downloaded the 2008 data to analyze, but in later posts we'll be analyzing all of the data together. First, let's read in the data and make sure that we've read it correctly.
// Scala
val rawFlights = sc.textFile("../Downloads/2008.csv")
rawFlights.take(5)

# Python
rawFlights = sc.textFile("../Downloads/2008.csv")
rawFlights.take(5)
Now because we've read this in as an RDD, we're going to have to convert it into a DataFrame. Luckily doing so is fairly straightforward, if a bit of a nuisance. We could do it manually with a case class, but there are restrictions on the number of parameters you can pass into one (22 in Scala 2.10, and this dataset has 29 columns), so we'll have to do it another way. We're going to keep it simple and just use the spark-csv package, which we need to include when we start Spark with the flag --packages com.databricks:spark-csv_2.11:1.1.0. That allows us to very easily read the file in as a DataFrame.
// Scala
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("../Downloads/2008.csv")
df.take(5)

# Python
df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("../Downloads/2008.csv")
df.take(5)
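For the curious, here's a rough sketch of what the manual route mentioned above looks like on the Python side, where you'd build Row objects instead of a Scala case class. The handful of fields and their positions are purely illustrative, and the naive split on commas assumes no quoted fields.

from pyspark.sql import Row

# Drop the header line, split each record, and pull out a few columns by position.
header = rawFlights.first()
rows = (rawFlights
        .filter(lambda line: line != header)
        .map(lambda line: line.split(","))
        .map(lambda cols: Row(Year=cols[0], Month=cols[1], FlightNum=cols[9])))

# Build a DataFrame from the RDD of Rows; the schema is inferred from the Row fields.
manualDf = sqlContext.createDataFrame(rows)
manualDf.printSchema()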
Once we've imported it, the next step is to make sure all of our types are correct. We can do that by checking the schema.
df.printSchema()
root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: string (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: string (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: string (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: string (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: string (nullable = true)
 |-- CarrierDelay: string (nullable = true)
 |-- WeatherDelay: string (nullable = true)
 |-- NASDelay: string (nullable = true)
 |-- SecurityDelay: string (nullable = true)
 |-- LateAircraftDelay: string (nullable = true)
As we can see here, everything was brought in as a string. Not the end of the world, but a bit inconvenient. Let's try converting one of these columns to another type. The way we're going to do that is by extracting a column and casting it.
df.col("Year").cast("int")
df.col("Year").cast("int")
However, once we've extracted and cast that column, how do we add it back in? Here's the current best way of setting the type of a column in a DataFrame in Spark. It's definitely a workaround, but it works.
// Scala
val df_1 = df.withColumnRenamed("Year", "oldYear")
val df_2 = df_1.withColumn("Year", df_1.col("oldYear").cast("int")).drop("oldYear")

# Python
df_1 = df.withColumnRenamed("Year", "oldYear")
df_2 = df_1.withColumn("Year", df_1["oldYear"].cast("int")).drop("oldYear")
Now we've replaced our old column with our new one. This can definitely get messy if you're doing it with a lot of different columns, but it works when you need it to. Let's go ahead and play around with some other calculations. To do that, we're going to need to convert a couple more columns, so let's create a simple function to help us out.
// Scala
def convertColumn(df: org.apache.spark.sql.DataFrame, name: String, newType: String) = {
  val df_1 = df.withColumnRenamed(name, "swap")
  df_1.withColumn(name, df_1.col("swap").cast(newType)).drop("swap")
}

# Python
def convertColumn(df, name, new_type):
    df_1 = df.withColumnRenamed(name, "swap")
    return df_1.withColumn(name, df_1["swap"].cast(new_type)).drop("swap")
// Scala
val df_3 = convertColumn(df_2, "ArrDelay", "int")
val df_4 = convertColumn(df_3, "DepDelay", "int")

# Python
df_3 = convertColumn(df_2, "ArrDelay", "int")
df_4 = convertColumn(df_3, "DepDelay", "int")
Now of course we'll have to keep track of all these numbered DataFrame variables, but there are worse things in life. With both delay columns converted, we can compute some averages. Let's get the average arrival and departure delay for each flight number in our dataset.
// Scala
import org.apache.spark.sql.functions.avg
val averageDelays = df_4.groupBy(df_4.col("FlightNum")).agg(avg(df_4.col("ArrDelay")), avg(df_4.col("DepDelay")))

# Python
from pyspark.sql.functions import avg
averageDelays = df_4.groupBy(df_4["FlightNum"]).agg(avg(df_4["ArrDelay"]), avg(df_4["DepDelay"]))
Now at this point we're going to give you a little hint. There's something easy you can do to speed up data access when you're going to hit a dataset repeatedly: caching. Once a DataFrame has been cached, subsequent computations on it run much faster.
averageDelays.cache()
Once we compute averageDelays, we'll be able to access it very quickly from then on. We can materialize it (and cache it) by running an action. Remember, Spark is lazily evaluated - if you don't cache a value, it has to be recomputed from scratch every time you use it.
averageDelays.show()
That will give us a sample of the average delays, but we're probably going to want to sort it to see the best and worst offenders. We can sort ascending (the default) or descending.
averageDelays.orderBy("AVG(ArrDelay)").show() // ascending
orderBy is one way to sort a Spark DataFrame; another is the sort method.
averageDelays.sort($"AVG(ArrDelay)".desc).show() // descending
Running that should print out:
+---------+-----------------+------------------+
|FlightNum|    AVG(ArrDelay)|     AVG(DepDelay)|
+---------+-----------------+------------------+
|     7487|            347.0|             350.0|
|     7650|            121.0|             127.0|
|     7711|            118.0|             112.0|
|     7363|            112.5| 99.33333333333333|
|     7467|            106.5|109.16666666666667|
|     7386|            106.0|              99.5|
|     7361|            105.0|              95.8|
|     7406|             93.0|              95.0|
|     7376|             85.5|              88.0|
|     7372|81.25925925925925| 71.33333333333333|
|     7437|72.04615384615384|  68.9090909090909|
|     6870|             72.0|              30.0|
|     7401|            71.25|             49.25|
|     7456|             65.5|              63.5|
|     6114|             62.5|              54.5|
|     7419|61.46913580246913| 54.45679012345679|
|     6946|             60.4|              46.0|
|     7390|60.18421052631579|  53.1578947368421|
|     7490|             59.0|              53.0|
|     7362|57.14754098360656| 57.90163934426229|
+---------+-----------------+------------------+
We can also sort by multiple columns, although in this dataset it won't change much since we grouped by flight number and took a single average, so each flight number appears only once.
averageDelays.sort($"AVG(ArrDelay)".desc, $"AVG(DepDelay)".desc).show()
That should give you a basic idea of DataFrame usage. Lastly, we'll cover pandas integration and saving your DataFrames for later.

pandas Integration

pandas is a Python library for data manipulation. Its API is very similar to the one available in Spark (with a lot more functions and goodies). Converting between Spark and pandas DataFrames in PySpark is as simple as:
pandas_df = df.toPandas()
spark_df = sqlContext.createDataFrame(pandas_df)
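As a quick usage sketch, you might pull the small aggregated result from earlier into pandas to take advantage of its summary tools. This assumes the averageDelays DataFrame from above; remember that toPandas() collects everything to the driver, so only use it on data that fits in local memory.

# Bring the aggregated (and therefore small) result down to a local pandas DataFrame.
delays_pd = averageDelays.toPandas()
print(delays_pd.describe())

# And go back to Spark if we want to continue distributed processing.
delays_spark = sqlContext.createDataFrame(delays_pd)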

Integration with RDDs

Lastly, DataFrames are just RDDs with some extra functionality, so everything you know about RDDs (or most everything) applies directly to DataFrames. If you want to learn more about Spark RDDs and what they can do, I'd recommend checking out our post on getting started with Apache Spark RDDs.
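For instance, you can always drop down to the underlying RDD of Row objects through the rdd attribute. Here's a small PySpark sketch; the per-origin count is just an illustrative computation.

# Every DataFrame is backed by an RDD of Row objects.
rowRdd = df_4.rdd

# Plain RDD operations work as usual; here we count flights per origin airport.
counts = (rowRdd
          .map(lambda row: (row.Origin, 1))
          .reduceByKey(lambda a, b: a + b))
counts.take(5)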

Saving your DataFrames

Now you're likely going to want to save your DataFrames, and there are a couple of different ways to do it. One popular option is the Parquet file, a columnar storage format. Parquet files keep your data types available to you in the future and are readable across the Hadoop ecosystem. You can learn more here: http://parquet.apache.org/. It's commonly what I save my files to, as it makes it super easy to read them back in for later analysis!
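Here's a minimal sketch in Python of writing the flights DataFrame out to Parquet and reading it back. It assumes Spark 1.4+ (where the DataFrame write API is available), and the output path is just an example.

# Write the converted flights DataFrame out as Parquet (a directory of part files).
df_4.write.parquet("../Downloads/flights_2008.parquet")

# Later, even in a new session, we can read it straight back in,
# with the column types we set earlier preserved.
flights = sqlContext.read.parquet("../Downloads/flights_2008.parquet")
flights.printSchema()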
