ExpressionEncoder — Expression-Based Encoder

ExpressionEncoder[T] is a generic Encoder of JVM objects of the type T to internal binary row format (as InternalRow).

ExpressionEncoder[T] uses Catalyst expressions for a serializer and a deserializer.

Note
ExpressionEncoder is the only supported implementation of Encoder which is explicitly enforced when Dataset is created (even though Dataset data structure accepts a bare Encoder[T]).
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
val stringEncoder = ExpressionEncoder[String]
scala> val row = stringEncoder.toRow("hello world")
row: org.apache.spark.sql.catalyst.InternalRow = [0,100000000b,6f77206f6c6c6568,646c72]

import org.apache.spark.sql.catalyst.expressions.UnsafeRow
scala> val unsafeRow = row match { case ur: UnsafeRow => ur }
unsafeRow: org.apache.spark.sql.catalyst.expressions.UnsafeRow = [0,100000000b,6f77206f6c6c6568,646c72]

ExpressionEncoder uses serializer expressions to encode (aka serialize) a JVM object of type T to an internal binary row format (i.e. InternalRow).

Note
It is assumed that all serializer expressions contain at least one and the same BoundReference.

ExpressionEncoder uses a deserializer expression to decode (aka deserialize) a JVM object of type T from internal binary row format.

ExpressionEncoder is flat when serializer uses a single expression (which also means that the objects of a type T are not created using constructor parameters only like Product or DefinedByConstructorParams types).

Internally, a ExpressionEncoder creates a UnsafeProjection (for the input serializer), a InternalRow (of size 1), and a safe Projection (for the input deserializer). They are all internal lazy attributes of the encoder.

Table 1. ExpressionEncoder’s (Lazily-Initialized) Internal Properties
Property Description

constructProjection

Projection generated for the deserializer expression

Used exclusively when ExpressionEncoder is requested for a JVM object from a Spark SQL row (i.e. InternalRow).

extractProjection

UnsafeProjection generated for the serializer expressions

Used exclusively when ExpressionEncoder is requested for an encoded version of a JVM object as a Spark SQL row (i.e. InternalRow).

inputRow

GenericInternalRow (with the underlying storage array) of size 1 (i.e. it can only store a single JVM object of any type).

Used…​FIXME

Note
Encoders object contains the default ExpressionEncoders for Scala and Java primitive types, e.g. boolean, long, String, java.sql.Date, java.sql.Timestamp, Array[Byte].

resolveAndBind Method

Caution
FIXME

Creating ExpressionEncoder Instance

ExpressionEncoder takes the following when created:

Creating Deserialize Expression — ScalaReflection.deserializerFor Method

deserializerFor[T: TypeTag]: Expression

deserializerFor creates an expression to deserialize from internal binary row format to a Scala object of type T.

import org.apache.spark.sql.catalyst.ScalaReflection.deserializerFor
val timestampDeExpr = deserializerFor[java.sql.Timestamp]
scala> println(timestampDeExpr.numberedTreeString)
00 staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - root class: "java.sql.Timestamp"), true)
01 +- upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - root class: "java.sql.Timestamp")
02    +- getcolumnbyordinal(0, TimestampType)

val tuple2DeExpr = deserializerFor[(java.sql.Timestamp, Double)]
scala> println(tuple2DeExpr.numberedTreeString)
00 newInstance(class scala.Tuple2)
01 :- staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - field (class: "java.sql.Timestamp", name: "_1"), - root class: "scala.Tuple2"), true)
02 :  +- upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - field (class: "java.sql.Timestamp", name: "_1"), - root class: "scala.Tuple2")
03 :     +- getcolumnbyordinal(0, TimestampType)
04 +- upcast(getcolumnbyordinal(1, DoubleType), DoubleType, - field (class: "scala.Double", name: "_2"), - root class: "scala.Tuple2")
05    +- getcolumnbyordinal(1, DoubleType)

Internally, deserializerFor calls the recursive internal variant of deserializerFor with a single-element walked type path with - root class: "[clsName]"

Tip
Read up on Scala’s TypeTags in TypeTags and Manifests.
Note
deserializerFor is used exclusively when ExpressionEncoder is created for a Scala type T.

Recursive Internal deserializerFor Method

deserializerFor(
  tpe: `Type`,
  path: Option[Expression],
  walkedTypePath: Seq[String]): Expression
Table 2. JVM Types and Deserialize Expressions (in evaluation order)
JVM Type (Scala or Java) Deserialize Expressions

Option[T]

java.lang.Integer

java.lang.Long

java.lang.Double

java.lang.Float

java.lang.Short

java.lang.Byte

java.lang.Boolean

java.sql.Date

java.sql.Timestamp

java.lang.String

java.math.BigDecimal

scala.BigDecimal

java.math.BigInteger

scala.math.BigInt

Array[T]

Seq[T]

Map[K, V]

SQLUserDefinedType

User Defined Types (UDTs)

Product (including Tuple) or DefinedByConstructorParams

Creating Serialize Expression — ScalaReflection.serializerFor Method

serializerFor[T: TypeTag](inputObject: Expression): CreateNamedStruct

serializerFor creates a CreateNamedStruct expression to serialize a Scala object of type T to internal binary row format.

import org.apache.spark.sql.catalyst.ScalaReflection.serializerFor

import org.apache.spark.sql.catalyst.expressions.BoundReference
import org.apache.spark.sql.types.TimestampType
val boundRef = BoundReference(ordinal = 0, dataType = TimestampType, nullable = true)

val timestampSerExpr = serializerFor[java.sql.Timestamp](boundRef)
scala> println(timestampSerExpr.numberedTreeString)
00 named_struct(value, input[0, timestamp, true])
01 :- value
02 +- input[0, timestamp, true]

Internally, serializerFor calls the recursive internal variant of serializerFor with a single-element walked type path with - root class: "[clsName]" and pattern match on the result expression.

Caution
FIXME the pattern match part
Tip
Read up on Scala’s TypeTags in TypeTags and Manifests.
Note
serializerFor is used exclusively when ExpressionEncoder is created for a Scala type T.

Recursive Internal serializerFor Method

serializerFor(
  inputObject: Expression,
  tpe: `Type`,
  walkedTypePath: Seq[String],
  seenTypeSet: Set[`Type`] = Set.empty): Expression

serializerFor creates an expression for serializing an object of type T to an internal row.

Caution
FIXME

Encoding JVM Object to Internal Binary Row Format — toRow Method

toRow(t: T): InternalRow

toRow encodes (aka serializes) a JVM object t as an internal binary row.

Internally, toRow sets the only JVM object to be t in inputRow and converts the inputRow to a unsafe binary row (using extractProjection).

In case of any exception while serializing, toRow reports a RuntimeException:

Error while encoding: [initial exception]
[multi-line serializer]
Note

toRow is mostly used when SparkSession is requested for:

Decoding JVM Object From Internal Binary Row Format — fromRow Method

fromRow(row: InternalRow): T

fromRow decodes (aka deserializes) a JVM object from a row InternalRow (with the required values only).

Internally, fromRow uses constructProjection with row and gets the 0th element of type ObjectType that is then cast to the output type T.

In case of any exception while deserializing, fromRow reports a RuntimeException:

Error while decoding: [initial exception]
[deserializer]
Note

fromRow is used for:

  • Dataset operators, i.e. head, collect, collectAsList, toLocalIterator

  • Structured Streaming’s ForeachSink

results matching ""

    No results matching ""