ML Pipelines and PipelineStages (spark.ml)
ML Pipeline API (aka Spark ML or spark.ml due to the package the API lives in) lets Spark users quickly and easily assemble and configure practical distributed Machine Learning pipelines (aka workflows) by standardizing the APIs for different Machine Learning concepts.
Note
|
Both scikit-learn and GraphLab have the concept of pipelines built into their system. |
The ML Pipeline API is a new DataFrame-based API developed under org.apache.spark.ml
package and is the primary API for MLlib as of Spark 2.0.
Important
|
The previous RDD-based API under org.apache.spark.mllib package is in maintenance-only mode which means that it is still maintained with bug fixes but no new features are expected.
|
The key concepts of Pipeline API (aka spark.ml Components):
The beauty of using Spark ML is that the ML dataset is simply a DataFrame (and all calculations are simply UDF applications on columns).
Use of a machine learning algorithm is only one component of a predictive analytic workflow. There can also be additional pre-processing steps for the machine learning algorithm to work.
Note
|
While a RDD computation in Spark Core, a Dataset manipulation in Spark SQL, a continuous DStream computation in Spark Streaming are the main data abstractions a ML Pipeline is in Spark MLlib. |
A typical standard machine learning workflow is as follows:
-
Loading data (aka data ingestion)
-
Extracting features (aka feature extraction)
-
Training model (aka model training)
-
Evaluate (or predictionize)
You may also think of two additional steps before the final model becomes production ready and hence of any use:
-
Testing model (aka model testing)
-
Selecting the best model (aka model selection or model tuning)
-
Deploying model (aka model deployment and integration)
Note
|
The Pipeline API lives under org.apache.spark.ml package. |
Given the Pipeline Components, a typical machine learning pipeline is as follows:
-
You use a collection of
Transformer
instances to prepare inputDataFrame
- the dataset with proper input data (in columns) for a chosen ML algorithm. -
You then fit (aka build) a
Model
. -
With a
Model
you can calculate predictions (inprediction
column) onfeatures
input column through DataFrame transformation.
Example: In text classification, preprocessing steps like n-gram extraction, and TF-IDF feature weighting are often necessary before training of a classification model like an SVM.
Upon deploying a model, your system must not only know the SVM weights to apply to input features, but also transform raw data into the format the model is trained on.
-
Pipeline for text categorization
-
Pipeline for image classification
Pipelines are like a query plan in a database system.
Components of ML Pipeline:
-
Pipeline Construction Framework – A DSL for the construction of pipelines that includes concepts of Nodes and Pipelines.
-
Nodes are data transformation steps (Transformers)
-
Pipelines are a DAG of Nodes.
Pipelines become objects that can be saved out and applied in real-time to new data.
-
It can help creating domain-specific feature transformers, general purpose transformers, statistical utilities and nodes.
You could eventually save
or load
machine learning components as described in Persisting Machine Learning Components.
Note
|
A machine learning component is any object that belongs to Pipeline API, e.g. Pipeline, LinearRegressionModel, etc. |
Features of Pipeline API
The features of the Pipeline API in Spark MLlib:
-
DataFrame as a dataset format
-
ML Pipelines API is similar to scikit-learn
-
Easy debugging (via inspecting columns added during execution)
-
Parameter tuning
-
Compositions (to build more complex pipelines out of existing ones)
Pipelines
A ML pipeline (or a ML workflow) is a sequence of Transformers and Estimators to fit a PipelineModel to an input dataset.
pipeline: DataFrame =[fit]=> DataFrame (using transformers and estimators)
A pipeline is represented by Pipeline class.
import org.apache.spark.ml.Pipeline
Pipeline
is also an Estimator (so it is acceptable to set up a Pipeline
with other Pipeline
instances).
The Pipeline
object can read
or load
pipelines (refer to Persisting Machine Learning Components page).
read: MLReader[Pipeline]
load(path: String): Pipeline
You can create a Pipeline
with an optional uid
identifier. It is of the format pipeline_[randomUid]
when unspecified.
val pipeline = new Pipeline()
scala> println(pipeline.uid)
pipeline_94be47c3b709
val pipeline = new Pipeline("my_pipeline")
scala> println(pipeline.uid)
my_pipeline
The identifier uid
is used to create an instance of PipelineModel to return from fit(dataset: DataFrame): PipelineModel
method.
scala> val pipeline = new Pipeline("my_pipeline")
pipeline: org.apache.spark.ml.Pipeline = my_pipeline
scala> val df = (0 to 9).toDF("num")
df: org.apache.spark.sql.DataFrame = [num: int]
scala> val model = pipeline.setStages(Array()).fit(df)
model: org.apache.spark.ml.PipelineModel = my_pipeline
The stages
mandatory parameter can be set using setStages(value: Array[PipelineStage]): this.type
method.
Pipeline Fitting (fit method)
fit(dataset: DataFrame): PipelineModel
The fit
method returns a PipelineModel that holds a collection of Transformer
objects that are results of Estimator.fit
method for every Estimator
in the Pipeline (with possibly-modified dataset
) or simply input Transformer
objects. The input dataset
DataFrame is passed to transform
for every Transformer
instance in the Pipeline.
It first transforms the schema of the input dataset
DataFrame.
It then searches for the index of the last Estimator
to calculate Transformers for Estimator
and simply return Transformer
back up to the index in the pipeline. For each Estimator
the fit
method is called with the input dataset
. The result DataFrame is passed to the next Transformer
in the chain.
Note
|
An IllegalArgumentException exception is thrown when a stage is neither Estimator or Transformer .
|
transform
method is called for every Transformer
calculated but the last one (that is the result of executing fit
on the last Estimator
).
The calculated Transformers are collected.
After the last Estimator
there can only be Transformer
stages.
The method returns a PipelineModel
with uid
and transformers. The parent Estimator
is the Pipeline
itself.
PipelineStage
The PipelineStage abstract class represents a single stage in a Pipeline.
PipelineStage
has the following direct implementations (of which few are abstract classes, too):
Each PipelineStage
transforms schema using transformSchema
family of methods:
transformSchema(schema: StructType): StructType
transformSchema(schema: StructType, logging: Boolean): StructType
Note
|
StructType describes a schema of a DataFrame. |
Tip
|
Enable |
Further reading or watching
-
(video) Building, Debugging, and Tuning Spark Machine Learning Pipelines - Joseph Bradley (Databricks)
-
(video) Spark MLlib: Making Practical Machine Learning Easy and Scalable
-
(video) Apache Spark MLlib 2 0 Preview: Data Science and Production by Joseph K. Bradley (Databricks)