Using SparkSQL UDFs to Create Date Times in Apache Spark

What are UDFs?

UDFs, or user-defined functions, are a simple way of adding a function to the SparkSQL language. A UDF operates on distributed DataFrames and works row by row (unless you're creating a user-defined aggregation function). You can find UDFs across a variety of SQL environments, and they certainly come up in Apache Hive. Think of it this way: it's common in SQL to call something like a sum function. Here's an example. In this query we've got a function makeDateTime. That function is a user defined function, one that we made up and added into the SQL language, that creates a date out of separate fields.
SELECT customer_id, makeDateTime(date_field, time_field, timezone) as datetime, amount
FROM purchases;

Use Cases

So what's the point? A UDF makes it a lot easier to work in a language that you understand and use in your domain. It's an easy way (just like functions in programming) of abstracting away a common operation.

Defining Spark UDFs

Let's go through the process of defining our own UDF. We're going to define the UDF from the example above and use it on a Spark DataFrame. Defining our UDF is pretty easy: we just create an anonymous function and register it through the SqlContext or through the udf function in org.apache.spark.sql.functions, depending on how you want to use it. Here's the setup. Imagine purchases is a DataFrame in the layout of:
  • customer_id
  • purchase_id
  • date
  • time
  • tz
  • amount
Our goal here is to get a datetime field that we can actually use. Let's go ahead and give it a shot.
case class Purchase(customer_id: Int, purchase_id: Int, date: String, time: String, tz: String, amount:Double)

val x = sc.parallelize(Array(
  Purchase(123, 234, "2007-12-12", "20:50", "UTC", 500.99),
  Purchase(123, 247, "2007-12-12", "15:30", "PST", 300.22),
  Purchase(189, 254, "2007-12-13", "00:50", "EST", 122.19),
  Purchase(187, 299, "2007-12-12", "07:30", "UTC", 524.37)
))

val df = sqlContext.createDataFrame(x)
df.registerTempTable("df")
Now let's define our function! The underscores simply signify that it's a partially applied function.
def makeDT(date: String, time: String, tz: String) = s"$date $time $tz"
sqlContext.udf.register("makeDt", makeDT(_:String,_:String,_:String))
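If the underscore syntax looks unfamiliar, here's a small sketch of what that expression expands to in plain Scala; makeDtFn is just an illustrative name and isn't used elsewhere in this post.
// makeDT(_: String, _: String, _: String) is shorthand for a function value
// of type (String, String, String) => String, which is what register expects.
val makeDtFn: (String, String, String) => String =
  (date, time, tz) => makeDT(date, time, tz)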

// Now we can use our function directly in SparkSQL.
sqlContext.sql("SELECT amount, makeDt(date, time, tz) from df").take(2)
// but not outside
df.select($"customer_id", makeDt($"date", $"time", $"tz"), $"amount").take(2) // fails
You can see above that we can use it within SQL but not outside of it. To do that we're going to have to create a different UDF using org.apache.spark.sql.functions.udf.
import org.apache.spark.sql.functions.udf
val makeDt = udf(makeDT(_:String,_:String,_:String))
// now this works
df.select($"customer_id", makeDt($"date", $"time", $"tz"), $"amount").take(2)
Pretty simple, right? Nothing too crazy; we're really just joining these fields together. Now we can see that it's working, however we've still got a string representation. Spark 1.5 introduced functions for creating date times. You can see them in the documentation, however I always find that an example is worth more than the docs. We can leave our function the same; we're just going to create a format and wrap our makeDt call in the unix_timestamp function, and we can do this both in and out of SparkSQL!
import org.apache.spark.sql.functions.unix_timestamp

val fmt = "yyyy-MM-dd HH:mm z" // HH because the times are in 24-hour format
df.select($"customer_id", unix_timestamp(makeDt($"date", $"time", $"tz"), fmt), $"amount").take(2)
sqlContext.sql(s"SELECT customer_id, unix_timestamp(makeDt(date, time, tz), '$fmt'), amount FROM df").take(2)
As any programmer knows, working with time is extremely annoying, and luckily Spark makes it fairly straightforward: we can use Java date-time format strings, which are fairly easy to work with. This system is really pretty flexible too!
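To show that flexibility, here's a minimal sketch that goes one step further and casts the Unix timestamp into a proper timestamp column rather than leaving it as epoch seconds. The alias dt is just a name chosen for this example.
// Parse the assembled string, then cast the resulting epoch seconds to a timestamp column.
val dfWithTs = df.select(
  $"customer_id",
  unix_timestamp(makeDt($"date", $"time", $"tz"), fmt).cast("timestamp").as("dt"),
  $"amount"
)
dfWithTs.printSchema() // dt should now show up as a timestamp column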

Conclusion

In this tutorial we learned how to create datetimes in Spark by leveraging UDFs and some new functions in SparkSQL! Let me know below if you have any questions or particularly troubling use cases.
