Dataset Operators
You can group the operators to use with Datasets by their target, i.e. the part of a Dataset they are applied to.

Besides the above operators, the following ones work with a Dataset as a whole.
Operator | Description |
---|---|
as | Converting a Dataset to a Dataset of the given type |
coalesce | Repartitioning a Dataset with shuffle disabled |
count | Counts the number of rows in a Dataset |
explain | Explain logical and physical plans of a Dataset |
randomSplit | Randomly split a Dataset per weights |
reduce | Reduces the elements of a Dataset using a specified binary function |
repartition | Repartitioning a Dataset with shuffle enabled |
toJSON | Converts a Dataset to a Dataset of JSON strings |
transform | Transforms a Dataset with a custom transformation function |
withWatermark | Creates a streaming Dataset with an EventTimeWatermark logical operator. Used exclusively in Structured Streaming. |
count Operator

Caution | FIXME |
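
count is an action that runs a Spark job and returns the number of rows in the Dataset. A minimal example, assuming a spark-shell session:

val ds = spark.range(10)

scala> ds.count
res0: Long = 10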
toLocalIterator Operator

Caution | FIXME |
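
A minimal sketch, assuming a spark-shell session: toLocalIterator returns a java.util.Iterator over the records of the Dataset on the driver.

val ds = spark.range(3)
// Iterate over the records on the driver without collecting the whole Dataset at once
val it = ds.toLocalIterator
while (it.hasNext) println(it.next())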
createTempViewCommand Internal Operator

Caution | FIXME |
createGlobalTempView Operator

Caution | FIXME |
createOrReplaceTempView Operator

Caution | FIXME |
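
A minimal sketch (the view name ids is arbitrary): createOrReplaceTempView registers the Dataset as a session-scoped temporary view that you can query with SQL.

val ds = spark.range(5)
// Register the Dataset under a name visible to spark.sql in this session
ds.createOrReplaceTempView("ids")

scala> spark.sql("SELECT count(*) AS cnt FROM ids").show
+---+
|cnt|
+---+
|  5|
+---+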
createTempView Operator

Caution | FIXME |
Transforming Datasets — transform Operator

transform[U](t: Dataset[T] => Dataset[U]): Dataset[U]

transform applies the function t to the source Dataset[T] to produce a result Dataset[U]. It is for chaining custom transformations.
val dataset = spark.range(5)
// Transformation t
import org.apache.spark.sql.Dataset
def withDoubled(longs: Dataset[java.lang.Long]) = longs.withColumn("doubled", 'id * 2)
scala> dataset.transform(withDoubled).show
+---+-------+
| id|doubled|
+---+-------+
| 0| 0|
| 1| 2|
| 2| 4|
| 3| 6|
| 4| 8|
+---+-------+
Internally, transform executes the function t on the current Dataset[T].
Converting "Typed" Dataset to "Untyped" DataFrame — toDF Methods

toDF(): DataFrame
toDF(colNames: String*): DataFrame
Internally, the no-argument toDF creates a Dataset[Row] using the Dataset's SparkSession and QueryExecution with the encoder being RowEncoder.
Caution | FIXME Describe toDF(colNames: String*) |
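
A minimal sketch of both variants (the column names id and name are arbitrary and applied positionally):

val ds = Seq((0, "zero"), (1, "one")).toDS
// No-argument toDF: a DataFrame with the default column names _1 and _2
val df = ds.toDF
// toDF(colNames): rename the columns positionally
val named = ds.toDF("id", "name")

scala> named.show
+---+----+
| id|name|
+---+----+
|  0|zero|
|  1| one|
+---+----+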
Converting to Dataset — as Method

Caution | FIXME |
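
A minimal sketch of as (the Token case class is only for illustration):

import org.apache.spark.sql.Dataset
// A case class used for illustration only
final case class Token(id: Long, text: String)
val df = Seq((0L, "hello"), (1L, "world")).toDF("id", "text")
// as[T] turns the untyped DataFrame (Dataset[Row]) into a typed Dataset[Token]
val tokens: Dataset[Token] = df.as[Token]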
Accessing DataFrameWriter — write Method

write: DataFrameWriter[T]
write method returns DataFrameWriter for records of type T.
import org.apache.spark.sql.{DataFrameWriter, Dataset}
val ints: Dataset[Int] = (0 to 5).toDS
val writer: DataFrameWriter[Int] = ints.write
Accessing DataStreamWriter — writeStream Method

writeStream: DataStreamWriter[T]
writeStream method returns DataStreamWriter for records of type T.
val papers = spark.readStream.text("papers").as[String]
import org.apache.spark.sql.streaming.DataStreamWriter
val writer: DataStreamWriter[String] = papers.writeStream
Displaying Records — show Methods
show(): Unit
show(numRows: Int): Unit
show(truncate: Boolean): Unit
show(numRows: Int, truncate: Boolean): Unit
show(numRows: Int, truncate: Int): Unit
Caution | FIXME |
Internally, show relays to a private showString to do the formatting. It turns the Dataset into a DataFrame (by calling toDF()) and takes the first n records.
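
A minimal sketch of the show variants (cell values longer than 20 characters are truncated by default):

val ds = Seq("a" * 25, "b").toDS
ds.show()                  // up to 20 rows, long cells truncated to 20 characters
ds.show(truncate = false)  // full cell values
ds.show(1)                 // only the first row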
Taking First n Records — take Action

take(n: Int): Array[T]

take is an action on a Dataset that returns a collection of n records.
Warning | take loads all the data into the memory of the Spark application's driver process and for a large n could result in OutOfMemoryError. |
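
A minimal example of take in a spark-shell session:

val ds = spark.range(10)

scala> ds.take(3)
res0: Array[java.lang.Long] = Array(0, 1, 2)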
foreachPartition Action

foreachPartition(f: Iterator[T] => Unit): Unit

foreachPartition applies the function f to each partition of the Dataset.
case class Record(id: Int, city: String)
val ds = Seq(Record(0, "Warsaw"), Record(1, "London")).toDS
ds.foreachPartition { iter: Iterator[Record] => iter.foreach(println) }
Note | foreachPartition is used to save a DataFrame to a JDBC table (indirectly through JdbcUtils.saveTable) and in ForeachSink. |
mapPartitions Operator

mapPartitions[U: Encoder](func: Iterator[T] => Iterator[U]): Dataset[U]

mapPartitions returns a new Dataset (of type U) with the function func applied to each partition.
Caution | FIXME Example |
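
A minimal sketch that counts the records in every partition (the even split in the output below is what spark.range gives with 2 partitions):

val ds = spark.range(0, 10, 1, 2)  // 10 records in 2 partitions
// func is called once per partition with an iterator over that partition's records
val sizes = ds.mapPartitions { records => Iterator(records.size) }

scala> sizes.show
+-----+
|value|
+-----+
|    5|
|    5|
+-----+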
Creating Zero or More Records — flatMap Operator

flatMap[U: Encoder](func: T => TraversableOnce[U]): Dataset[U]

flatMap returns a new Dataset (of type U) with all records (of type T) mapped over using the function func and the results flattened.
Note | flatMap can create new records. It deprecated explode. |
final case class Sentence(id: Long, text: String)
val sentences = Seq(Sentence(0, "hello world"), Sentence(1, "witaj swiecie")).toDS
scala> sentences.flatMap(s => s.text.split("\\s+")).show
+-------+
| value|
+-------+
| hello|
| world|
| witaj|
|swiecie|
+-------+
Internally, flatMap calls mapPartitions with the partitions flatMap(ped).
Repartitioning Dataset with Shuffle Disabled — coalesce Operator

coalesce(numPartitions: Int): Dataset[T]

coalesce operator repartitions the Dataset to exactly numPartitions partitions.
Internally, coalesce creates a Repartition logical operator with shuffle disabled (marked as false in the explain output below).
scala> spark.range(5).coalesce(1).explain(extended = true)
== Parsed Logical Plan ==
Repartition 1, false
+- Range (0, 5, step=1, splits=Some(8))
== Analyzed Logical Plan ==
id: bigint
Repartition 1, false
+- Range (0, 5, step=1, splits=Some(8))
== Optimized Logical Plan ==
Repartition 1, false
+- Range (0, 5, step=1, splits=Some(8))
== Physical Plan ==
Coalesce 1
+- *Range (0, 5, step=1, splits=Some(8))
Repartitioning Dataset (Shuffle Enabled) — repartition Operator

repartition(numPartitions: Int): Dataset[T]
repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
repartition(partitionExprs: Column*): Dataset[T]

repartition operators repartition the Dataset to exactly numPartitions partitions or according to the partitionExprs expressions.
Internally, repartition creates a Repartition or RepartitionByExpression logical operator with shuffle enabled (shown as true next to Repartition in the explain output below).
scala> spark.range(5).repartition(1).explain(extended = true)
== Parsed Logical Plan ==
Repartition 1, true
+- Range (0, 5, step=1, splits=Some(8))
== Analyzed Logical Plan ==
id: bigint
Repartition 1, true
+- Range (0, 5, step=1, splits=Some(8))
== Optimized Logical Plan ==
Repartition 1, true
+- Range (0, 5, step=1, splits=Some(8))
== Physical Plan ==
Exchange RoundRobinPartitioning(1)
+- *Range (0, 5, step=1, splits=Some(8))
Note | repartition methods correspond to SQL's DISTRIBUTE BY or CLUSTER BY clauses. |
Projecting Columns — select Operator
select[U1: Encoder](c1: TypedColumn[T, U1]): Dataset[U1]
select[U1, U2](c1: TypedColumn[T, U1], c2: TypedColumn[T, U2]): Dataset[(U1, U2)]
select[U1, U2, U3](
c1: TypedColumn[T, U1],
c2: TypedColumn[T, U2],
c3: TypedColumn[T, U3]): Dataset[(U1, U2, U3)]
select[U1, U2, U3, U4](
c1: TypedColumn[T, U1],
c2: TypedColumn[T, U2],
c3: TypedColumn[T, U3],
c4: TypedColumn[T, U4]): Dataset[(U1, U2, U3, U4)]
select[U1, U2, U3, U4, U5](
c1: TypedColumn[T, U1],
c2: TypedColumn[T, U2],
c3: TypedColumn[T, U3],
c4: TypedColumn[T, U4],
c5: TypedColumn[T, U5]): Dataset[(U1, U2, U3, U4, U5)]
Caution | FIXME |
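
A minimal sketch of the typed select variant with TypedColumns (created with as[U] on a column); the tuple Dataset and its default _1/_2 columns are just for illustration:

import org.apache.spark.sql.Dataset
val ds = Seq(("hello", 1), ("world", 2)).toDS
// Typed projection: the result is a Dataset[(String, Int)], not a DataFrame
val pairs: Dataset[(String, Int)] = ds.select($"_1".as[String], $"_2".as[Int])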
filter Operator

Caution | FIXME |
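
A minimal sketch of both filter variants, the typed one taking a Scala function and the untyped one taking a Column condition:

val ds = Seq(1, 2, 3, 4, 5).toDS
// Typed variant: filter(func: T => Boolean)
ds.filter(_ % 2 == 0).show
// Column-based variant: filter(condition: Column)
ds.filter('value % 2 === 0).show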
where Operator

where(condition: Column): Dataset[T]
where(conditionExpr: String): Dataset[T]

where is a synonym for the filter operator, i.e. it simply passes its parameters on to filter.
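
A minimal example of both where variants:

val ds = spark.range(10)
ds.where('id > 5).show    // Column condition
ds.where("id > 5").show   // SQL expression as a String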
Projecting Columns using Expressions — selectExpr Operator

selectExpr(exprs: String*): DataFrame

selectExpr is like select, but accepts SQL expressions exprs.
val ds = spark.range(5)
scala> ds.selectExpr("rand() as random").show
16/04/14 23:16:06 INFO HiveSqlParser: Parsing command: rand() as random
+-------------------+
| random|
+-------------------+
| 0.887675894185651|
|0.36766085091074086|
| 0.2700020856675186|
| 0.1489033635529543|
| 0.5862990791950973|
+-------------------+
Internally, it executes select with every expression in exprs mapped to a Column (using SparkSqlParser.parseExpression).
scala> ds.select(expr("rand() as random")).show
+------------------+
| random|
+------------------+
|0.5514319279894851|
|0.2876221510433741|
|0.4599999092045741|
|0.5708558868374893|
|0.6223314406247136|
+------------------+
Note | A new feature in Spark 2.0.0. |
Randomly Split Dataset — randomSplit Operator

randomSplit(weights: Array[Double]): Array[Dataset[T]]
randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]]

randomSplit randomly splits the Dataset per weights. The weights doubles should sum up to 1 and will be normalized if they do not.

You can specify a seed; if you don't, a random seed is used.
Note | It is used in TrainValidationSplit to split a dataset into training and validation datasets. |
val ds = spark.range(10)
scala> ds.randomSplit(Array[Double](2, 3)).foreach(_.show)
+---+
| id|
+---+
| 0|
| 1|
| 2|
+---+
+---+
| id|
+---+
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+---+
Note | A new feature in Spark 2.0.0. |
Displaying Logical and Physical Plans, Their Cost and Codegen — explain Operator

explain(): Unit
explain(extended: Boolean): Unit

explain prints the physical plan and, with the extended flag enabled, also the parsed, analyzed and optimized logical plans to the console.
Tip | Use explain to review the structured queries and optimizations applied. |
Internally, explain creates an ExplainCommand logical command and requests SessionState to execute it (to get a QueryExecution back).
Note | explain uses ExplainCommand logical command that, when executed, gives different text representations of QueryExecution (for the Dataset's LogicalPlan) depending on the flags (e.g. extended, codegen, and cost which are disabled by default). |
explain then requests the QueryExecution for the SparkPlan and collects the records (as InternalRow objects).
In the end, explain goes over the InternalRow records and converts them to lines to display to the console.
Note | explain "converts" an InternalRow record to a line using getString at position 0. |
Tip | If you are serious about query debugging you could also use the Debugging Query Execution facility. |
scala> spark.range(10).explain(extended = true)
== Parsed Logical Plan ==
Range (0, 10, step=1, splits=Some(8))
== Analyzed Logical Plan ==
id: bigint
Range (0, 10, step=1, splits=Some(8))
== Optimized Logical Plan ==
Range (0, 10, step=1, splits=Some(8))
== Physical Plan ==
*Range (0, 10, step=1, splits=Some(8))
toJSON Method

toJSON maps the content of a Dataset to a Dataset of JSON strings.
Note | A new feature in Spark 2.0.0. |
scala> val ds = Seq("hello", "world", "foo bar").toDS
ds: org.apache.spark.sql.Dataset[String] = [value: string]
scala> ds.toJSON.show
+-------------------+
| value|
+-------------------+
| {"value":"hello"}|
| {"value":"world"}|
|{"value":"foo bar"}|
+-------------------+
Internally, toJSON grabs the RDD[InternalRow] (of the QueryExecution of the Dataset) and maps the records (per RDD partition) into JSON.
Note | toJSON uses Jackson's JSON parser — jackson-module-scala. |
Accessing Schema — schema Method

schema: StructType

A Dataset has a schema.
Tip | You may also use the following methods to learn about the schema: printSchema(): Unit and explain. |
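
A minimal example in a spark-shell session (printSchema gives the tree form):

val ds = Seq((0, "zero"), (1, "one")).toDS

scala> ds.schema
res0: org.apache.spark.sql.types.StructType = StructType(StructField(_1,IntegerType,false), StructField(_2,StringType,true))

scala> ds.printSchema
root
 |-- _1: integer (nullable = false)
 |-- _2: string (nullable = true)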
Accessing Underlying RDD — rdd Attribute

rdd: RDD[T]

Whenever you need to convert a Dataset into an RDD, executing the rdd method gives you the RDD of the proper input object type (not Row as in DataFrames) that sits behind the Dataset.
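
// Token and tokens are assumed to have been defined earlier, e.g.
// final case class Token(id: Int, text: String)
// val tokens = Seq(Token(0, "hello"), Token(1, "world")).toDS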
scala> val rdd = tokens.rdd
rdd: org.apache.spark.rdd.RDD[Token] = MapPartitionsRDD[11] at rdd at <console>:30
Internally, it looks up the ExpressionEncoder (for the Dataset) and accesses the deserializer expression. That gives the DataType of the result of evaluating the expression.
Note | A deserializer expression is used to decode an InternalRow to an object of type T. See ExpressionEncoder. |
It then executes a DeserializeToObject logical operator that will produce an RDD[InternalRow] that is converted into the proper RDD[T] using the DataType and T.
Note | It is a lazy operation that "produces" an RDD[T]. |
Creating Streaming Dataset with EventTimeWatermark Logical Operator — withWatermark Operator

withWatermark(eventTime: String, delayThreshold: String): Dataset[T]

Internally, withWatermark creates a Dataset with an EventTimeWatermark logical plan for streaming Datasets.
Note | withWatermark uses EliminateEventTimeWatermark logical rule to eliminate EventTimeWatermark logical plan for non-streaming batch Datasets. |
// Create a batch dataset
val events = spark.range(0, 50, 10).
withColumn("timestamp", from_unixtime(unix_timestamp - 'id)).
select('timestamp, 'id as "count")
scala> events.show
+-------------------+-----+
| timestamp|count|
+-------------------+-----+
|2017-06-25 21:21:14| 0|
|2017-06-25 21:21:04| 10|
|2017-06-25 21:20:54| 20|
|2017-06-25 21:20:44| 30|
|2017-06-25 21:20:34| 40|
+-------------------+-----+
// the dataset is a non-streaming batch one...
scala> events.isStreaming
res1: Boolean = false
// ...so EventTimeWatermark is not included in the logical plan
val watermarked = events.
withWatermark(eventTime = "timestamp", delayThreshold = "20 seconds")
scala> println(watermarked.queryExecution.logical.numberedTreeString)
00 Project [timestamp#284, id#281L AS count#288L]
01 +- Project [id#281L, from_unixtime((unix_timestamp(current_timestamp(), yyyy-MM-dd HH:mm:ss, Some(America/Chicago)) - id#281L), yyyy-MM-dd HH:mm:ss, Some(America/Chicago)) AS timestamp#284]
02 +- Range (0, 50, step=10, splits=Some(8))
// Let's create a streaming Dataset
import org.apache.spark.sql.types.StructType
val schema = new StructType().
add($"timestamp".timestamp).
add($"count".long)
scala> schema.printTreeString
root
|-- timestamp: timestamp (nullable = true)
|-- count: long (nullable = true)
val events = spark.
readStream.
schema(schema).
csv("events").
withWatermark(eventTime = "timestamp", delayThreshold = "20 seconds")
scala> println(events.queryExecution.logical.numberedTreeString)
00 'EventTimeWatermark 'timestamp, interval 20 seconds
01 +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@75abcdd4,csv,List(),Some(StructType(StructField(timestamp,TimestampType,true), StructField(count,LongType,true))),List(),None,Map(path -> events),None), FileSource[events], [timestamp#329, count#330L]
Note | delayThreshold must not be negative (and milliseconds and months should both be equal or greater than 0). |
Note | withWatermark is used when…FIXME |