LocalDateTimeEncoder — Custom ExpressionEncoder for java.time.LocalDateTime

Spark SQL, in the version used on this page, does not support java.time.LocalDateTime values in a Dataset out of the box.

import java.time.LocalDateTime

scala> val times = Seq(LocalDateTime.now).toDF("time")
<console>:24: error: value toDF is not a member of Seq[java.time.LocalDateTime]
       val times = Seq(LocalDateTime.now).toDF("time")
                                          ^

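The reason for the error is that there is no implicit Encoder for java.time.LocalDateTime in scope. Defining one with ExpressionEncoder() does not help, because ExpressionEncoder derives the serializer through ScalaReflection, which has no rule for the type.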

import java.time.LocalDateTime
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

implicit def scalaLocalDateTime: Encoder[java.time.LocalDateTime] = ExpressionEncoder()

scala> val times = Seq(LocalDateTime.now).toDF("time")
java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDateTime
- root class: "java.time.LocalDateTime"
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:625)
  at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:438)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
  at scalaLocalDateTime(<console>:27)
  ... 48 elided
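
Until a custom encoder exists, one way around the missing encoder is to convert the values to java.sql.Timestamp, a type Spark SQL does ship an encoder for. The following is a minimal sketch of that conversion in plain Scala; the names localTimes, timestamps and roundTrip are made up for illustration, and the conversion interprets each value in the JVM default time zone.

```scala
import java.sql.Timestamp
import java.time.LocalDateTime

val localTimes: Seq[LocalDateTime] =
  Seq(LocalDateTime.parse("2007-12-03T10:15:30"))

// Convert to java.sql.Timestamp (interpreted in the JVM default time zone)
val timestamps: Seq[Timestamp] = localTimes.map(ldt => Timestamp.valueOf(ldt))

// In spark-shell (spark.implicits._ in scope) a Seq[Timestamp] has an encoder:
//   val times = timestamps.toDF("time")

// Convert back after the values have been read out of a Dataset
val roundTrip: Seq[LocalDateTime] = timestamps.map(_.toLocalDateTime)
```

The cost of this approach is that the conversion has to be repeated at every boundary between LocalDateTime values and a Dataset, which is what a custom encoder would avoid.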

LocalDateTimeEncoder is an attempt to develop a custom ExpressionEncoder for java.time.LocalDateTime, which the Java API documentation describes as follows.

public final class LocalDateTime

A date-time without a time-zone in the ISO-8601 calendar system, such as 2007-12-03T10:15:30.

LocalDateTime is an immutable date-time object that represents a date-time, often viewed as year-month-day-hour-minute-second.
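
A short illustration of that description, using the sample value from the Javadoc; the value names are chosen here for the example only.

```scala
import java.time.LocalDateTime

// Parse the ISO-8601 text form (no time zone or offset involved)
val ldt = LocalDateTime.parse("2007-12-03T10:15:30")

// The year-month-day-hour-minute-second view
val fields = (ldt.getYear, ldt.getMonthValue, ldt.getDayOfMonth,
  ldt.getHour, ldt.getMinute, ldt.getSecond)

// Immutable: plusDays returns a new object and leaves ldt unchanged
val nextDay = ldt.plusDays(1)
```

An encoder has to map exactly this information to a Spark SQL data type and back.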

// $ SPARK_SUBMIT_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005" ./bin/spark-shell --conf spark.rpc.askTimeout=5m

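import java.sql.Timestamp
import java.time.LocalDateTime
import scala.reflect.classTag
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression}
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._

// A flat encoder has exactly one column; a LocalDateTime is stored as a timestamp
val schema = StructType($"value".timestamp :: Nil)

// Serializer: LocalDateTime -> java.sql.Timestamp -> internal timestamp value.
// Assumption: going through java.sql.Timestamp mirrors what ScalaReflection
// does for that type; values are interpreted in the JVM default time zone.
val inputObject = BoundReference(0, ObjectType(classOf[LocalDateTime]), nullable = true)
val toTimestamp = StaticInvoke(
  classOf[Timestamp],
  ObjectType(classOf[Timestamp]),
  "valueOf",
  inputObject :: Nil)
val serializer: Seq[Expression] = Seq(
  StaticInvoke(
    DateTimeUtils.getClass,
    TimestampType,
    "fromJavaTimestamp",
    toTimestamp :: Nil))

// Deserializer: internal timestamp value -> java.sql.Timestamp -> LocalDateTime
val getPath = GetColumnByOrdinal(0, TimestampType)
val deserializer: Expression =
  Invoke(
    StaticInvoke(
      DateTimeUtils.getClass,
      ObjectType(classOf[Timestamp]),
      "toJavaTimestamp",
      getPath :: Nil),
    "toLocalDateTime",
    ObjectType(classOf[LocalDateTime]))

// The constructor with schema and flat is the Spark 2.x one
implicit def scalaLocalDateTime: Encoder[LocalDateTime] =
  new ExpressionEncoder[LocalDateTime](
    schema = schema,
    flat = true,  // what would happen with false?
    serializer = serializer,
    deserializer = deserializer,
    clsTag = classTag[LocalDateTime])

val times = Seq(LocalDateTime.now).toDF("time")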

Open Questions

  1. ScalaReflection.serializerFor passes ObjectType objects through.

  2. ScalaReflection.serializerFor uses StaticInvoke for java.sql.Timestamp and java.sql.Date.

    case t if t <:< localTypeOf[java.sql.Timestamp] =>
      StaticInvoke(
        DateTimeUtils.getClass,
        TimestampType,
        "fromJavaTimestamp",
        inputObject :: Nil)
    
    case t if t <:< localTypeOf[java.sql.Date] =>
      StaticInvoke(
        DateTimeUtils.getClass,
        DateType,
        "fromJavaDate",
        inputObject :: Nil)

  3. How could SQLUserDefinedType and UDTRegistration help here?
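
Regarding question 2, a TimestampType column holds a Long with the number of microseconds since the epoch, so any serializer for LocalDateTime ends in a conversion of that kind. The sketch below shows the arithmetic in plain Scala. toMicros and fromMicros are hypothetical helpers written for this example, not Spark API, and are only meant to illustrate what a conversion such as fromJavaTimestamp has to compute.

```scala
import java.sql.Timestamp
import java.time.LocalDateTime

// Hypothetical helper: Timestamp -> microseconds since the epoch.
// getTime already includes whole milliseconds, so only the microsecond
// remainder of the nanosecond field is added.
def toMicros(ts: Timestamp): Long =
  ts.getTime * 1000L + (ts.getNanos / 1000L) % 1000L

// Hypothetical helper: microseconds since the epoch -> Timestamp.
// floorDiv/floorMod keep the fraction non-negative before 1970.
def fromMicros(us: Long): Timestamp = {
  val seconds = Math.floorDiv(us, 1000000L)
  val microsOfSecond = Math.floorMod(us, 1000000L)
  val ts = new Timestamp(seconds * 1000L)
  ts.setNanos((microsOfSecond * 1000L).toInt)
  ts
}

val ldt = LocalDateTime.parse("2007-12-03T10:15:30.123456")
val micros = toMicros(Timestamp.valueOf(ldt))
val back = fromMicros(micros).toLocalDateTime
```

Precision finer than a microsecond is lost in such a representation, so a LocalDateTime with nanosecond detail would not survive the round trip unchanged.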
