ML Pipelines and PipelineStages (spark.ml)

ML Pipeline API (aka Spark ML or spark.ml due to the package the API lives in) lets Spark users quickly and easily assemble and configure practical distributed Machine Learning pipelines (aka workflows) by standardizing the APIs for different Machine Learning concepts.

Note
Both scikit-learn and GraphLab have the concept of pipelines built into their system.

The ML Pipeline API is a new DataFrame-based API developed under org.apache.spark.ml package and is the primary API for MLlib as of Spark 2.0.

Important
The previous RDD-based API under org.apache.spark.mllib package is in maintenance-only mode which means that it is still maintained with bug fixes but no new features are expected.

The key concepts of Pipeline API (aka spark.ml Components):

spark mllib pipeline.png
Figure 1. Pipeline with Transformers and Estimator (and corresponding Model)

The beauty of using Spark ML is that the ML dataset is simply a DataFrame (and all calculations are simply UDF applications on columns).

Use of a machine learning algorithm is only one component of a predictive analytic workflow. There can also be additional pre-processing steps for the machine learning algorithm to work.

Note
While a RDD computation in Spark Core, a Dataset manipulation in Spark SQL, a continuous DStream computation in Spark Streaming are the main data abstractions a ML Pipeline is in Spark MLlib.

A typical standard machine learning workflow is as follows:

  1. Loading data (aka data ingestion)

  2. Extracting features (aka feature extraction)

  3. Training model (aka model training)

  4. Evaluate (or predictionize)

You may also think of two additional steps before the final model becomes production ready and hence of any use:

  1. Testing model (aka model testing)

  2. Selecting the best model (aka model selection or model tuning)

  3. Deploying model (aka model deployment and integration)

Note
The Pipeline API lives under org.apache.spark.ml package.

Given the Pipeline Components, a typical machine learning pipeline is as follows:

  • You use a collection of Transformer instances to prepare input DataFrame - the dataset with proper input data (in columns) for a chosen ML algorithm.

  • You then fit (aka build) a Model.

  • With a Model you can calculate predictions (in prediction column) on features input column through DataFrame transformation.

Example: In text classification, preprocessing steps like n-gram extraction, and TF-IDF feature weighting are often necessary before training of a classification model like an SVM.

Upon deploying a model, your system must not only know the SVM weights to apply to input features, but also transform raw data into the format the model is trained on.

  • Pipeline for text categorization

  • Pipeline for image classification

Pipelines are like a query plan in a database system.

Components of ML Pipeline:

  • Pipeline Construction Framework – A DSL for the construction of pipelines that includes concepts of Nodes and Pipelines.

    • Nodes are data transformation steps (Transformers)

    • Pipelines are a DAG of Nodes.

      Pipelines become objects that can be saved out and applied in real-time to new data.

It can help creating domain-specific feature transformers, general purpose transformers, statistical utilities and nodes.

You could eventually save or load machine learning components as described in Persisting Machine Learning Components.

Note
A machine learning component is any object that belongs to Pipeline API, e.g. Pipeline, LinearRegressionModel, etc.

Features of Pipeline API

The features of the Pipeline API in Spark MLlib:

  • DataFrame as a dataset format

  • ML Pipelines API is similar to scikit-learn

  • Easy debugging (via inspecting columns added during execution)

  • Parameter tuning

  • Compositions (to build more complex pipelines out of existing ones)

Pipelines

A ML pipeline (or a ML workflow) is a sequence of Transformers and Estimators to fit a PipelineModel to an input dataset.

pipeline: DataFrame =[fit]=> DataFrame (using transformers and estimators)

A pipeline is represented by Pipeline class.

import org.apache.spark.ml.Pipeline

Pipeline is also an Estimator (so it is acceptable to set up a Pipeline with other Pipeline instances).

The Pipeline object can read or load pipelines (refer to Persisting Machine Learning Components page).

read: MLReader[Pipeline]
load(path: String): Pipeline

You can create a Pipeline with an optional uid identifier. It is of the format pipeline_[randomUid] when unspecified.

val pipeline = new Pipeline()

scala> println(pipeline.uid)
pipeline_94be47c3b709

val pipeline = new Pipeline("my_pipeline")

scala> println(pipeline.uid)
my_pipeline

The identifier uid is used to create an instance of PipelineModel to return from fit(dataset: DataFrame): PipelineModel method.

scala> val pipeline = new Pipeline("my_pipeline")
pipeline: org.apache.spark.ml.Pipeline = my_pipeline

scala> val df = (0 to 9).toDF("num")
df: org.apache.spark.sql.DataFrame = [num: int]

scala> val model = pipeline.setStages(Array()).fit(df)
model: org.apache.spark.ml.PipelineModel = my_pipeline

The stages mandatory parameter can be set using setStages(value: Array[PipelineStage]): this.type method.

Pipeline Fitting (fit method)

fit(dataset: DataFrame): PipelineModel

The fit method returns a PipelineModel that holds a collection of Transformer objects that are results of Estimator.fit method for every Estimator in the Pipeline (with possibly-modified dataset) or simply input Transformer objects. The input dataset DataFrame is passed to transform for every Transformer instance in the Pipeline.

It first transforms the schema of the input dataset DataFrame.

It then searches for the index of the last Estimator to calculate Transformers for Estimator and simply return Transformer back up to the index in the pipeline. For each Estimator the fit method is called with the input dataset. The result DataFrame is passed to the next Transformer in the chain.

Note
An IllegalArgumentException exception is thrown when a stage is neither Estimator or Transformer.

transform method is called for every Transformer calculated but the last one (that is the result of executing fit on the last Estimator).

The calculated Transformers are collected.

After the last Estimator there can only be Transformer stages.

The method returns a PipelineModel with uid and transformers. The parent Estimator is the Pipeline itself.

PipelineStage

The PipelineStage abstract class represents a single stage in a Pipeline.

PipelineStage has the following direct implementations (of which few are abstract classes, too):

Each PipelineStage transforms schema using transformSchema family of methods:

transformSchema(schema: StructType): StructType
transformSchema(schema: StructType, logging: Boolean): StructType
Note
StructType describes a schema of a DataFrame.
Tip

Enable DEBUG logging level for the respective PipelineStage implementations to see what happens beneath.

results matching ""

    No results matching ""