Getting Started with Apache Spark RDDs

Introduction

Apache Spark is a project originally out of UC Berkeley's AMPLab, which puts on events like AMP Camp and uses Spark on a regular basis in its own research projects. It's really much more than a single project: it's an ecosystem of tools and libraries, and that's what makes it such a powerful tool. This introductory guide will take you through the basics in both Scala and PySpark (Python Spark).

The Problem Spark Solves

When I approach a new tool, I always like to first understand the problem it solves. To throw some buzzwords around, you're looking to perform some data science on some big data. More simply, you've got a ton of information that you want to understand more about, and doing it on your computer would just be too slow. Some great examples of data problems that are solved well by a tool like Apache Spark include:

1. Analyzing Log Data - in order to hunt down a bug happening on a production server (or servers)
2. Massive Natural Language Processing - finally, some use for all that Twitter data you've been downloading...
3. Large Scale Recommendation Systems or General Machine Learning Tasks - recommending products to users or trying to find related groups

While this is far from an exhaustive list, it gives us a starting point for the problem we're solving: we've got a ton of information and we want to extract (actionable) information from it.

Downloading + Building Spark

I've already written about how to download and build Spark, and I'd encourage you to check out that article; it includes instructions for building Apache Spark for a cluster of machines.

The Basics

Now, I'm going to run this tutorial on my local machine, but the same should apply on a cluster. Below you're going to find code for both Python and Scala. One thing to keep in mind is that you can use the IPython shell with PySpark, which is an awesome tool. It's as simple as setting the IPYTHON=1 environment variable in your shell (export IPYTHON=1). So let's get started.

The Data

For this tutorial I'm going to be using the Rotten Tomatoes Kaggle dataset. It's not "big data" by any means, but it's fairly structured and makes for a good introductory dataset. Download it here: https://www.kaggle.com/c/sentiment-analysis-on-movie-reviews

The Spark Context

The Spark Context is the main entry point for Spark functionality. A SparkContext instance connects us to the Spark cluster and is used to create just about everything in that cluster. If everything has started up correctly, it becomes available as sc in your application. Go ahead and check it out by typing sc:
sc
// res4: org.apache.spark.SparkContext = org.apache.spark.SparkContext@67521a79
sc
# <pyspark.context.SparkContext object at 0x...>
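As a side note, sc comes for free only in the interactive shells. If you were writing a standalone PySpark script instead, you'd create it yourself. Here's a minimal sketch, assuming local mode (the app name below is just a placeholder of mine, not anything from this tutorial):

from pyspark import SparkConf, SparkContext

# hypothetical app name; "local[*]" means "run locally using all available cores"
conf = SparkConf().setAppName("rdd-tutorial").setMaster("local[*]")
sc = SparkContext(conf=conf)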

Loading Data

Now we've got our Rotten Tomatoes dataset, so let's go ahead and open it up. Spark makes it super easy to open files; you've just got to do it from the Spark Context in order to load it in as an RDD. Be sure you've got the correct directory for the file; mine is in the same directory, so I don't have to do any path switching. Relative or absolute paths will work just fine.
val data = sc.textFile("train.tsv")
data = sc.textFile("train.tsv")
At this point you should have noticed that we didn't get any "feedback": nothing was printed besides some log statements. That's because Spark is lazily evaluated; it won't execute any statements (beyond registering them) until you perform an action. We'll get to that in a minute, but first let's check out the types that we're working with.
data.getClass
// res10: Class[_ <: org.apache.spark.rdd.RDD[String]] = class org.apache.spark.rdd.MapPartitionsRDD
type(data)
# <class 'pyspark.rdd.RDD'>
We can see we've got a MapPartitionsRDD. We're not going to go into the details of what this specific RDD type is; however, we are going to talk about RDDs, the core abstraction in Spark.

The Core Spark Abstraction: RDDs or Resilient Distributed Datasets

Our data is loaded in as an RDD, or Resilient Distributed Dataset. Quite simply, it's a representation of a dataset that is distributed throughout the cluster.

(diagram: the basic RDD abstraction)

In my current example I'm running a local version, but the same principles apply to the local machine as well as the cluster. One of the most important concepts of RDDs is that they're immutable. I love immutable data. You should love immutable data. This means that given the same RDD, we will always get the same answer; it also allows Spark to make some optimizations under the hood. If a job fails, Spark just has to perform the operation again; there's no state (beyond the current step it is performing) that Spark (or you) needs to keep track of. The way we manipulate data is through a series of transformations applied to data stored in RDDs, and you finally get data back when you perform an action. Spark is lazily evaluated, and only actions force code evaluation.
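To make that concrete, here's a tiny sketch (on a made-up in-memory list, not our dataset) showing that transformations just build up new RDDs, while an action is what actually forces the computation:

# toy example: transformations are lazy, actions force evaluation
nums = sc.parallelize([1, 2, 3, 4, 5])        # an RDD; the original list is never mutated
doubled = nums.map(lambda x: x * 2)           # transformation: returns a new RDD, nothing runs yet
evens = doubled.filter(lambda x: x % 4 == 0)  # another transformation, still nothing runs
evens.collect()                               # action: Spark now computes -> [4, 8]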

Transformations

Transformations are just manipulations of the data. For example, this dataset is a TSV, so we're going to need to split it up into its relevant columns. We do that with a map. Other transformations include filtering, reducing, grouping, etc. They transform one RDD into the next, like in the image below.

(diagram: an RDD transformation)

Just as a note, our columns are tab separated and arranged as follows: PhraseId, SentenceId, Phrase, Sentiment.
val reviews = data.map(_.split("\t"))
reviews = data.map(lambda x: x.split("\t"))
You can see that these don't print anything out. That's because we haven't really asked a direct "question" of the data, like how many values we have in the dataset. A "question" is equivalent to an action. If you check the type of your object at this point, you'll see that we've still got an RDD. Actions, for all intents and purposes, give us back a native type (like a list or an Array).
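You can check that claim directly with a quick sketch:

type(reviews)          # still an RDD on the Python side (a pyspark.rdd RDD subclass)
type(reviews.take(1))  # an action hands back a plain Python list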

Actions

As mentioned previously, actions are ways of answering a specific question or requesting a result. Actions include getting the first value, getting several values, or counting the total number of values. So far we've performed some transformations; now we want to get an actual value back, and as mentioned, we do that with an action.

(diagrams: an RDD transformation followed by an RDD action)

In this instance we're going to perform a count, which will count up all the values in our RDD. In the diagram above I'm performing a take, which takes 10 values from the RDD.
reviews.count()
// Long: 156061
reviews.count()
156061
The transformation/action abstraction takes some getting used to, but you'll see this terminology (action/transformation) used a fair amount.
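Count isn't the only action worth knowing; here's a quick sketch of a couple of others on the same RDD (note that we haven't removed the header row yet, so the first row is the column names):

reviews.first()   # the very first row -- here, the header row of the TSV
reviews.take(3)   # a plain list with the first 3 rows
# reviews.collect() would pull the whole dataset back to the driver -- fine locally, risky on real "big data"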

Word Counts

Word counting is typically the "Hello World" of big data because doing it is pretty straightforward. Let's go ahead and get word counts for our Kaggle dataset. We can see that the relevant column we want to work with is the Phrase column. Let's start off by getting the top words in the entire dataset.
// see if we're doing word counts correctly
val wordCounts = reviews.flatMap(x => x(2).split(" ")).map((_, 1)).reduceByKey((a, b) => a + b).sortBy(_._2, false)

wordCounts.take(5) // get top 5
word_counts = reviews.flatMap(lambda x : x[2].split()).map(lambda x: (x,1)).reduceByKey(lambda a,b: a + b).sortBy(lambda x: x[1],False)

word_counts.take(5) # get top 5
Wow, that's a bit of a mouthful so let's go ahead and break it down.
reviews.flatMap(x => x(2).split(" ")).take(5)
reviews.flatMap(lambda x: x[2].split()).take(5)
First, what we're doing is creating a flat map of all the words. Since right now we're just concerned with word counts across the entire set, we're just going to focus on that third column. So the flatMap takes our list of rows, gets the third column, splits it on the " " (space) character, and returns that list of words.
reviews.flatMap(x => x(2).split(" ")).map((_, 1)).take(5)
word_counts = reviews.flatMap(lambda x: x[2].split()).map(lambda x: (x, 1))

word_counts.take(5) # get top 5
At this point we've basically got a new RDD of those values. What we need to do next is the standard "big data", MapReduce-style word count. We map each word to the number 1 because it appears once.
reviews.flatMap(x => x(2).split(" ")).map((_, 1)).reduceByKey((a, b) => a + b).take(5)
word_counts = reviews.flatMap(lambda x : x[2].split()).map(lambda x: (x,1)).reduceByKey(lambda a,b: a + b)
word_counts.take(5) # get top 5
Once we've done that, we reduce by the key. This basically means the first value in each tuple is the key, and we reduce by some supplied function, which in this case just adds up the values (all the individual ones). If this is all feeling a bit strange to you, I'd recommend brushing up on the MapReduce programming paradigm (or see the tiny sketch below). Once we've got this reduction, we can sort it by a given function, and that's what gives us our results: that's what gets assigned to our wordCounts variable. Then we just need to perform an action to get those values back! Taking this code to the next level is super easy: let's try getting negative word counts!
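Before moving on, here's that minimal sketch: reduceByKey on a tiny hand-made RDD where you can see every (key, value) pair.

# reduceByKey on a tiny, hand-made RDD of (word, 1) pairs
pairs = sc.parallelize([("spark", 1), ("rdd", 1), ("spark", 1), ("spark", 1)])
pairs.reduceByKey(lambda a, b: a + b).collect()  # [('rdd', 1), ('spark', 3)] -- order may vary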

Negative Word Counts

We're going to be able to reuse a fair amount of code here, because we're just going to end up placing something different into our pipeline. Rather than using the entire dataset, we're just going to grab the negative reviews with a filter transformation. Now, if we just try to go for it right away by doing something like what we've got below, that won't actually work. That's because we've got a header row in the data. Go ahead and run the code below to see that error.
// this will fail once we run an action, because of the header row
val negativeReviews = reviews.filter(x => x(3).toInt == 0)

val negWordCounts = negativeReviews.flatMap(x => x(2).split(" ")).map((_, 1)).reduceByKey((a, b) => a + b).sortBy(_._2, false)

negWordCounts.take(5) // get top 5
negativeReviews = reviews.filter(lambda x: int(x[3]) == 0)

neg_word_counts = negativeReviews.flatMap(lambda x: x[2].split()).map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], False)

neg_word_counts.take(5) # get top 5
Removing that header row is unfortunately a bit of an annoyance. There are a couple of different ways to do it (you could do it with case classes in Scala, for example), but likely the simplest way is to do it by value.
val trueReviews = reviews.filter(x => x(0) != "PhraseId")

val negativeReviews = trueReviews.filter(x => x(3).toInt == 0)

val negWordCounts = negativeReviews.flatMap(x => x(2).split(" ")).map((_, 1)).reduceByKey((a, b) => a + b).sortBy(_._2, false)

negWordCounts.take(50)
header = reviews.first()

true_reviews = reviews.filter(lambda x: x != header)

negativeReviews = true_reviews.filter(lambda x: int(x[3]) == 0)

neg_word_counts = negativeReviews.flatMap(lambda x: x[2].split()).map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], False)

neg_word_counts.take(50)
This is awesome because we're really just applying the same transformations we did above. I think this is one of the great powers of Spark: it's got a simple interface that, once you get the hang of it, allows you to perform very specific transformations. It also allows for extremely iterative development, since you can just take a couple of values and see if you're getting the result that you want. Now, obviously there are a lot of ways we could improve this (better splitting of the words, removal of stop words, and so on; see the rough sketch below), but in this introduction we learned a ton! In some later posts we'll start working with larger datasets, SparkSQL, and Spark DataFrames.
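Here's that rough sketch of one possible cleanup (my own addition, not part of the original pipeline): lowercase the phrases and strip punctuation before counting.

import re

def tokenize(phrase):
    # lowercase and keep only letters and spaces before splitting; a crude but illustrative cleanup
    return re.sub(r"[^a-z ]", "", phrase.lower()).split()

cleaner_counts = true_reviews.flatMap(lambda x: tokenize(x[2])) \
                             .map(lambda w: (w, 1)) \
                             .reduceByKey(lambda a, b: a + b) \
                             .sortBy(lambda x: x[1], False)
cleaner_counts.take(10)

Even a small tweak like this slots straight into the same transformation/action pipeline we've been building all along.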
