ML Pipeline Components — Transformers

transformer: DataFrame =[transform]=> DataFrame

A transformer is a function object that maps (i.e. transforms) a DataFrame into another DataFrame (both called datasets).

Transformers prepare a dataset for a machine learning algorithm to work with. They are also very helpful for transforming DataFrames in general, even outside the machine learning space.
Transformers are instances of the org.apache.spark.ml.Transformer abstract class that offers the transform family of methods:

transform(dataset: DataFrame): DataFrame
transform(dataset: DataFrame, paramMap: ParamMap): DataFrame
transform(dataset: DataFrame, firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): DataFrame
A Transformer is a PipelineStage and thus can be a part of a Pipeline.
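Since every Transformer is a PipelineStage, transformers can be chained; a minimal sketch (the column names are assumed for illustration):

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{RegexTokenizer, StopWordsRemover}

// Two transformers chained as pipeline stages (sketch; assumes an input
// DataFrame df with a "text" column).
val tok = new RegexTokenizer().setInputCol("text").setOutputCol("words")
val remover = new StopWordsRemover().setInputCol("words").setOutputCol("clean")
val pipeline = new Pipeline().setStages(Array(tok, remover))
// pipeline.fit(df).transform(df) applies both stages in order
```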
A few available implementations of Transformer:

- VectorAssembler — a feature transformer that assembles (merges) multiple columns into a (feature) vector column.

See the Custom UnaryTransformer section for a custom Transformer implementation.
StopWordsRemover

StopWordsRemover is a machine learning feature transformer that takes a string array column and outputs a string array column with all defined stop words removed. The transformer comes with a standard set of English stop words as default (the same set as scikit-learn uses, i.e. from the Glasgow Information Retrieval Group).

Note: It works as if it were a UnaryTransformer, but it has not been migrated to extend that class yet.

The StopWordsRemover class belongs to the org.apache.spark.ml.feature package.
import org.apache.spark.ml.feature.StopWordsRemover
val stopWords = new StopWordsRemover
It accepts the following parameters:
scala> println(stopWords.explainParams)
caseSensitive: whether to do case-sensitive comparison during filtering (default: false)
inputCol: input column name (undefined)
outputCol: output column name (default: stopWords_9c2c0fdd8a68__output)
stopWords: stop words (default: [Ljava.lang.String;@5dabe7c8)
Note: null values in the input array are preserved unless null is added to stopWords explicitly.
import org.apache.spark.ml.feature.RegexTokenizer
val regexTok = new RegexTokenizer("regexTok")
.setInputCol("text")
.setPattern("\\W+")
import org.apache.spark.ml.feature.StopWordsRemover
val stopWords = new StopWordsRemover("stopWords")
.setInputCol(regexTok.getOutputCol)
val df = Seq("please find it done (and empty)", "About to be rich!", "empty")
.zipWithIndex
.toDF("text", "id")
scala> stopWords.transform(regexTok.transform(df)).show(false)
+-------------------------------+---+------------------------------------+-----------------+
|text |id |regexTok__output |stopWords__output|
+-------------------------------+---+------------------------------------+-----------------+
|please find it done (and empty)|0 |[please, find, it, done, and, empty]|[] |
|About to be rich! |1 |[about, to, be, rich] |[rich] |
|empty |2 |[empty] |[] |
+-------------------------------+---+------------------------------------+-----------------+
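The defaults can be changed; a hedged sketch (building on the regexTok transformer above) of a custom, case-sensitive stop-word list:

```scala
// Custom stop words with case-sensitive filtering (sketch; the word list is
// illustrative). Only exact, case-sensitive matches are removed.
val customStopWords = new StopWordsRemover("customStopWords")
  .setInputCol(regexTok.getOutputCol)
  .setStopWords(Array("empty", "about"))
  .setCaseSensitive(true)
```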
Binarizer

Binarizer is a Transformer that splits the values in the input column into two groups: "ones" for values larger than the threshold and "zeros" for the others.

It works with DataFrames with the input column of DoubleType or VectorUDT. The type of the result output column matches the type of the input column, i.e. DoubleType or VectorUDT.
import org.apache.spark.ml.feature.Binarizer
val bin = new Binarizer()
.setInputCol("rating")
.setOutputCol("label")
.setThreshold(3.5)
scala> println(bin.explainParams)
inputCol: input column name (current: rating)
outputCol: output column name (default: binarizer_dd9710e2a831__output, current: label)
threshold: threshold used to binarize continuous features (default: 0.0, current: 3.5)
val doubles = Seq((0, 1d), (1, 1d), (2, 5d)).toDF("id", "rating")
scala> bin.transform(doubles).show
+---+------+-----+
| id|rating|label|
+---+------+-----+
| 0| 1.0| 0.0|
| 1| 1.0| 0.0|
| 2| 5.0| 1.0|
+---+------+-----+
import org.apache.spark.mllib.linalg.Vectors
val denseVec = Vectors.dense(Array(4.0, 0.4, 3.7, 1.5))
val vectors = Seq((0, denseVec)).toDF("id", "rating")
scala> bin.transform(vectors).show
+---+-----------------+-----------------+
| id| rating| label|
+---+-----------------+-----------------+
| 0|[4.0,0.4,3.7,1.5]|[1.0,0.0,1.0,0.0]|
+---+-----------------+-----------------+
SQLTransformer

SQLTransformer is a Transformer that does transformations by executing SELECT … FROM __THIS__ with __THIS__ being the underlying temporary table registered for the input dataset.

Internally, __THIS__ is replaced with a random name for a temporary table (using registerTempTable).

Note: It has been available since Spark 1.6.0.

It requires that the SELECT query uses __THIS__ (which corresponds to the temporary table) and simply executes the mandatory statement using the sql method.

You have to specify the mandatory statement parameter using the setStatement method.
import org.apache.spark.ml.feature.SQLTransformer
val sql = new SQLTransformer()
// dataset to work with
val df = Seq((0, s"""hello\tworld"""), (1, "two spaces inside")).toDF("label", "sentence")
scala> sql.setStatement("SELECT sentence FROM __THIS__ WHERE label = 0").transform(df).show
+-----------+
| sentence|
+-----------+
|hello world|
+-----------+
scala> println(sql.explainParams)
statement: SQL statement (current: SELECT sentence FROM __THIS__ WHERE label = 0)
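The statement is not limited to simple projections; a hedged sketch of an aggregation over __THIS__ (column names as in the dataset above):

```scala
import org.apache.spark.ml.feature.SQLTransformer

// Any SELECT over __THIS__ works, e.g. grouping (sketch).
val aggSql = new SQLTransformer()
  .setStatement("SELECT label, count(*) AS cnt FROM __THIS__ GROUP BY label")
// aggSql.transform(df) yields one row per label value
```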
VectorAssembler

VectorAssembler is a feature transformer that assembles (merges) multiple columns into a (feature) vector column.

It supports columns of the types NumericType, BooleanType, and VectorUDT. Doubles are passed on untouched. Other numeric types and booleans are cast to doubles.
import org.apache.spark.ml.feature.VectorAssembler
val vecAssembler = new VectorAssembler()
scala> print(vecAssembler.explainParams)
inputCols: input column names (undefined)
outputCol: output column name (default: vecAssembler_5ac31099dbee__output)
final case class Record(id: Int, n1: Int, n2: Double, flag: Boolean)
val ds = Seq(Record(0, 4, 2.0, true)).toDS
scala> ds.printSchema
root
|-- id: integer (nullable = false)
|-- n1: integer (nullable = false)
|-- n2: double (nullable = false)
|-- flag: boolean (nullable = false)
val features = vecAssembler
.setInputCols(Array("n1", "n2", "flag"))
.setOutputCol("features")
.transform(ds)
scala> features.printSchema
root
|-- id: integer (nullable = false)
|-- n1: integer (nullable = false)
|-- n2: double (nullable = false)
|-- flag: boolean (nullable = false)
|-- features: vector (nullable = true)
scala> features.show
+---+---+---+----+-------------+
| id| n1| n2|flag| features|
+---+---+---+----+-------------+
| 0| 4|2.0|true|[4.0,2.0,1.0]|
+---+---+---+----+-------------+
UnaryTransformers

The UnaryTransformer abstract class is a specialized Transformer that applies a transformation to one input column and writes the results to another (by appending a new column).

Each UnaryTransformer defines the input and output columns using the following "chain" methods (they return the transformer on which they were executed and so are chainable):

- setInputCol(value: String)
- setOutputCol(value: String)

Each UnaryTransformer calls validateInputType while executing transformSchema(schema: StructType) (that is part of the PipelineStage contract).
Note: A UnaryTransformer is a PipelineStage.

When transform is called, it first calls transformSchema (with DEBUG logging enabled) and then adds the column as a result of calling a protected abstract createTransformFunc.

Note: The createTransformFunc function is abstract and defined by concrete UnaryTransformer objects.

Internally, the transform method uses Spark SQL’s udf to define a function (based on the createTransformFunc function described above) that will create the new output column (with the appropriate outputDataType). The UDF is later applied to the input column of the input DataFrame and the result becomes the output column (using the DataFrame.withColumn method).

Note: Using the udf and withColumn methods from Spark SQL demonstrates an excellent integration between the Spark modules: MLlib and SQL.
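The mechanics just described can be sketched as follows (a hedged simplification, not Spark's actual implementation; the uppercase function stands in for a concrete createTransformFunc):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}

// Sketch of what UnaryTransformer.transform does under the covers: wrap
// createTransformFunc in a UDF and append the result with withColumn.
def transformSketch(df: DataFrame, inputCol: String, outputCol: String): DataFrame = {
  val f = (s: String) => s.toUpperCase  // stands in for createTransformFunc
  df.withColumn(outputCol, udf(f).apply(col(inputCol)))
}
```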
The following are UnaryTransformer implementations in spark.ml:

- Tokenizer that converts a string column to lowercase and then splits it by white spaces.
- RegexTokenizer that extracts tokens.
- NGram that converts the input array of strings into an array of n-grams.
- HashingTF that maps a sequence of terms to their term frequencies (cf. SPARK-13998 HashingTF should extend UnaryTransformer).
- OneHotEncoder that maps a numeric input column of label indices onto a column of binary vectors.
RegexTokenizer

RegexTokenizer is a UnaryTransformer that tokenizes a String into a collection of Strings.
import org.apache.spark.ml.feature.RegexTokenizer
val regexTok = new RegexTokenizer()
// dataset to transform with tabs and spaces
val df = Seq((0, s"""hello\tworld"""), (1, "two spaces inside")).toDF("label", "sentence")
val tokenized = regexTok.setInputCol("sentence").transform(df)
scala> tokenized.show(false)
+-----+------------------+-----------------------------+
|label|sentence |regexTok_810b87af9510__output|
+-----+------------------+-----------------------------+
|0 |hello world |[hello, world] |
|1 |two spaces inside|[two, spaces, inside] |
+-----+------------------+-----------------------------+
Note: Read the official scaladoc for org.apache.spark.ml.feature.RegexTokenizer.

It supports the minTokenLength parameter that is the minimum token length, which you can change using the setMinTokenLength method. It simply filters out smaller tokens and defaults to 1.
// rt is a RegexTokenizer and df a DataFrame with a "line" column, e.g.
val rt = new RegexTokenizer()
val df = Seq((1, "hello world"), (2, "yet another sentence")).toDF("label", "line")
scala> rt.setInputCol("line").setMinTokenLength(6).transform(df).show
+-----+--------------------+-----------------------------+
|label| line|regexTok_8c74c5e8b83a__output|
+-----+--------------------+-----------------------------+
| 1| hello world| []|
| 2|yet another sentence| [another, sentence]|
+-----+--------------------+-----------------------------+
It has the gaps parameter that indicates whether the regex splits on gaps (true) or matches tokens (false). You can set it using setGaps. It defaults to true.

When set to true (i.e. splits on gaps) it uses Regex.split, while Regex.findAllIn is used for false.
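The two modes can be sketched in plain Scala (no Spark needed; the val names are illustrative):

```scala
// gaps=true: the pattern marks the separators and Regex.split cuts on them.
val splitOnGaps = "\\s+".r.split("yet another sentence").toSeq

// gaps=false: the pattern marks the tokens and Regex.findAllIn collects them.
val matchedTokens = "\\w+".r.findAllIn("yet another sentence").toSeq
```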
scala> rt.setInputCol("line").setGaps(false).transform(df).show
+-----+--------------------+-----------------------------+
|label| line|regexTok_8c74c5e8b83a__output|
+-----+--------------------+-----------------------------+
| 1| hello world| []|
| 2|yet another sentence| [another, sentence]|
+-----+--------------------+-----------------------------+
scala> rt.setInputCol("line").setGaps(false).setPattern("\\W").transform(df).show(false)
+-----+--------------------+-----------------------------+
|label|line |regexTok_8c74c5e8b83a__output|
+-----+--------------------+-----------------------------+
|1 |hello world |[] |
|2 |yet another sentence|[another, sentence] |
+-----+--------------------+-----------------------------+
It has the pattern parameter that is the regex for tokenizing. It uses Scala’s .r method to convert the string to a regex. Use setPattern to set it. It defaults to \\s+.

It has the toLowercase parameter that indicates whether to convert all characters to lowercase before tokenizing. Use setToLowercase to change it. It defaults to true.
NGram

In this example you use org.apache.spark.ml.feature.NGram that converts the input collection of strings into a collection of n-grams (of n words).
import org.apache.spark.ml.feature.NGram
val bigram = new NGram("bigrams")
val df = Seq((0, Seq("hello", "world"))).toDF("id", "tokens")
bigram.setInputCol("tokens").transform(df).show
+---+--------------+---------------+
| id| tokens|bigrams__output|
+---+--------------+---------------+
| 0|[hello, world]| [hello world]|
+---+--------------+---------------+
HashingTF

Another example of a transformer is org.apache.spark.ml.feature.HashingTF that works on a Column of ArrayType.

It transforms the rows of the input column into a sparse term frequency vector.
import org.apache.spark.ml.feature.HashingTF
val hashingTF = new HashingTF()
.setInputCol("words")
.setOutputCol("features")
.setNumFeatures(5000)
// regexTok with its output column matching hashingTF's input column, e.g.
val regexTok = new RegexTokenizer().setInputCol("text").setOutputCol("words")
val df = Seq((0, "hello world"), (1, "two spaces inside")).toDF("id", "text")
val regexedDF = regexTok.transform(df)
// Use HashingTF
val hashedDF = hashingTF.transform(regexedDF)
scala> hashedDF.show(false)
+---+------------------+---------------------+-----------------------------------+
|id |text |words |features |
+---+------------------+---------------------+-----------------------------------+
|0 |hello world |[hello, world] |(5000,[2322,3802],[1.0,1.0]) |
|1 |two spaces inside|[two, spaces, inside]|(5000,[276,940,2533],[1.0,1.0,1.0])|
+---+------------------+---------------------+-----------------------------------+
The name of the output column is optional; if not specified, it becomes the identifier (uid) of the HashingTF object with the __output suffix.
scala> hashingTF.uid
res7: String = hashingTF_fe3554836819
scala> hashingTF.transform(regexedDF).show(false)
+---+------------------+---------------------+-------------------------------------------+
|id |text |words |hashingTF_fe3554836819__output |
+---+------------------+---------------------+-------------------------------------------+
|0 |hello world |[hello, world] |(262144,[71890,72594],[1.0,1.0]) |
|1 |two spaces inside|[two, spaces, inside]|(262144,[53244,77869,115276],[1.0,1.0,1.0])|
+---+------------------+---------------------+-------------------------------------------+
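The hashing trick behind HashingTF can be sketched in plain Scala (Spark 1.x hashed terms with their hashCode, newer versions use MurmurHash3; the helper below is an illustration, not Spark's code):

```scala
// Map a term to a bucket in [0, numFeatures) via a non-negative modulo of its
// hash code; collisions are possible and accepted by design.
def termIndex(term: String, numFeatures: Int): Int =
  ((term.hashCode % numFeatures) + numFeatures) % numFeatures
```

Term frequencies are then accumulated per bucket, which yields the sparse vectors shown above.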
OneHotEncoder

OneHotEncoder is a Transformer that maps a numeric input column of label indices onto a column of binary vectors.
// dataset to transform
val df = Seq(
(0, "a"), (1, "b"),
(2, "c"), (3, "a"),
(4, "a"), (5, "c"))
.toDF("label", "category")
import org.apache.spark.ml.feature.StringIndexer
val indexer = new StringIndexer().setInputCol("category").setOutputCol("cat_index").fit(df)
val indexed = indexer.transform(df)
import org.apache.spark.sql.types.NumericType
scala> indexed.schema("cat_index").dataType.isInstanceOf[NumericType]
res0: Boolean = true
import org.apache.spark.ml.feature.OneHotEncoder
val oneHot = new OneHotEncoder()
.setInputCol("cat_index")
.setOutputCol("cat_vec")
val oneHotted = oneHot.transform(indexed)
scala> oneHotted.show(false)
+-----+--------+---------+-------------+
|label|category|cat_index|cat_vec |
+-----+--------+---------+-------------+
|0 |a |0.0 |(2,[0],[1.0])|
|1 |b |2.0 |(2,[],[]) |
|2 |c |1.0 |(2,[1],[1.0])|
|3 |a |0.0 |(2,[0],[1.0])|
|4 |a |0.0 |(2,[0],[1.0])|
|5 |c |1.0 |(2,[1],[1.0])|
+-----+--------+---------+-------------+
scala> oneHotted.printSchema
root
|-- label: integer (nullable = false)
|-- category: string (nullable = true)
|-- cat_index: double (nullable = true)
|-- cat_vec: vector (nullable = true)
scala> oneHotted.schema("cat_vec").dataType.isInstanceOf[VectorUDT]
res1: Boolean = true
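Note that index 2.0 ("b") became the all-zeros vector of size 2: OneHotEncoder drops the last category by default (the dropLast parameter). That encoding can be sketched in plain Scala (illustrative, not Spark's code):

```scala
// One-hot with dropLast=true: the vector has numCategories - 1 slots and the
// last category is encoded as all zeros.
def oneHot(index: Int, numCategories: Int): Array[Double] =
  Array.tabulate(numCategories - 1)(i => if (i == index) 1.0 else 0.0)
```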
Custom UnaryTransformer

The following class is a custom UnaryTransformer that uppercases input words.
package pl.japila.spark
import org.apache.spark.ml._
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types._
class UpperTransformer(override val uid: String)
  extends UnaryTransformer[String, String, UpperTransformer] {

  def this() = this(Identifiable.randomUID("upper"))

  // Accept only StringType input columns
  override protected def validateInputType(inputType: DataType): Unit = {
    require(inputType == StringType, s"Input type must be StringType but got $inputType")
  }

  // The transformation applied to every value of the input column
  protected def createTransformFunc: String => String = _.toUpperCase

  // The type of the output column
  protected def outputDataType: DataType = StringType
}
Given a DataFrame, you could use it as follows:
val upper = new UpperTransformer
scala> upper.setInputCol("text").transform(df).show
+---+-----+--------------------------+
| id| text|upper_0b559125fd61__output|
+---+-----+--------------------------+
| 0|hello| HELLO|
| 1|world| WORLD|
+---+-----+--------------------------+