Spark MLlib - Predict Store Sales with ML Pipelines

Overview

Recently I had to work on a machine learning problem for class, and it turned out to be a good opportunity for a Spark tutorial. Using store sales data from Rossmann, found on Kaggle, we are going to set up a machine learning pipeline that covers everything from preprocessing all the way to making and saving predictions. In a sense, this is a "full-stack" machine learning project that's probably fairly similar to something we might do in the real world. Spark's ML Pipelines API is going to make this very easy for us. You can follow along with the code on my GitHub or below.

The Data

The data can be found on the Kaggle page. Go ahead and download both the training and test sets. Here's a short description from the site about what we're working with:
Rossmann operates over 3,000 drug stores in 7 European countries. Currently, Rossmann store managers are tasked with predicting their daily sales for up to six weeks in advance. Store sales are influenced by many factors, including promotions, competition, school and state holidays, seasonality, and locality. With thousands of individual managers predicting sales based on their unique circumstances, the accuracy of results can be quite varied. In their first Kaggle competition, Rossmann is challenging you to predict 6 weeks of daily sales for 1,115 stores located across Germany. Reliable sales forecasts enable store managers to create effective staff schedules that increase productivity and motivation. By helping Rossmann create a robust prediction model, you will help store managers stay focused on what’s most important to them: their customers and their teams!

Review of Linear Regression

The linear regression model makes several assumptions, many of which are likely violated by our data:
  • That our input data is drawn from a multivariate normal distribution, i.e. that our input variables are independent of one another and normally distributed
  • Observations are independent of one another
  • A linear, additive relationship between our input variables and our output variable
  • Homoscedasticity of the error terms, i.e. that the errors have constant variance around the regression line
  • Our variables are measured without systematic error (and, like the point above, that the error values come from some normal process rather than from a confounding variable)

Let's review those assumptions in this case. We're violating the first one because our SchoolHoliday and StateHoliday variables are likely to be correlated. We're working with time series data, so we're likely violating the second one as well. In general, finding problems that strictly fit a linear relationship is difficult, and this problem is no exception, so we're likely violating the third assumption too. The last two we can hope for, but again, we're likely violating them. With all that being said, we should still at least experiment with this model because it's so well understood and it makes for a strong baseline predictor for future exploration. This aligns with the concept of Structural Risk Minimization (if you don't understand SRM, don't worry about it).

Now let's define the loss function for linear regression. In linear regression we hope to minimize the squared error, defined as

L(w) = ||Xw - y||^2

where X is our input matrix, w is our weight vector, and y is our output vector. There is a closed-form solution to this problem, found by inverting the inner product X^T X, but Spark, because of its orientation towards big data, doesn't actually solve the problem this way. Instead, Spark leverages gradient descent methods to descend the gradient of the risk function efficiently and arrive at (hopefully) a solution near the global minimum.
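For reference, here's the same objective written out a bit more explicitly, along with the closed-form (normal equations) solution and the gradient-descent step that iterative solvers take instead. This is just standard linear regression algebra, not anything specific to Spark's internals:

    L(w) = \lVert Xw - y \rVert^2                   % squared-error loss
    \hat{w} = (X^\top X)^{-1} X^\top y              % closed form: invert the d-by-d inner product
    \nabla L(w) = 2\, X^\top (Xw - y)               % gradient of the loss
    w_{t+1} = w_t - \gamma\, \nabla L(w_t)          % gradient-descent update with step size \gamma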

Coding up Our Pipeline

If you're going to enter this into the console, you'll want to start by running all of the imports.
import org.apache.log4j.{Logger}
//core and SparkSQL
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
// ML Feature Creation, Tuning, Models, and Model Evaluation
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.{RegressionEvaluator}
import org.apache.spark.ml.regression.{LinearRegression}
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.evaluation.RegressionMetrics
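The snippets further down also call logger.info, so if you're following along in the shell you'll want a logger instance as well. A minimal sketch (the logger name is arbitrary; it just shows up in the log output):

// log4j logger used for progress messages in the code below
val logger = Logger.getLogger("RossmannRegression")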

Preprocessing & Preparing Pipelines

Now, sometimes preprocessing might happen on an ad hoc basis, but if you want to build something for production it's worth putting it all into a pipeline. Machine learning pipelines are an excellent way of making your data science repeatable and efficient. The first thing we need to do is make sure our data is in a format we can feed into our models; that means it needs to be numerical. To do that we're going to use a couple of nifty Spark preprocessing classes, namely the StringIndexer, OneHotEncoder, and VectorAssembler. Respectively, these allow us to:
  • Convert string based categorical features to numerical categorical features
  • Convert numeric categorical features (in one column) into one-hot encoded vectors (one slot per category), increasing sparsity
  • Assemble a vector from a variety of columns and vectors that may have been previously created.
Here's how we create the indexers and encoders for our data. You'll notice that we have to apply these to basically all of our columns, because almost none of them are continuous.
    val stateHolidayIndexer = new StringIndexer()
      .setInputCol("StateHoliday")
      .setOutputCol("StateHolidayIndex")
    val schoolHolidayIndexer = new StringIndexer()
      .setInputCol("SchoolHoliday")
      .setOutputCol("SchoolHolidayIndex")
    val stateHolidayEncoder = new OneHotEncoder()
      .setInputCol("StateHolidayIndex")
      .setOutputCol("StateHolidayVec")
    val schoolHolidayEncoder = new OneHotEncoder()
      .setInputCol("SchoolHolidayIndex")
      .setOutputCol("SchoolHolidayVec")
    val dayOfMonthEncoder = new OneHotEncoder()
      .setInputCol("DayOfMonth")
      .setOutputCol("DayOfMonthVec")
    val dayOfWeekEncoder = new OneHotEncoder()
      .setInputCol("DayOfWeek")
      .setOutputCol("DayOfWeekVec")
    val storeEncoder = new OneHotEncoder()
      .setInputCol("Store")
      .setOutputCol("StoreVec")
Second, we're going to want to assemble all of these vectors together into one feature vector to input into our model.
    val assembler = new VectorAssembler()
      .setInputCols(Array("StoreVec", "DayOfWeekVec", "Open",
        "DayOfMonthVec", "StateHolidayVec", "SchoolHolidayVec"))
      .setOutputCol("features")

Creating the Pipelines

Pipelines Overview

What we've got here with Pipelines is a way of performing a repeated set of steps in sequence, much like the DAG of operations we build up in core Spark. The core concepts of the Pipeline are:
  • Transformer: A Transformer is an algorithm which can transform one DataFrame into another DataFrame. E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with predictions.
  • Estimator: An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model.
  • Pipeline: A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.

Once you see the code, you'll notice that in our train/validation split we're also setting an Evaluator that will judge how well our model is doing and automatically select the best parameters for us based on that metric. This means that we get to train lots and lots of different models and keep only the best one. Super simple! Let's walk through the creation of each model.

Linear Regression

The pipeline we've set up is pretty self-explanatory.
    def preppedLRPipeline():TrainValidationSplit = {
      val lr = new LinearRegression()
    
      val paramGrid = new ParamGridBuilder()
        .addGrid(lr.regParam, Array(0.1, 0.01))
        .addGrid(lr.fitIntercept)
        .addGrid(lr.elasticNetParam, Array(0.0, 0.25, 0.5, 0.75, 1.0))
        .build()
    
      val pipeline = new Pipeline()
        .setStages(Array(stateHolidayIndexer, schoolHolidayIndexer,
          stateHolidayEncoder, schoolHolidayEncoder, storeEncoder,
          dayOfWeekEncoder, dayOfMonthEncoder,
          assembler, lr))
    
      val tvs = new TrainValidationSplit()
        .setEstimator(pipeline)
        .setEvaluator(new RegressionEvaluator)
        .setEstimatorParamMaps(paramGrid)
        .setTrainRatio(0.75)
      tvs
    }
We're going to create our model, use the indexers and assembler created above, and run our data through the pipeline. The one thing to note is that by default the pipeline uses the label and features columns as the target column and input features (regardless of the fact that this is regression). If we want to, we can set these manually using the appropriate setter methods on our LinearRegression instance, as in the sketch below.
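For instance, if our label column were named "Sales" rather than "label", overriding the defaults might look something like this (the column names here are purely illustrative):

    // Hypothetical example: point the estimator at differently named columns
    val customLR = new LinearRegression()
      .setLabelCol("Sales")          // default is "label"
      .setFeaturesCol("features")    // default is "features"
      .setPredictionCol("prediction")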

Bringing in our Data

Now that we've set up our pipeline, let's go through importing our data. We've got to do a little bit of data scrubbing to make sure that we're handling null values, as Spark is a bit touchy about nulls in our DataFrames.
    def loadTrainingData(sqlContext:HiveContext):DataFrame = {
      val trainRaw = sqlContext
        .read.format("com.databricks.spark.csv")
        .option("header", "true")
        .load("../mlproject/rossman/train.csv")
        .repartition(6)
      trainRaw.registerTempTable("raw_training_data")
    
      sqlContext.sql("""SELECT
        double(Sales) label, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek,
        StateHoliday, SchoolHoliday, (double(regexp_extract(Date, '\\d+-\\d+-(\\d+)', 1))) DayOfMonth
        FROM raw_training_data
      """).na.drop()
    }
    
    def loadKaggleTestData(sqlContext:HiveContext) = {
      val testRaw = sqlContext
        .read.format("com.databricks.spark.csv")
        .option("header", "true")
        .load("../mlproject/rossman/test.csv")
        .repartition(6)
      testRaw.registerTempTable("raw_test_data")
    
      val testData = sqlContext.sql("""SELECT
        Id, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek, StateHoliday,
        SchoolHoliday, (double(regexp_extract(Date, '\\d+-\\d+-(\\d+)', 1))) DayOfMonth
        FROM raw_test_data
        WHERE !(ISNULL(Id) OR ISNULL(Store) OR ISNULL(Open) OR ISNULL(DayOfWeek)
          OR ISNULL(StateHoliday) OR ISNULL(SchoolHoliday))
      """).na.drop() // weird things happen if you don't filter out the null values manually
    
      Array(testRaw, testData) // got to hold onto testRaw so we can make sure
      // to have all the prediction IDs to submit to kaggle
    }
Bringing in the data builds on what we learned about using Spark DataFrames. It's all fairly straightforward, but feel free to comment below with any questions. One thing you'll notice is that we're performing several splits of the data: our training set gets split into training and validation sets automatically by the TrainValidationSplit class, and we've already set aside our own test set that we'll use as an internal check before submitting to Kaggle.
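If you want to confirm that the columns came through with the types the pipeline expects, a quick optional sanity check like this works (assuming the sqlContext from your spark-shell session):

    // Optional: confirm the schema matches what the pipeline expects
    val checkData = loadTrainingData(sqlContext)
    checkData.printSchema()  // label, Store, DayOfWeek, DayOfMonth as doubles; Open as int
    checkData.show(5)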

Saving the Predictions

One place where Spark's youth shows is its handling of null values. We had to do a bit of data juggling earlier to remove the null values (because the pipeline will fail if they're included); below we've got some code that makes it easy to save the predictions and join them back up with the full set of test IDs.
    def savePredictions(predictions:DataFrame, testRaw:DataFrame) = {
      val tdOut = testRaw
        .select("Id")
        .distinct()
        .join(predictions, testRaw("Id") === predictions("PredId"), "outer")
        .select("Id", "Sales")
        .na.fill(0:Double) // some of our inputs were null so we have to
                           // fill these with something
      tdOut
        .coalesce(1)
        .write.format("com.databricks.spark.csv")
        .option("header", "true")
        .save("linear_regression_predictions.csv")
    }

Fitting, Testing, and Using The Model

Now that we've brought in our data and created our pipeline, we're ready to train up our models and see how they perform. This will take some time to run because we're exploring a hyperparameter space for each model. It takes time to try out all the permutations in our parameter grid (2 regParam values × 2 fitIntercept settings × 5 elasticNetParam values = 20 candidate pipelines) and to train a model for each one, so be patient!
    def fitModel(tvs:TrainValidationSplit, data:DataFrame) = {
      val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 12345)
      logger.info("Fitting data")
      val model = tvs.fit(training)
      logger.info("Now performing test on hold out set")
      val holdout = model.transform(test).select("prediction","label")
    
      // have to do a type conversion for RegressionMetrics
      val rm = new RegressionMetrics(holdout.rdd.map(x =>
        (x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))
    
      logger.info("Test Metrics")
      logger.info("Test Explained Variance:")
      logger.info(rm.explainedVariance)
      logger.info("Test R^2 Coef:")
      logger.info(rm.r2)
      logger.info("Test MSE:")
      logger.info(rm.meanSquaredError)
      logger.info("Test RMSE:")
      logger.info(rm.rootMeanSquaredError)
    
      model
    }
Here's our linear regression:
    val data = loadTrainingData(sqlContext)
    val Array(testRaw, testData) = loadKaggleTestData(sqlContext)
    
    // The linear Regression Pipeline
    val linearTvs = preppedLRPipeline()
    logger.info("evaluating linear regression")
    val lrModel = fitModel(linearTvs, data)
    logger.info("Generating kaggle predictions")
    val lrOut = lrModel.transform(testData)
      .withColumnRenamed("prediction","Sales")
      .withColumnRenamed("Id","PredId")
      .select("PredId", "Sales")
    savePredictions(lrOut, testRaw)
Here's the output:
    
    15/11/06 10:14:32 INFO RossmannRegression$: Test Explained Variance:
    15/11/06 10:15:29 INFO RossmannRegression$: 1.152061820772418E7
    15/11/06 10:15:29 INFO RossmannRegression$: Test R^2 Coef:
    15/11/06 10:15:29 INFO RossmannRegression$: 0.7779152287252036
    15/11/06 10:15:29 INFO RossmannRegression$: Test MSE:
    15/11/06 10:15:29 INFO RossmannRegression$: 3273414.1560751097
    15/11/06 10:15:29 INFO RossmannRegression$: Test RMSE:
    15/11/06 10:15:29 INFO RossmannRegression$: 1809.257902034729
    
Which is surprisingly good!
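If you're curious which hyperparameters the TrainValidationSplit actually settled on, you can dig them out of the fitted model. This is a sketch rather than part of the original workflow; the casts assume the pipeline's last stage is the LinearRegression, as it is above:

    // Inspect the winning LinearRegression stage from the best pipeline
    import org.apache.spark.ml.PipelineModel
    import org.apache.spark.ml.regression.LinearRegressionModel

    val bestPipeline = lrModel.bestModel.asInstanceOf[PipelineModel]
    val bestLR = bestPipeline.stages.last.asInstanceOf[LinearRegressionModel]
    println(s"regParam=${bestLR.getRegParam}, elasticNetParam=${bestLR.getElasticNetParam}")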

Results and Conclusion

Now I'm sure you're excited to see the results, so let's dive in and check out what Kaggle tells us!

Linear Regression Results

Our error rate is actually quite close to the one Kaggle reports when we submit to the public leaderboard: 0.21174! Pretty sweet for a model that makes so many assumptions!

Overkill

With Spark here, we brought a bazooka to a knife fight. This dataset is tiny and easily fits on one machine, so using Spark for it is a bit crazy. However, it makes for a great example of how to use machine learning pipelines in Apache Spark!
