The Simplest Explanation of and Approaches to Optimizing Spark Shuffles

Motivation

If you're coming across this post on the internet, you've likely been using Spark and have been looking at how you can optimize your Spark code. You might have wandered across the Spark Shuffle Internals Documentation written by Kay Ousterhout, as well as a presentation or two about it. I wanted to write this all down and explain it in simple terms to aid your understanding. I'm going to gloss over some of the more implementation-y details and focus on the high-level takeaways: basically, what can you turn around and implement right away to optimize the problem you're trying to solve?

The Shuffle

In simplest terms the shuffle is the reorganization of data on your cluster. The most common examples of a reorganization are:
  • A Group By Key Operation (And Other *Key operations)
  • Joins
Importantly, this means that we're going to be transferring data over the network, which can be quite expensive.
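To make "reorganization" a little more concrete, here is a minimal sketch in plain Python (not Spark code). It assumes records are routed by a stable hash of their key, which is roughly how a hash-partitioned shuffle decides where a record goes. The names `target_partition` and `shuffle` are made up for this illustration.

```python
import zlib
from collections import defaultdict

def target_partition(key, num_partitions):
    # A stable hash, so the same key always maps to the same output partition.
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions

def shuffle(input_partitions, num_partitions):
    # Route every (key, value) record to the output partition that owns its key.
    output = [defaultdict(list) for _ in range(num_partitions)]
    for records in input_partitions:
        for key, value in records:
            output[target_partition(key, num_partitions)][key].append(value)
    return [dict(p) for p in output]

# Hypothetical input: three partitions, with the same keys scattered across them.
input_partitions = [
    [("a", 1), ("b", 2)],
    [("a", 3), ("c", 4)],
    [("b", 5), ("a", 6)],
]
shuffled = shuffle(input_partitions, num_partitions=2)
```

After the shuffle, every value for a given key sits in exactly one output partition. On a real cluster, each of those routing steps is potentially a trip over the network.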

Shuffle From A GroupByKey

The shuffle diagram below is a simplified picture of the shuffle. We do a simple map over our original data, then run a groupByKey operation to group the output records by a key in that data.

[Figure: Shuffle Diagram]

The thing is, if our data applies too much memory pressure, it will spill to disk. This is problematic because there are several costs associated with it: our data is no longer cached, we have to move around a ton of data, and we pay serialization (and deserialization) costs. We want to avoid these operations whenever we can. As mentioned by Databricks, one should try to avoid the groupByKey operation; however, this isn't always possible.
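One reason groupByKey gets singled out is that it ships every record as-is, whereas an operation like reduceByKey can combine values per key on each partition before anything is sent. Here is a rough sketch in plain Python (not Spark code) that simply counts how many records each approach would hand to the shuffle. The function names and the sample data are made up for illustration, and the count ignores record sizes and everything else a real shuffle does.

```python
from collections import Counter

def records_shuffled_group_by_key(partitions):
    # groupByKey-style: every record leaves its partition unchanged.
    return sum(len(records) for records in partitions)

def records_shuffled_with_combine(partitions):
    # reduceByKey-style: sum values per key locally first, so each partition
    # sends at most one record per distinct key.
    total = 0
    for records in partitions:
        local = Counter()
        for key, value in records:
            local[key] += value
        total += len(local)
    return total

# Hypothetical word-count style input spread over two partitions.
partitions = [
    [("a", 1), ("a", 1), ("b", 1)],
    [("a", 1), ("a", 1), ("a", 1)],
]
without_combine = records_shuffled_group_by_key(partitions)
with_combine = records_shuffled_with_combine(partitions)
```

The savings grow with the number of repeated keys per partition. If every key is unique, combining first buys you nothing.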

Shuffle from Joins

When you're performing a join, data will also travel over the network. It has to, at least in some way, in order to be joined with other data. The thing is, there are a couple of different styles of joins, and the computational advantages of each are important to understand. Imagine we've got a 6 node cluster with a variety of data styles on it.

[Figure: Shuffle Cluster]

Imagine our data being in the following formats.

[Figure: Shuffle Layout]

Now what happens when we try to perform a join? Well, data is going to get shipped around. Our goal is to minimize the amount of data that gets moved around. In a conventional shuffle all of the data is going to get moved around, but what if we could optimize that? Let's imagine each scenario: we want to join the small table to the large table, the small table to the medium table, and the medium table to the large table.
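As a baseline, here is a minimal sketch in plain Python (not Spark code) of a conventional shuffle join. It assumes both tables are hash-partitioned on the join key and that, in the worst case, every record from both sides has to move. The names `bucket` and `shuffle_join` and the sample tables are made up for illustration.

```python
import zlib
from collections import defaultdict

def bucket(key, num_partitions):
    # Stable hash so matching keys from both tables land in the same bucket.
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions

def shuffle_join(left, right, num_partitions):
    left_buckets = [defaultdict(list) for _ in range(num_partitions)]
    right_buckets = [defaultdict(list) for _ in range(num_partitions)]
    for key, value in left:
        left_buckets[bucket(key, num_partitions)][key].append(value)
    for key, value in right:
        right_buckets[bucket(key, num_partitions)][key].append(value)

    # Worst case: every record from both tables was moved to reach its bucket.
    shuffled_records = len(left) + len(right)

    # Join bucket by bucket; matching keys are now co-located.
    joined = []
    for lb, rb in zip(left_buckets, right_buckets):
        for key, left_values in lb.items():
            for lv in left_values:
                for rv in rb.get(key, []):
                    joined.append((key, (lv, rv)))
    return joined, shuffled_records

# Hypothetical tables of (key, value) pairs.
medium = [(1, "m1"), (2, "m2"), (3, "m3")]
large = [(1, "l1"), (1, "l2"), (2, "l3"), (4, "l4"), (5, "l5")]
joined, shuffled_records = shuffle_join(medium, large, num_partitions=3)
```

Notice that the rows with keys 4 and 5 were shuffled even though they never match anything. That wasted movement is what the scenarios below try to cut down.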

Scenario 1: Small Table to Large

Now this one is a great opportunity for optimization because frankly it's easy. Assuming that your small table can fit in memory on the driver node (and on each executor that receives a copy), you should treat it as a broadcast variable! What this allows us to do is collect it as a Map and then send it across the cluster. That way, the small table is the only data that has to be transferred across the network, and we don't have to shuffle the large table at all. This is probably the lowest hanging fruit and one you should definitely take advantage of! One of the reasons this works particularly well is that Spark's broadcast uses a BitTorrent-like protocol: the more nodes that have downloaded the data, the faster it spreads!
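Here is a minimal sketch of the idea in plain Python (not Spark code). It assumes the small table has unique keys and fits in memory as a dict. In Spark the dict would be shipped with something like `sc.broadcast`, while here it is just a local variable. The name `broadcast_join` and the sample data are made up for illustration.

```python
def broadcast_join(large_partitions, small_table):
    # "Collect as a Map": the small table becomes an in-memory lookup.
    lookup = dict(small_table)

    # Each partition of the large table is joined where it already lives;
    # only the lookup would need to travel to the nodes.
    joined_partitions = []
    for records in large_partitions:
        joined_partitions.append(
            [(key, (value, lookup[key])) for key, value in records if key in lookup]
        )
    return joined_partitions

# Hypothetical data: a small dimension table and a partitioned large table.
small = [(1, "US"), (2, "DE")]
large_partitions = [
    [(1, "order-a"), (3, "order-b")],
    [(2, "order-c"), (1, "order-d")],
]
result = broadcast_join(large_partitions, small)
```

The large table's partitions never change shape or location, and rows without a match (key 3 here) are simply dropped, as in an inner join.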

Scenario 2: Small Table to Medium

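Small to medium is going to be a similar process to the one explained above: as long as the small table fits in memory, broadcast it and do the join where the medium table's data already lives.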

Scenario 3: Medium Table to Large

Now, unfortunately, you're likely going to end up with a shuffle and spilling to disk. The goal is to avoid that as much as possible. One common strategy is to filter one of the datasets down to a small dataset (you're likely going to be joining on keys anyway), so that you can broadcast it. Another technique is to get both of them down to medium sized datasets. You can do this by ensuring that only the keys you're actually going to join on are left in the large table. This will prevent you from having to send all of the large table's data across the network. Key objective: filter it down!
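A rough sketch of the "filter it down" idea in plain Python (not Spark code): collect the set of join keys from the medium table and drop every large-table row whose key isn't in it before the join happens. It assumes the key set itself is small enough to hold in memory and ship around. The name `prefilter_by_keys` and the sample tables are made up for illustration.

```python
def prefilter_by_keys(large, medium):
    # Only the keys travel, not the medium table's full rows.
    join_keys = {key for key, _ in medium}
    # Rows that cannot match are dropped before any shuffle would take place.
    return [(key, value) for key, value in large if key in join_keys]

# Hypothetical tables of (key, value) pairs.
medium = [(1, "m1"), (2, "m2")]
large = [(1, "l1"), (3, "l2"), (2, "l3"), (4, "l4"), (1, "l5")]
filtered = prefilter_by_keys(large, medium)
```

The join result is unchanged for an inner join, but only the surviving rows of the large table would need to be shuffled.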

Conclusions

So we've covered the basics here, and this should help you tune your Spark jobs to run a bit more efficiently. However, this is only the beginning of the rabbit hole! Another thing you'll want to look into is the Shuffle Behavior settings in the Spark configuration, which you can adjust to reflect your use cases. There's really a lot that goes into these distributed systems, and there are plenty of things to try and tune. Spark 1.5's Project Tungsten set out to attack some of the inefficiencies in the Spark shuffle, and there's little doubt that those improvements will continue!
