import java.time.LocalDateTime
scala> val times = Seq(LocalDateTime.now).toDF("time")
<console>:24: error: value toDF is not a member of Seq[java.time.LocalDateTime]
val times = Seq(LocalDateTime.now).toDF("time")
^
LocalDateTimeEncoder — Custom ExpressionEncoder for java.time.LocalDateTime
Spark SQL does not support java.time.LocalDateTime
values in a Dataset
.
The reason for the error is that there is no encoder for java.time.LocalDateTime
.
import java.time.LocalDateTime
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
implicit def scalaLocalDateTime: Encoder[java.time.LocalDateTime] = ExpressionEncoder()
scala> val times = Seq(LocalDateTime.now).toDF("time")
java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDateTime
- root class: "java.time.LocalDateTime"
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:625)
at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:438)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
at scalaLocalDateTime(<console>:27)
... 48 elided
LocalDateTimeEncoder
is an attempt to develop a custom ExpressionEncoder for Java’s java.time.LocalDateTime.
public final class LocalDateTime
A date-time without a time-zone in the ISO-8601 calendar system, such as 2007-12-03T10:15:30
.
LocalDateTime
is an immutable date-time object that represents a date-time, often viewed as year-month-day-hour-minute-second.
// $ SPARK_SUBMIT_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005" ./bin/spark-shell --conf spark.rpc.askTimeout=5m
import java.time.LocalDateTime
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types._
val schema = StructType(
$"year".int :: $"month".int :: $"day".int :: Nil)
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.catalyst.expressions.BoundReference
val inputObject = BoundReference(0, StringType, nullable = true)
import org.apache.spark.sql.types.TimestampType
val staticInvoke = StaticInvoke(
classOf[java.time.LocalDateTime],
TimestampType,
"parse",
inputObject :: Nil))
val serializer: Seq[Expression] = Seq(
val deserializer: Expression =
StaticInvoke(
DateTimeUtils.getClass,
ObjectType(classOf[java.time.LocalDateTime]),
"toJavaTimestamp",
getPath :: Nil)
import scala.reflect._
implicit def scalaLocalDateTime: Encoder[java.time.LocalDateTime] =
new ExpressionEncoder[java.time.LocalDateTime](
schema,
flat = true, // what would happen with false?
serializer,
deserializer,
classTag[java.time.LocalDateTime])
val times = Seq(LocalDateTime.now).toDF("time")
Open Questions
-
ScalaReflection.serializerFor
passesObjectType
objects through -
ScalaReflection.serializerFor
usesStaticInvoke
forjava.sql.Timestamp
andjava.sql.Date
.case t if t <:< localTypeOf[java.sql.Timestamp] => StaticInvoke( DateTimeUtils.getClass, TimestampType, "fromJavaTimestamp", inputObject :: Nil) case t if t <:< localTypeOf[java.sql.Date] => StaticInvoke( DateTimeUtils.getClass, DateType, "fromJavaDate", inputObject :: Nil)
-
How could
SQLUserDefinedType
andUDTRegistration
help here?