Using Pipelines to run machine learning models in Spark MLlib

Author: Shihab Rahman
Software Engineer
Machine Learning / Big Data / Cloud

Apache Spark, advertised as a ‘lightning-fast cluster computing and analytics engine’, was developed at UC Berkeley’s AMPLab and became a top-level Apache open source project in 2014. Since its release it has gained immense popularity among developers for its speed, generality, and ease of use. Under the hood, Apache Spark generalizes the Hadoop MapReduce model, extending it efficiently to support richer kinds of computation such as in-memory interactive queries and stream/batch processing.

Machine Learning in Apache Spark (MLlib)

Datasets are growing massively in size and complexity, and with that growth comes a pressing need for systems that can harness them to build intelligent applications. To address this, several large-scale data-flow engines were built on MapReduce (2004), but running machine learning algorithms on distributed clusters with those engines was always an issue. Since Spark’s emergence as a widely used, fault-tolerant, general-purpose cluster computing engine, it has proven its ability and suitability to run large-scale machine learning algorithms. Spark’s distributed machine learning framework is called MLlib.

At a high level, MLlib currently provides almost all popular ML algorithms (classification, regression, etc.), featurization (extraction, transformation, dimensionality reduction, and selection), pipelines (construction, evaluation, and tuning of ML workflows), persistence (saving and loading algorithms, models, and pipelines), and other utilities such as statistics and linear algebra.

Pipelines in Spark MLlib

Almost every machine learning workflow involves pre-processing data, extracting features, fitting a model, and validating it. Since there are so many tools and libraries available for these tasks, we could use a separate one for each stage as convenient. However, for large-scale data and multilevel machine learning ecosystems, connecting these dots is not always as simple as it sounds. Also, most ML frameworks do not support distributed in-memory computation the way Apache Spark does. To address this, Apache Spark introduced Pipelines, which run machine learning workflows in a controlled and configurable manner.

A Pipeline is just a sequence of dependent stages. In Pipelines, there are two types of stages:

  • Transformer: transforms a DataFrame (a dataset with named columns) into another DataFrame based on the logic you implement. Spark ships with several Transformer implementations. For example, HashingTF is a Transformer that maps a sequence of terms in a column to their term frequencies using a hashing trick.
  • Estimator: an abstraction of a learning algorithm that fits a Model to a training dataset. It takes a DataFrame and a ParamMap (key-value pairs of the algorithm’s hyperparameters) as input and produces a Model, which is itself a Transformer that can compute predictions for a given test input. For example, LogisticRegression is an Estimator that runs the logistic regression algorithm on a given training dataset.
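The two stage types differ only in their entry points: a Transformer exposes a transform method, while an Estimator exposes fit, which returns a Model that is itself a Transformer. A minimal sketch of the relationship, assuming an existing DataFrame df with "words" and "label" columns:

```scala
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.ml.feature.HashingTF
import org.apache.spark.sql.DataFrame

// A Transformer maps one DataFrame to another.
val tf = new HashingTF().setInputCol("words").setOutputCol("features")
val featurized: DataFrame = tf.transform(df)   // df is assumed to exist

// An Estimator fits a Model on a training DataFrame;
// the resulting Model is itself a Transformer.
val lr = new LogisticRegression()
val model: LogisticRegressionModel = lr.fit(featurized)
val predictions: DataFrame = model.transform(featurized)
```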

Why use Pipelines?

Let’s consider a scenario: we decide to build a machine learning model with just three features. We pre-process our data, extract the three features, transform them into vectors, and train a model. But later, after looking at some validation data, we realize we need to add more features to the input, normalize some of them, and perhaps tune some hyperparameters. That means changing a lot of code while making sure every dependent process still works exactly as before. In the future we may also need to add another set of dependent stages that rely on our current prediction data. Such changes are messy and error-prone.

A Pipeline wraps up the whole construction and running process and gives us a declarative interface where we can see the whole workflow and monitor/tune/extend it easily.

Example: Logistic Regression using Pipelines API

In this example, we will run a text classification algorithm using logistic regression. We will be using Scala for this example, but please keep in mind that Spark’s unified API is available for Java and Python as well.

First, let’s say we have training data with labels like this:
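A minimal, made-up training set of this shape (assuming an active SparkSession named spark) might be:

```scala
// Hypothetical training data: label 1.0 for Spark-related text, 0.0 otherwise.
// The ids and sentences are invented for illustration.
val training = spark.createDataFrame(Seq(
  (0L, "spark is a fast cluster computing engine", 1.0),
  (1L, "i love pizza and pasta", 0.0),
  (2L, "mllib supports pipelines in spark", 1.0),
  (3L, "the weather is nice today", 0.0)
)).toDF("id", "text", "label")
```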

The label is 1.0 if the input text contains a Spark-related statement, and 0.0 otherwise.

As the first step in creating a Pipeline, we declare the different stages. These stages are merely declarations; they run in a chained fashion only when we call the .fit operation on the pipeline.

The first stage of our pipeline is a Tokenizer. It’s a Transformer that first converts a column of string values to lowercase and then splits it on whitespace. Here it will convert the “text” column and put the tokenized output into a “words” column.
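Declared with the spark.ml API, this looks like:

```scala
import org.apache.spark.ml.feature.Tokenizer

// Lowercases the "text" column and splits it on whitespace into "words".
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
```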

Next, we declare a HashingTF transformer, which takes the tokenized words, maps each raw term to an index using a hash function, and computes a fixed-length term-frequency vector.
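A sketch of this stage (the vector size of 1000 is an assumption for illustration):

```scala
import org.apache.spark.ml.feature.HashingTF

// Hashes each word to an index in a 1000-dimensional vector
// and counts term frequencies per document.
val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
```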

At this point, the feature extraction part is done. Now we need to define which ML algorithm to run on this dataset. For our example, we chose LogisticRegression. For simplicity, we set the maximum number of iterations to 10 and the regularization parameter to 0.01.
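With those hyperparameters, the estimator is declared as:

```scala
import org.apache.spark.ml.classification.LogisticRegression

// Estimator: logistic regression with at most 10 iterations
// and a regularization parameter of 0.01.
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.01)
```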

Our stage declarations are done. Now we create the pipeline that runs the whole process. Note that the three stages above must run in order: term-frequency vector generation comes after tokenization of the data, and the regression runs after both. We encode this by defining the chain of stages in our pipeline.
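The ordering is expressed by the array passed to setStages:

```scala
import org.apache.spark.ml.Pipeline

// Stages run left to right: tokenize -> hash to TF vectors -> fit regression.
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))
```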

Our pipeline construction is done, but nothing has executed yet. Calling .fit triggers the run and executes the stages in order. Since a Pipeline is itself an Estimator, calling .fit on it with the training dataset produces the model we want.
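Fitting the pipeline on the training DataFrame yields a PipelineModel:

```scala
import org.apache.spark.ml.PipelineModel

// Runs all stages in order and returns the fitted model.
val model: PipelineModel = pipeline.fit(training)
```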

We can save this model to local disk, AWS S3, or any other storage to use it later.
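For example, using MLlib’s built-in persistence (the path below is illustrative):

```scala
import org.apache.spark.ml.PipelineModel

// Save the fitted model; any local, HDFS, or S3 path works.
model.write.overwrite().save("/tmp/spark-lr-model")

// Later, possibly in another process:
val sameModel = PipelineModel.load("/tmp/spark-lr-model")
```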

The trained model can now be used to predict outcomes. To do that, let’s first create a test dataset without any labels.
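A small unlabeled test set (again with invented sentences, and assuming the same SparkSession):

```scala
// Test data has no "label" column; the model will predict one.
val test = spark.createDataFrame(Seq(
  (4L, "spark streaming processes data in near real time"),
  (5L, "let us go out for a walk")
)).toDF("id", "text")
```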

Our model is a Transformer, so we can use it to transform the test dataset and produce predictions like this:
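Transforming the test DataFrame appends prediction columns, which we can then inspect:

```scala
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// transform() runs tokenization, hashing, and the fitted regression,
// adding "probability" and "prediction" columns to the test data.
model.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }
```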

That’s it! We have our first ML algorithm implemented using Pipelines.