ML Pipeline Components — Estimators

An estimator is an abstraction of a learning algorithm that fits a model on a dataset.

Note
That was so machine learning to explain an estimator this way, wasn’t it? It is that the more I spend time with Pipeline API the often I use the terms and phrases from this space. Sorry.

Technically, an Estimator produces a Model (i.e. a Transformer) for a given DataFrame and parameters (as ParamMap). It fits a model to the input DataFrame and ParamMap to produce a Transformer (a Model) that can calculate predictions for any DataFrame-based input datasets.

It is basically a function that maps a DataFrame onto a Model through fit method, i.e. it takes a DataFrame and produces a Transformer as a Model.

estimator: DataFrame =[fit]=> Model

Estimators are instances of org.apache.spark.ml.Estimator abstract class that comes with fit method (with the return type M being a Model):

fit(dataset: DataFrame): M

An Estimator is a PipelineStage (so it can be a part of a Pipeline).

Note
Pipeline considers Estimator special and executes fit method before transform (as for other Transformer objects in a pipeline). Consult Pipeline document.

As an example you could use LinearRegression learning algorithm estimator to train a LinearRegressionModel.

Some of the direct specialized implementations of the Estimator abstract class are as follows:

StringIndexer

org.apache.spark.ml.feature.StringIndexer is an Estimator that produces StringIndexerModel.

val df = ('a' to 'a' + 9).map(_.toString)
  .zip(0 to 9)
  .map(_.swap)
  .toDF("id", "label")

import org.apache.spark.ml.feature.StringIndexer
val strIdx = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("index")

scala> println(strIdx.explainParams)
handleInvalid: how to handle invalid entries. Options are skip (which will filter out rows with bad values), or error (which will throw an error). More options may be added later (default: error)
inputCol: input column name (current: label)
outputCol: output column name (default: strIdx_ded89298e014__output, current: index)

val model = strIdx.fit(df)
val indexed = model.transform(df)

scala> indexed.show
+---+-----+-----+
| id|label|index|
+---+-----+-----+
|  0|    a|  3.0|
|  1|    b|  5.0|
|  2|    c|  7.0|
|  3|    d|  9.0|
|  4|    e|  0.0|
|  5|    f|  2.0|
|  6|    g|  6.0|
|  7|    h|  8.0|
|  8|    i|  4.0|
|  9|    j|  1.0|
+---+-----+-----+

KMeans

KMeans class is an implementation of the K-means clustering algorithm in machine learning with support for k-means|| (aka k-means parallel) in Spark MLlib.

Roughly, k-means is an unsupervised iterative algorithm that groups input data in a predefined number of k clusters. Each cluster has a centroid which is a cluster center. It is a highly iterative machine learning algorithm that measures the distance (between a vector and centroids) as the nearest mean. The algorithm steps are repeated till the convergence of a specified number of steps.

Note
K-Means algorithm uses Lloyd’s algorithm in computer science.

It is an Estimator that produces a KMeansModel.

Tip
Do import org.apache.spark.ml.clustering.KMeans to work with KMeans algorithm.

KMeans defaults to use the following values:

  • Number of clusters or centroids (k): 2

  • Maximum number of iterations (maxIter): 20

  • Initialization algorithm (initMode): k-means||

  • Number of steps for the k-means|| (initSteps): 5

  • Convergence tolerance (tol): 1e-4

import org.apache.spark.ml.clustering._
val kmeans = new KMeans()

scala> println(kmeans.explainParams)
featuresCol: features column name (default: features)
initMode: initialization algorithm (default: k-means||)
initSteps: number of steps for k-means|| (default: 5)
k: number of clusters to create (default: 2)
maxIter: maximum number of iterations (>= 0) (default: 20)
predictionCol: prediction column name (default: prediction)
seed: random seed (default: -1689246527)
tol: the convergence tolerance for iterative algorithms (default: 1.0E-4)

KMeans assumes that featuresCol is of type VectorUDT and appends predictionCol of type IntegerType.

Internally, fit method "unwraps" the feature vector in featuresCol column in the input DataFrame and creates an RDD[Vector]. It then hands the call over to the MLlib variant of KMeans in org.apache.spark.mllib.clustering.KMeans. The result is copied to KMeansModel with a calculated KMeansSummary.

Each item (row) in a data set is described by a numeric vector of attributes called features. A single feature (a dimension of the vector) represents a word (token) with a value that is a metric that defines the importance of that word or term in the document.

Tip

Enable INFO logging level for org.apache.spark.mllib.clustering.KMeans logger to see what happens inside a KMeans.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.mllib.clustering.KMeans=INFO

Refer to Logging.

KMeans Example

You can represent a text corpus (document collection) using the vector space model. In this representation, the vectors have dimension that is the number of different words in the corpus. It is quite natural to have vectors with a lot of zero values as not all words will be in a document. We will use an optimized memory representation to avoid zero values using sparse vectors.

This example shows how to use k-means to classify emails as a spam or not.

// NOTE Don't copy and paste the final case class with the other lines
// It won't work with paste mode in spark-shell
final case class Email(id: Int, text: String)

val emails = Seq(
  "This is an email from your lovely wife. Your mom says...",
  "SPAM SPAM spam",
  "Hello, We'd like to offer you").zipWithIndex.map(_.swap).toDF("id", "text").as[Email]

// Prepare data for k-means
// Pass emails through a "pipeline" of transformers
import org.apache.spark.ml.feature._
val tok = new RegexTokenizer()
  .setInputCol("text")
  .setOutputCol("tokens")
  .setPattern("\\W+")

val hashTF = new HashingTF()
  .setInputCol("tokens")
  .setOutputCol("features")
  .setNumFeatures(20)

val preprocess = (tok.transform _).andThen(hashTF.transform)

val features = preprocess(emails.toDF)

scala> features.select('text, 'features).show(false)
+--------------------------------------------------------+------------------------------------------------------------+
|text                                                    |features                                                    |
+--------------------------------------------------------+------------------------------------------------------------+
|This is an email from your lovely wife. Your mom says...|(20,[0,3,6,8,10,11,17,19],[1.0,2.0,1.0,1.0,2.0,1.0,2.0,1.0])|
|SPAM SPAM spam                                          |(20,[13],[3.0])                                             |
|Hello, We'd like to offer you                           |(20,[0,2,7,10,11,19],[2.0,1.0,1.0,1.0,1.0,1.0])             |
+--------------------------------------------------------+------------------------------------------------------------+

import org.apache.spark.ml.clustering.KMeans
val kmeans = new KMeans

scala> val kmModel = kmeans.fit(features.toDF)
16/04/08 15:57:37 WARN KMeans: The input data is not directly cached, which may hurt performance if its parent RDDs are also uncached.
16/04/08 15:57:37 INFO KMeans: Initialization with k-means|| took 0.219 seconds.
16/04/08 15:57:37 INFO KMeans: Run 0 finished in 1 iterations
16/04/08 15:57:37 INFO KMeans: Iterations took 0.030 seconds.
16/04/08 15:57:37 INFO KMeans: KMeans converged in 1 iterations.
16/04/08 15:57:37 INFO KMeans: The cost for the best run is 5.000000000000002.
16/04/08 15:57:37 WARN KMeans: The input data was not directly cached, which may hurt performance if its parent RDDs are also uncached.
kmModel: org.apache.spark.ml.clustering.KMeansModel = kmeans_7a13a617ce0b

scala> kmModel.clusterCenters.map(_.toSparse)
res36: Array[org.apache.spark.mllib.linalg.SparseVector] = Array((20,[13],[3.0]), (20,[0,2,3,6,7,8,10,11,17,19],[1.5,0.5,1.0,0.5,0.5,0.5,1.5,1.0,1.0,1.0]))

val email = Seq("hello mom").toDF("text")
val result = kmModel.transform(preprocess(email))

scala> .show(false)
+---------+------------+---------------------+----------+
|text     |tokens      |features             |prediction|
+---------+------------+---------------------+----------+
|hello mom|[hello, mom]|(20,[2,19],[1.0,1.0])|1         |
+---------+------------+---------------------+----------+

TrainValidationSplit

Caution
FIXME

Predictors

A Predictor is a specialization of Estimator for a PredictionModel with its own abstract train method.

train(dataset: DataFrame): M

The train method is supposed to ease dealing with schema validation and copying parameters to a trained PredictionModel model. It also sets the parent of the model to itself.

A Predictor is basically a function that maps a DataFrame onto a PredictionModel.

predictor: DataFrame =[train]=> PredictionModel

It implements the abstract fit(dataset: DataFrame) of the Estimator abstract class that validates and transforms the schema of a dataset (using a custom transformSchema of PipelineStage), and then calls the abstract train method.

Validation and transformation of a schema (using transformSchema) makes sure that:

  1. features column exists and is of correct type (defaults to Vector).

  2. label column exists and is of Double type.

As the last step, it adds the prediction column of Double type.

The following is a list of Predictor examples for different learning algorithms:

DecisionTreeClassifier

DecisionTreeClassifier is a ProbabilisticClassifier that…​

Caution
FIXME

LinearRegression

LinearRegression is an example of Predictor (indirectly through the specialized Regressor private abstract class), and hence a Estimator, that represents the linear regression algorithm in Machine Learning.

LinearRegression belongs to org.apache.spark.ml.regression package.

Tip
Read the scaladoc of LinearRegression.

It expects org.apache.spark.mllib.linalg.Vector as the input type of the column in a dataset and produces LinearRegressionModel.

import org.apache.spark.ml.regression.LinearRegression
val lr = new LinearRegression

The acceptable parameters:

scala> println(lr.explainParams)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty (default: 0.0)
featuresCol: features column name (default: features)
fitIntercept: whether to fit an intercept term (default: true)
labelCol: label column name (default: label)
maxIter: maximum number of iterations (>= 0) (default: 100)
predictionCol: prediction column name (default: prediction)
regParam: regularization parameter (>= 0) (default: 0.0)
solver: the solver algorithm for optimization. If this is not set or empty, default value is 'auto' (default: auto)
standardization: whether to standardize the training features before fitting the model (default: true)
tol: the convergence tolerance for iterative algorithms (default: 1.0E-6)
weightCol: weight column name. If this is not set or empty, we treat all instance weights as 1.0 (default: )
LinearRegression.train
train(dataset: DataFrame): LinearRegressionModel

train (protected) method of LinearRegression expects a dataset DataFrame with two columns:

  1. label of type DoubleType.

  2. features of type Vector.

It returns LinearRegressionModel.

It first counts the number of elements in features column (usually features). The column has to be of mllib.linalg.Vector type (and can easily be prepared using HashingTF transformer).

val spam = Seq(
  (0, "Hi Jacek. Wanna more SPAM? Best!"),
  (1, "This is SPAM. This is SPAM")).toDF("id", "email")

import org.apache.spark.ml.feature.RegexTokenizer
val regexTok = new RegexTokenizer()
val spamTokens = regexTok.setInputCol("email").transform(spam)

scala> spamTokens.show(false)
+---+--------------------------------+---------------------------------------+
|id |email                           |regexTok_646b6bcc4548__output          |
+---+--------------------------------+---------------------------------------+
|0  |Hi Jacek. Wanna more SPAM? Best!|[hi, jacek., wanna, more, spam?, best!]|
|1  |This is SPAM. This is SPAM      |[this, is, spam., this, is, spam]      |
+---+--------------------------------+---------------------------------------+

import org.apache.spark.ml.feature.HashingTF
val hashTF = new HashingTF()
  .setInputCol(regexTok.getOutputCol)
  .setOutputCol("features")
  .setNumFeatures(5000)

val spamHashed = hashTF.transform(spamTokens)

scala> spamHashed.select("email", "features").show(false)
+--------------------------------+----------------------------------------------------------------+
|email                           |features                                                        |
+--------------------------------+----------------------------------------------------------------+
|Hi Jacek. Wanna more SPAM? Best!|(5000,[2525,2943,3093,3166,3329,3980],[1.0,1.0,1.0,1.0,1.0,1.0])|
|This is SPAM. This is SPAM      |(5000,[1713,3149,3370,4070],[1.0,1.0,2.0,2.0])                  |
+--------------------------------+----------------------------------------------------------------+

// Create labeled datasets for spam (1)
val spamLabeled = spamHashed.withColumn("label", lit(1d))

scala> spamLabeled.show
+---+--------------------+-----------------------------+--------------------+-----+
| id|               email|regexTok_646b6bcc4548__output|            features|label|
+---+--------------------+-----------------------------+--------------------+-----+
|  0|Hi Jacek. Wanna m...|         [hi, jacek., wann...|(5000,[2525,2943,...|  1.0|
|  1|This is SPAM. Thi...|         [this, is, spam.,...|(5000,[1713,3149,...|  1.0|
+---+--------------------+-----------------------------+--------------------+-----+

val regular = Seq(
  (2, "Hi Jacek. I hope this email finds you well. Spark up!"),
  (3, "Welcome to Apache Spark project")).toDF("id", "email")
val regularTokens = regexTok.setInputCol("email").transform(regular)
val regularHashed = hashTF.transform(regularTokens)
// Create labeled datasets for non-spam regular emails (0)
val regularLabeled = regularHashed.withColumn("label", lit(0d))

val training = regularLabeled.union(spamLabeled).cache

scala> training.show
+---+--------------------+-----------------------------+--------------------+-----+
| id|               email|regexTok_646b6bcc4548__output|            features|label|
+---+--------------------+-----------------------------+--------------------+-----+
|  2|Hi Jacek. I hope ...|         [hi, jacek., i, h...|(5000,[72,105,942...|  0.0|
|  3|Welcome to Apache...|         [welcome, to, apa...|(5000,[2894,3365,...|  0.0|
|  0|Hi Jacek. Wanna m...|         [hi, jacek., wann...|(5000,[2525,2943,...|  1.0|
|  1|This is SPAM. Thi...|         [this, is, spam.,...|(5000,[1713,3149,...|  1.0|
+---+--------------------+-----------------------------+--------------------+-----+

import org.apache.spark.ml.regression.LinearRegression
val lr = new LinearRegression

// the following calls train by the Predictor contract (see above)
val lrModel = lr.fit(training)

// Let's predict whether an email is a spam or not
val email = Seq("Hi Jacek. you doing well? Bye!").toDF("email")
val emailTokens = regexTok.setInputCol("email").transform(email)
val emailHashed = hashTF.transform(emailTokens)

scala> lrModel.transform(emailHashed).select("prediction").show
+-----------------+
|       prediction|
+-----------------+
|0.563603440350882|
+-----------------+

RandomForestRegressor

RandomForestRegressor is a concrete Predictor for Random Forest learning algorithm. It trains RandomForestRegressionModel (a subtype of PredictionModel) using DataFrame with features column of Vector type.

Caution
FIXME
import org.apache.spark.mllib.linalg.Vectors
val features = Vectors.sparse(10, Seq((2, 0.2), (4, 0.4)))

val data = (0.0 to 4.0 by 1).map(d => (d, features)).toDF("label", "features")
// data.as[LabeledPoint]

scala> data.show(false)
+-----+--------------------------+
|label|features                  |
+-----+--------------------------+
|0.0  |(10,[2,4,6],[0.2,0.4,0.6])|
|1.0  |(10,[2,4,6],[0.2,0.4,0.6])|
|2.0  |(10,[2,4,6],[0.2,0.4,0.6])|
|3.0  |(10,[2,4,6],[0.2,0.4,0.6])|
|4.0  |(10,[2,4,6],[0.2,0.4,0.6])|
+-----+--------------------------+

import org.apache.spark.ml.regression.{ RandomForestRegressor, RandomForestRegressionModel }
val rfr = new RandomForestRegressor
val model: RandomForestRegressionModel = rfr.fit(data)

scala> model.trees.foreach(println)
DecisionTreeRegressionModel (uid=dtr_247e77e2f8e0) of depth 1 with 3 nodes
DecisionTreeRegressionModel (uid=dtr_61f8eacb2b61) of depth 2 with 7 nodes
DecisionTreeRegressionModel (uid=dtr_63fc5bde051c) of depth 2 with 5 nodes
DecisionTreeRegressionModel (uid=dtr_64d4e42de85f) of depth 2 with 5 nodes
DecisionTreeRegressionModel (uid=dtr_693626422894) of depth 3 with 9 nodes
DecisionTreeRegressionModel (uid=dtr_927f8a0bc35e) of depth 2 with 5 nodes
DecisionTreeRegressionModel (uid=dtr_82da39f6e4e1) of depth 3 with 7 nodes
DecisionTreeRegressionModel (uid=dtr_cb94c2e75bd1) of depth 0 with 1 nodes
DecisionTreeRegressionModel (uid=dtr_29e3362adfb2) of depth 1 with 3 nodes
DecisionTreeRegressionModel (uid=dtr_d6d896abcc75) of depth 3 with 7 nodes
DecisionTreeRegressionModel (uid=dtr_aacb22a9143d) of depth 2 with 5 nodes
DecisionTreeRegressionModel (uid=dtr_18d07dadb5b9) of depth 2 with 7 nodes
DecisionTreeRegressionModel (uid=dtr_f0615c28637c) of depth 2 with 5 nodes
DecisionTreeRegressionModel (uid=dtr_4619362d02fc) of depth 2 with 5 nodes
DecisionTreeRegressionModel (uid=dtr_d39502f828f4) of depth 2 with 5 nodes
DecisionTreeRegressionModel (uid=dtr_896f3a4272ad) of depth 3 with 9 nodes
DecisionTreeRegressionModel (uid=dtr_891323c29838) of depth 3 with 7 nodes
DecisionTreeRegressionModel (uid=dtr_d658fe871e99) of depth 2 with 5 nodes
DecisionTreeRegressionModel (uid=dtr_d91227b13d41) of depth 2 with 5 nodes
DecisionTreeRegressionModel (uid=dtr_4a7976921f4b) of depth 2 with 5 nodes

scala> model.treeWeights
res12: Array[Double] = Array(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0)

scala> model.featureImportances
res13: org.apache.spark.mllib.linalg.Vector = (1,[0],[1.0])

Example

The following example uses LinearRegression estimator.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
val data = (0.0 to 9.0 by 1)                      // create a collection of Doubles
  .map(n => (n, n))                               // make it pairs
  .map { case (label, features) =>
    LabeledPoint(label, Vectors.dense(features)) } // create labeled points of dense vectors
  .toDF                                           // make it a DataFrame

scala> data.show
+-----+--------+
|label|features|
+-----+--------+
|  0.0|   [0.0]|
|  1.0|   [1.0]|
|  2.0|   [2.0]|
|  3.0|   [3.0]|
|  4.0|   [4.0]|
|  5.0|   [5.0]|
|  6.0|   [6.0]|
|  7.0|   [7.0]|
|  8.0|   [8.0]|
|  9.0|   [9.0]|
+-----+--------+

import org.apache.spark.ml.regression.LinearRegression
val lr = new LinearRegression

val model = lr.fit(data)

scala> model.intercept
res1: Double = 0.0

scala> model.coefficients
res2: org.apache.spark.mllib.linalg.Vector = [1.0]

// make predictions
scala> val predictions = model.transform(data)
predictions: org.apache.spark.sql.DataFrame = [label: double, features: vector ... 1 more field]

scala> predictions.show
+-----+--------+----------+
|label|features|prediction|
+-----+--------+----------+
|  0.0|   [0.0]|       0.0|
|  1.0|   [1.0]|       1.0|
|  2.0|   [2.0]|       2.0|
|  3.0|   [3.0]|       3.0|
|  4.0|   [4.0]|       4.0|
|  5.0|   [5.0]|       5.0|
|  6.0|   [6.0]|       6.0|
|  7.0|   [7.0]|       7.0|
|  8.0|   [8.0]|       8.0|
|  9.0|   [9.0]|       9.0|
+-----+--------+----------+

import org.apache.spark.ml.evaluation.RegressionEvaluator

// rmse is the default metric
// We're explicit here for learning purposes
val regEval = new RegressionEvaluator().setMetricName("rmse")
val rmse = regEval.evaluate(predictions)

scala> println(s"Root Mean Squared Error: $rmse")
Root Mean Squared Error: 0.0

import org.apache.spark.mllib.linalg.DenseVector
// NOTE Follow along to learn spark.ml-way (not RDD-way)
predictions.rdd.map { r =>
  (r(0).asInstanceOf[Double], r(1).asInstanceOf[DenseVector](0).toDouble, r(2).asInstanceOf[Double]))
  .toDF("label", "feature0", "prediction").show
+-----+--------+----------+
|label|feature0|prediction|
+-----+--------+----------+
|  0.0|     0.0|       0.0|
|  1.0|     1.0|       1.0|
|  2.0|     2.0|       2.0|
|  3.0|     3.0|       3.0|
|  4.0|     4.0|       4.0|
|  5.0|     5.0|       5.0|
|  6.0|     6.0|       6.0|
|  7.0|     7.0|       7.0|
|  8.0|     8.0|       8.0|
|  9.0|     9.0|       9.0|
+-----+--------+----------+

// Let's make it nicer to the eyes using a Scala case class
scala> :pa
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.Row
import org.apache.spark.mllib.linalg.DenseVector
case class Prediction(label: Double, feature0: Double, prediction: Double)
object Prediction {
  def apply(r: Row) = new Prediction(
    label = r(0).asInstanceOf[Double],
    feature0 = r(1).asInstanceOf[DenseVector](0).toDouble,
    prediction = r(2).asInstanceOf[Double])
}

// Exiting paste mode, now interpreting.

import org.apache.spark.sql.Row
import org.apache.spark.mllib.linalg.DenseVector
defined class Prediction
defined object Prediction

scala> predictions.rdd.map(Prediction.apply).toDF.show
+-----+--------+----------+
|label|feature0|prediction|
+-----+--------+----------+
|  0.0|     0.0|       0.0|
|  1.0|     1.0|       1.0|
|  2.0|     2.0|       2.0|
|  3.0|     3.0|       3.0|
|  4.0|     4.0|       4.0|
|  5.0|     5.0|       5.0|
|  6.0|     6.0|       6.0|
|  7.0|     7.0|       7.0|
|  8.0|     8.0|       8.0|
|  9.0|     9.0|       9.0|
+-----+--------+----------+

results matching ""

    No results matching ""