ML Pipeline Models

Model abstract class is a Transformer with the optional Estimator that has produced it (as a transient parent field).

model: DataFrame =[predict]=> DataFrame (with predictions)
Note: An Estimator is optional and is available only after fit (of an Estimator) has been executed, i.e. once it has produced the model.
As a Transformer it takes a DataFrame and transforms it to a result DataFrame with a prediction column added.
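A quick sketch of the parent link (assuming any DataFrame with label and features columns; LinearRegression serves only as an example Estimator):

import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.DataFrame

val dataset: DataFrame = ???  // any DataFrame with label and features columns
val lr = new LinearRegression
val model = lr.fit(dataset)
// the model keeps a reference to the Estimator that produced it
assert(model.parent == lr)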
There are two direct implementations of the Model class that are not directly related to a concrete ML algorithm:
PipelineModel
Caution: PipelineModel is a private[ml] class.
PipelineModel is a Model of the Pipeline estimator. Once fit, you can use the resulting model like any other model to transform datasets (as DataFrames).
A very interesting use case of PipelineModel is when a Pipeline is made up of Transformer instances only, so there is nothing to fit.
// Transformer #1
import org.apache.spark.ml.feature.Tokenizer
val tok = new Tokenizer().setInputCol("text")
// Transformer #2
import org.apache.spark.ml.feature.HashingTF
val hashingTF = new HashingTF().setInputCol(tok.getOutputCol).setOutputCol("features")
// Fuse the Transformers in a Pipeline
import org.apache.spark.ml.Pipeline
val pipeline = new Pipeline().setStages(Array(tok, hashingTF))
val dataset = Seq((0, "hello world")).toDF("id", "text")
// Since there's no fitting, any dataset works fine
val featurize = pipeline.fit(dataset)
// Use the pipelineModel as a series of Transformers
scala> featurize.transform(dataset).show(false)
+---+-----------+------------------------+--------------------------------+
|id |text |tok_8aec9bfad04a__output|features |
+---+-----------+------------------------+--------------------------------+
|0 |hello world|[hello, world] |(262144,[71890,72594],[1.0,1.0])|
+---+-----------+------------------------+--------------------------------+
PredictionModel
PredictionModel is an abstract class that represents a model for prediction algorithms like regression and classification (which have their own specialized models, covered below).

PredictionModel is basically a Transformer with a predict method to calculate predictions (that end up in the prediction column).
PredictionModel belongs to the org.apache.spark.ml package.
import org.apache.spark.ml.PredictionModel
The contract of the PredictionModel class requires that every custom implementation defines a predict method (with FeaturesType being the type of the features column):

predict(features: FeaturesType): Double
The direct less-algorithm-specific extensions of the PredictionModel class are:

- ClassificationModel
- RegressionModel
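To make the contract above more tangible, here is a conceptual sketch (a simplified stand-in, not the actual Spark class hierarchy; MyPredictionModel and MeanModel are hypothetical names):

import org.apache.spark.mllib.linalg.Vector

// Simplified stand-in for the PredictionModel contract
abstract class MyPredictionModel[FeaturesType] {
  // every concrete model defines how features map to a Double prediction
  def predict(features: FeaturesType): Double
}

// A hypothetical implementation over mllib Vectors: predict the mean feature value
class MeanModel extends MyPredictionModel[Vector] {
  def predict(features: Vector): Double = features.toArray.sum / features.size
}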
As a custom Transformer it comes with its own custom transform method.
Internally, transform first ensures that the type of the features column matches the type of the model and adds the prediction column of type Double to the schema of the result DataFrame.

It then creates the result DataFrame and adds the prediction column with a predictUDF function applied to the values of the features column.
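Conceptually (a sketch, not the actual Spark source), the core of transform boils down to wrapping predict in a UDF and appending its result:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.mllib.linalg.Vector

val dataset: DataFrame = ???                  // input with a features column
def predict(features: Vector): Double = ???   // the model's own predict

// wrap predict in a UDF and add its result as the prediction column
val predictUDF = udf { (features: Vector) => predict(features) }
val result = dataset.withColumn("prediction", predictUDF(col("features")))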
Caution: FIXME A diagram to show the transformation from one DataFrame (on the left) to another (on the right) with an arrow to represent the transform method.
Tip: Enable DEBUG logging for the model classes to see what happens inside when they transform a DataFrame, e.g. add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.ml.regression.LinearRegressionModel=DEBUG

Refer to Logging.
ClassificationModel
ClassificationModel is a PredictionModel that transforms a DataFrame with mandatory features, label, and rawPrediction (of type Vector) columns to a DataFrame with the prediction column added.
Note: A Model with ClassifierParams parameters, e.g. ClassificationModel, requires that a DataFrame have the mandatory features, label (of type Double), and rawPrediction (of type Vector) columns.
ClassificationModel comes with its own transform (as a Transformer) and predict (as a PredictionModel).
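For example (a minimal sketch with a toy two-row dataset), fitting a LogisticRegression produces a ClassificationModel whose transform adds the rawPrediction, probability, and prediction columns:

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.mllib.linalg.Vectors

val training = Seq(
  (0.0, Vectors.dense(0.0, 1.0)),
  (1.0, Vectors.dense(1.0, 0.0))).toDF("label", "features")

val classModel = new LogisticRegression().fit(training)
classModel.transform(training)
  .select("rawPrediction", "probability", "prediction")
  .show(false)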
The following is a list of the known ClassificationModel custom implementations (as of March 24th):

- ProbabilisticClassificationModel (the abstract parent of the following classification models)
  - DecisionTreeClassificationModel (final)
  - LogisticRegressionModel
  - NaiveBayesModel
  - RandomForestClassificationModel (final)
RegressionModel
RegressionModel is a PredictionModel that transforms a DataFrame with mandatory label, features, and prediction columns.
It comes with no methods or values of its own and so is more of a marker abstract class (to combine different features of regression models under one type).
LinearRegressionModel
LinearRegressionModel represents a model produced by a LinearRegression estimator. It transforms the required features column of type org.apache.spark.mllib.linalg.Vector.
Note: It is a private[ml] class, so what you, a developer, may eventually work with is the more general RegressionModel; and since RegressionModel is just a marker no-method abstract class, it is really more of a PredictionModel.
As a linear regression model that extends LinearRegressionParams it expects the following schema of an input DataFrame:

- label (required)
- features (required)
- prediction
- regParam
- elasticNetParam
- maxIter (Int)
- tol (Double)
- fitIntercept (Boolean)
- standardization (Boolean)
- weightCol (String)
- solver (String)
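A sketch of how a few of these parameters are typically set on the LinearRegression estimator (the fitted LinearRegressionModel then carries their values):

import org.apache.spark.ml.regression.LinearRegression
val lr = new LinearRegression()
  .setRegParam(0.1)
  .setElasticNetParam(0.5)
  .setMaxIter(50)
  .setTol(1e-6)
  .setFitIntercept(true)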
(New in 1.6.0) LinearRegressionModel is also an MLWritable (so you can save it to persistent storage for later reuse).
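A sketch of the save-and-load round trip, assuming a fitted model like the one in the example below (the path is hypothetical):

import org.apache.spark.ml.regression.LinearRegressionModel

// save the fitted model to persistent storage
model.save("/tmp/linreg-model")

// ...and load it back later
val sameModel = LinearRegressionModel.load("/tmp/linreg-model")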
With DEBUG logging enabled (see above) you can see the following messages in the logs when transform is called and transforms the schema:
16/03/21 06:55:32 DEBUG LinearRegressionModel: Input schema: {"type":"struct","fields":[{"name":"label","type":"double","nullable":false,"metadata":{}},{"name":"features","type":{"type":"udt","class":"org.apache.spark.mllib.linalg.VectorUDT","pyClass":"pyspark.mllib.linalg.VectorUDT","sqlType":{"type":"struct","fields":[{"name":"type","type":"byte","nullable":false,"metadata":{}},{"name":"size","type":"integer","nullable":true,"metadata":{}},{"name":"indices","type":{"type":"array","elementType":"integer","containsNull":false},"nullable":true,"metadata":{}},{"name":"values","type":{"type":"array","elementType":"double","containsNull":false},"nullable":true,"metadata":{}}]}},"nullable":true,"metadata":{}}]}
16/03/21 06:55:32 DEBUG LinearRegressionModel: Expected output schema: {"type":"struct","fields":[{"name":"label","type":"double","nullable":false,"metadata":{}},{"name":"features","type":{"type":"udt","class":"org.apache.spark.mllib.linalg.VectorUDT","pyClass":"pyspark.mllib.linalg.VectorUDT","sqlType":{"type":"struct","fields":[{"name":"type","type":"byte","nullable":false,"metadata":{}},{"name":"size","type":"integer","nullable":true,"metadata":{}},{"name":"indices","type":{"type":"array","elementType":"integer","containsNull":false},"nullable":true,"metadata":{}},{"name":"values","type":{"type":"array","elementType":"double","containsNull":false},"nullable":true,"metadata":{}}]}},"nullable":true,"metadata":{}},{"name":"prediction","type":"double","nullable":false,"metadata":{}}]}
The implementation of predict for LinearRegressionModel calculates dot(v1, v2) of two Vectors, features and coefficients (of DenseVector or SparseVector types), of the same size, and adds the intercept.
Note: The coefficients Vector and intercept Double are an integral part of LinearRegressionModel as the required input parameters of the constructor.
LinearRegressionModel Example
// Create a (sparse) Vector
import org.apache.spark.mllib.linalg.Vectors
val indices = 0 to 4
val elements = indices.zip(Stream.continually(1.0))
val sv = Vectors.sparse(elements.size, elements)
// Create a proper DataFrame
val ds = sc.parallelize(Seq((0.5, sv))).toDF("label", "features")
import org.apache.spark.ml.regression.LinearRegression
val lr = new LinearRegression
// Importing LinearRegressionModel and being explicit about the type of model value
// is for learning purposes only
import org.apache.spark.ml.regression.LinearRegressionModel
val model: LinearRegressionModel = lr.fit(ds)
// Use the same ds - just for learning purposes
scala> model.transform(ds).show
+-----+--------------------+----------+
|label| features|prediction|
+-----+--------------------+----------+
| 0.5|(5,[0,1,2,3,4],[1...| 0.5|
+-----+--------------------+----------+
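To double-check the predict formula described above, compute dot(features, coefficients) + intercept by hand and compare it with the prediction column:

// manual dot(features, coefficients) + intercept, using sv and model from above
val manual = sv.toArray.zip(model.coefficients.toArray)
  .map { case (x, w) => x * w }
  .sum + model.intercept
// manual should equal the prediction column value (0.5 here)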
RandomForestRegressionModel
RandomForestRegressionModel is a PredictionModel with a features column of type Vector.
Interestingly, the DataFrame transformation (as part of the Transformer contract) uses SparkContext.broadcast to send itself to the nodes in a Spark cluster and calculates predictions (as the prediction column) on features.
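A minimal sketch of obtaining and using a RandomForestRegressionModel (with a toy two-row dataset):

import org.apache.spark.ml.regression.RandomForestRegressor
import org.apache.spark.mllib.linalg.Vectors

val trainDF = Seq(
  (0.0, Vectors.dense(0.0, 0.1)),
  (1.0, Vectors.dense(1.0, 0.9))).toDF("label", "features")

val rfModel = new RandomForestRegressor().fit(trainDF)
rfModel.transform(trainDF).select("features", "prediction").show(false)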
KMeansModel
KMeansModel is a Model of the KMeans algorithm.

It belongs to the org.apache.spark.ml.clustering package.
// See spark-mllib-estimators.adoc#KMeans
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.sql.DataFrame
val kmeans: KMeans = ???
val trainingDF: DataFrame = ???
val kmModel = kmeans.fit(trainingDF)
// Know the cluster centers
scala> kmModel.clusterCenters
res0: Array[org.apache.spark.mllib.linalg.Vector] = Array([0.1,0.3], [0.1,0.1])
import org.apache.spark.mllib.linalg.Vectors
val inputDF = Seq((0.0, Vectors.dense(0.2, 0.4))).toDF("label", "features")
scala> kmModel.transform(inputDF).show(false)
+-----+---------+----------+
|label|features |prediction|
+-----+---------+----------+
|0.0 |[0.2,0.4]|0 |
+-----+---------+----------+