ML Pipeline Components — Transformers

A transformer is a function object that maps (aka transforms) a DataFrame into another DataFrame (both called datasets).

transformer: DataFrame =[transform]=> DataFrame

Transformers prepare a dataset for an machine learning algorithm to work with. They are also very helpful to transform DataFrames in general (even outside the machine learning space).

Transformers are instances of org.apache.spark.ml.Transformer abstract class that offers transform family of methods:

transform(dataset: DataFrame): DataFrame
transform(dataset: DataFrame, paramMap: ParamMap): DataFrame
transform(dataset: DataFrame, firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): DataFrame

A Transformer is a PipelineStage and thus can be a part of a Pipeline.

A few available implementations of Transformer:

See Custom UnaryTransformer section for a custom Transformer implementation.

StopWordsRemover

StopWordsRemover is a machine learning feature transformer that takes a string array column and outputs a string array column with all defined stop words removed. The transformer comes with a standard set of English stop words as default (that are the same as scikit-learn uses, i.e. from the Glasgow Information Retrieval Group).

StopWordsRemover class belongs to org.apache.spark.ml.feature package.

import org.apache.spark.ml.feature.StopWordsRemover
val stopWords = new StopWordsRemover

It accepts the following parameters:

scala> println(stopWords.explainParams)
caseSensitive: whether to do case-sensitive comparison during filtering (default: false)
inputCol: input column name (undefined)
outputCol: output column name (default: stopWords_9c2c0fdd8a68__output)
stopWords: stop words (default: [Ljava.lang.String;@5dabe7c8)
Note
null values from the input array are preserved unless adding null to stopWords explicitly.
import org.apache.spark.ml.feature.RegexTokenizer
val regexTok = new RegexTokenizer("regexTok")
  .setInputCol("text")
  .setPattern("\\W+")

import org.apache.spark.ml.feature.StopWordsRemover
val stopWords = new StopWordsRemover("stopWords")
  .setInputCol(regexTok.getOutputCol)

val df = Seq("please find it done (and empty)", "About to be rich!", "empty")
  .zipWithIndex
  .toDF("text", "id")

scala> stopWords.transform(regexTok.transform(df)).show(false)
+-------------------------------+---+------------------------------------+-----------------+
|text                           |id |regexTok__output                    |stopWords__output|
+-------------------------------+---+------------------------------------+-----------------+
|please find it done (and empty)|0  |[please, find, it, done, and, empty]|[]               |
|About to be rich!              |1  |[about, to, be, rich]               |[rich]           |
|empty                          |2  |[empty]                             |[]               |
+-------------------------------+---+------------------------------------+-----------------+

Binarizer

Binarizer is a Transformer that splits the values in the input column into two groups - "ones" for values larger than the threshold and "zeros" for the others.

It works with DataFrames with the input column of DoubleType or VectorUDT. The type of the result output column matches the type of the input column, i.e. DoubleType or VectorUDT.

import org.apache.spark.ml.feature.Binarizer
val bin = new Binarizer()
  .setInputCol("rating")
  .setOutputCol("label")
  .setThreshold(3.5)

scala> println(bin.explainParams)
inputCol: input column name (current: rating)
outputCol: output column name (default: binarizer_dd9710e2a831__output, current: label)
threshold: threshold used to binarize continuous features (default: 0.0, current: 3.5)

val doubles = Seq((0, 1d), (1, 1d), (2, 5d)).toDF("id", "rating")

scala> bin.transform(doubles).show
+---+------+-----+
| id|rating|label|
+---+------+-----+
|  0|   1.0|  0.0|
|  1|   1.0|  0.0|
|  2|   5.0|  1.0|
+---+------+-----+

import org.apache.spark.mllib.linalg.Vectors
val denseVec = Vectors.dense(Array(4.0, 0.4, 3.7, 1.5))
val vectors = Seq((0, denseVec)).toDF("id", "rating")

scala> bin.transform(vectors).show
+---+-----------------+-----------------+
| id|           rating|            label|
+---+-----------------+-----------------+
|  0|[4.0,0.4,3.7,1.5]|[1.0,0.0,1.0,0.0]|
+---+-----------------+-----------------+

SQLTransformer

SQLTransformer is a Transformer that does transformations by executing SELECT …​ FROM THIS with THIS being the underlying temporary table registered for the input dataset.

Note
It has been available since Spark 1.6.0.

It requires that the SELECT query uses THIS that corresponds to a temporary table and simply executes the mandatory statement using sql method.

You have to specify the mandatory statement parameter using setStatement method.

import org.apache.spark.ml.feature.SQLTransformer
val sql = new SQLTransformer()

// dataset to work with
val df = Seq((0, s"""hello\tworld"""), (1, "two  spaces inside")).toDF("label", "sentence")

scala> sql.setStatement("SELECT sentence FROM __THIS__ WHERE label = 0").transform(df).show
+-----------+
|   sentence|
+-----------+
|hello	world|
+-----------+

scala> println(sql.explainParams)
statement: SQL statement (current: SELECT sentence FROM __THIS__ WHERE label = 0)

VectorAssembler

VectorAssembler is a feature transformer that assembles (merges) multiple columns into a (feature) vector column.

It supports columns of the types NumericType, BooleanType, and VectorUDT. Doubles are passed on untouched. Other numberic types and booleans are cast to doubles.

import org.apache.spark.ml.feature.VectorAssembler
val vecAssembler = new VectorAssembler()

scala> print(vecAssembler.explainParams)
inputCols: input column names (undefined)
outputCol: output column name (default: vecAssembler_5ac31099dbee__output)

final case class Record(id: Int, n1: Int, n2: Double, flag: Boolean)
val ds = Seq(Record(0, 4, 2.0, true)).toDS

scala> ds.printSchema
root
 |-- id: integer (nullable = false)
 |-- n1: integer (nullable = false)
 |-- n2: double (nullable = false)
 |-- flag: boolean (nullable = false)

val features = vecAssembler
  .setInputCols(Array("n1", "n2", "flag"))
  .setOutputCol("features")
  .transform(ds)

scala> features.printSchema
root
 |-- id: integer (nullable = false)
 |-- n1: integer (nullable = false)
 |-- n2: double (nullable = false)
 |-- flag: boolean (nullable = false)
 |-- features: vector (nullable = true)


scala> features.show
+---+---+---+----+-------------+
| id| n1| n2|flag|     features|
+---+---+---+----+-------------+
|  0|  4|2.0|true|[4.0,2.0,1.0]|
+---+---+---+----+-------------+

UnaryTransformers

The UnaryTransformer abstract class is a specialized Transformer that applies transformation to one input column and writes results to another (by appending a new column).

Each UnaryTransformer defines the input and output columns using the following "chain" methods (they return the transformer on which they were executed and so are chainable):

  • setInputCol(value: String)

  • setOutputCol(value: String)

Each UnaryTransformer calls validateInputType while executing transformSchema(schema: StructType) (that is part of PipelineStage contract).

Note
A UnaryTransformer is a PipelineStage.

When transform is called, it first calls transformSchema (with DEBUG logging enabled) and then adds the column as a result of calling a protected abstract createTransformFunc.

Note
createTransformFunc function is abstract and defined by concrete UnaryTransformer objects.

Internally, transform method uses Spark SQL’s udf to define a function (based on createTransformFunc function described above) that will create the new output column (with appropriate outputDataType). The UDF is later applied to the input column of the input DataFrame and the result becomes the output column (using DataFrame.withColumn method).

Note
Using udf and withColumn methods from Spark SQL demonstrates an excellent integration between the Spark modules: MLlib and SQL.

The following are UnaryTransformer implementations in spark.ml:

RegexTokenizer

RegexTokenizer is a UnaryTransformer that tokenizes a String into a collection of String.

import org.apache.spark.ml.feature.RegexTokenizer
val regexTok = new RegexTokenizer()

// dataset to transform with tabs and spaces
val df = Seq((0, s"""hello\tworld"""), (1, "two  spaces inside")).toDF("label", "sentence")

val tokenized = regexTok.setInputCol("sentence").transform(df)

scala> tokenized.show(false)
+-----+------------------+-----------------------------+
|label|sentence          |regexTok_810b87af9510__output|
+-----+------------------+-----------------------------+
|0    |hello	world       |[hello, world]               |
|1    |two  spaces inside|[two, spaces, inside]        |
+-----+------------------+-----------------------------+
Note
Read the official scaladoc for org.apache.spark.ml.feature.RegexTokenizer.

It supports minTokenLength parameter that is the minimum token length that you can change using setMinTokenLength method. It simply filters out smaller tokens and defaults to 1.

// see above to set up the vals

scala> rt.setInputCol("line").setMinTokenLength(6).transform(df).show
+-----+--------------------+-----------------------------+
|label|                line|regexTok_8c74c5e8b83a__output|
+-----+--------------------+-----------------------------+
|    1|         hello world|                           []|
|    2|yet another sentence|          [another, sentence]|
+-----+--------------------+-----------------------------+

It has gaps parameter that indicates whether regex splits on gaps (true) or matches tokens (false). You can set it using setGaps. It defaults to true.

When set to true (i.e. splits on gaps) it uses Regex.split while Regex.findAllIn for false.

scala> rt.setInputCol("line").setGaps(false).transform(df).show
+-----+--------------------+-----------------------------+
|label|                line|regexTok_8c74c5e8b83a__output|
+-----+--------------------+-----------------------------+
|    1|         hello world|                           []|
|    2|yet another sentence|          [another, sentence]|
+-----+--------------------+-----------------------------+

scala> rt.setInputCol("line").setGaps(false).setPattern("\\W").transform(df).show(false)
+-----+--------------------+-----------------------------+
|label|line                |regexTok_8c74c5e8b83a__output|
+-----+--------------------+-----------------------------+
|1    |hello world         |[]                           |
|2    |yet another sentence|[another, sentence]          |
+-----+--------------------+-----------------------------+

It has pattern parameter that is the regex for tokenizing. It uses Scala’s .r method to convert the string to regex. Use setPattern to set it. It defaults to \\s+.

It has toLowercase parameter that indicates whether to convert all characters to lowercase before tokenizing. Use setToLowercase to change it. It defaults to true.

NGram

In this example you use org.apache.spark.ml.feature.NGram that converts the input collection of strings into a collection of n-grams (of n words).

import org.apache.spark.ml.feature.NGram

val bigram = new NGram("bigrams")
val df = Seq((0, Seq("hello", "world"))).toDF("id", "tokens")
bigram.setInputCol("tokens").transform(df).show

+---+--------------+---------------+
| id|        tokens|bigrams__output|
+---+--------------+---------------+
|  0|[hello, world]|  [hello world]|
+---+--------------+---------------+

HashingTF

Another example of a transformer is org.apache.spark.ml.feature.HashingTF that works on a Column of ArrayType.

It transforms the rows for the input column into a sparse term frequency vector.

import org.apache.spark.ml.feature.HashingTF
val hashingTF = new HashingTF()
  .setInputCol("words")
  .setOutputCol("features")
  .setNumFeatures(5000)

// see above for regexTok transformer
val regexedDF = regexTok.transform(df)

// Use HashingTF
val hashedDF = hashingTF.transform(regexedDF)

scala> hashedDF.show(false)
+---+------------------+---------------------+-----------------------------------+
|id |text              |words                |features                           |
+---+------------------+---------------------+-----------------------------------+
|0  |hello	world       |[hello, world]       |(5000,[2322,3802],[1.0,1.0])       |
|1  |two  spaces inside|[two, spaces, inside]|(5000,[276,940,2533],[1.0,1.0,1.0])|
+---+------------------+---------------------+-----------------------------------+

The name of the output column is optional, and if not specified, it becomes the identifier of a HashingTF object with the __output suffix.

scala> hashingTF.uid
res7: String = hashingTF_fe3554836819

scala> hashingTF.transform(regexDF).show(false)
+---+------------------+---------------------+-------------------------------------------+
|id |text              |words                |hashingTF_fe3554836819__output             |
+---+------------------+---------------------+-------------------------------------------+
|0  |hello	world       |[hello, world]       |(262144,[71890,72594],[1.0,1.0])           |
|1  |two  spaces inside|[two, spaces, inside]|(262144,[53244,77869,115276],[1.0,1.0,1.0])|
+---+------------------+---------------------+-------------------------------------------+

OneHotEncoder

OneHotEncoder is a Tokenizer that maps a numeric input column of label indices onto a column of binary vectors.

// dataset to transform
val df = Seq(
  (0, "a"), (1, "b"),
  (2, "c"), (3, "a"),
  (4, "a"), (5, "c"))
  .toDF("label", "category")
import org.apache.spark.ml.feature.StringIndexer
val indexer = new StringIndexer().setInputCol("category").setOutputCol("cat_index").fit(df)
val indexed = indexer.transform(df)

import org.apache.spark.sql.types.NumericType

scala> indexed.schema("cat_index").dataType.isInstanceOf[NumericType]
res0: Boolean = true

import org.apache.spark.ml.feature.OneHotEncoder
val oneHot = new OneHotEncoder()
  .setInputCol("cat_index")
  .setOutputCol("cat_vec")

val oneHotted = oneHot.transform(indexed)

scala> oneHotted.show(false)
+-----+--------+---------+-------------+
|label|category|cat_index|cat_vec      |
+-----+--------+---------+-------------+
|0    |a       |0.0      |(2,[0],[1.0])|
|1    |b       |2.0      |(2,[],[])    |
|2    |c       |1.0      |(2,[1],[1.0])|
|3    |a       |0.0      |(2,[0],[1.0])|
|4    |a       |0.0      |(2,[0],[1.0])|
|5    |c       |1.0      |(2,[1],[1.0])|
+-----+--------+---------+-------------+

scala> oneHotted.printSchema
root
 |-- label: integer (nullable = false)
 |-- category: string (nullable = true)
 |-- cat_index: double (nullable = true)
 |-- cat_vec: vector (nullable = true)

scala> oneHotted.schema("cat_vec").dataType.isInstanceOf[VectorUDT]
res1: Boolean = true

Custom UnaryTransformer

The following class is a custom UnaryTransformer that transforms words using upper letters.

package pl.japila.spark

import org.apache.spark.ml._
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types._

class UpperTransformer(override val uid: String)
    extends UnaryTransformer[String, String, UpperTransformer] {

  def this() = this(Identifiable.randomUID("upper"))

  override protected def validateInputType(inputType: DataType): Unit = {
    require(inputType == StringType)
  }

  protected def createTransformFunc: String => String = {
    _.toUpperCase
  }

  protected def outputDataType: DataType = StringType
}

Given a DataFrame you could use it as follows:

val upper = new UpperTransformer

scala> upper.setInputCol("text").transform(df).show
+---+-----+--------------------------+
| id| text|upper_0b559125fd61__output|
+---+-----+--------------------------+
|  0|hello|                     HELLO|
|  1|world|                     WORLD|
+---+-----+--------------------------+

results matching ""

    No results matching ""