// Cache Dataset -- it is lazy
scala> val df = spark.range(1).cache
df: org.apache.spark.sql.Dataset[Long] = [id: bigint]
// Trigger caching
scala> df.show
+---+
| id|
+---+
| 0|
+---+
// Visit http://localhost:4040/storage to see the Dataset cached. It should be listed there.
// You may also use queryExecution or explain to see InMemoryRelation
// InMemoryRelation is used for cached queries
scala> df.queryExecution.withCachedData
res0: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Range (0, 1, step=1, splits=Some(8))
// Use the cached Dataset in another query
// Notice InMemoryRelation in use for cached queries
scala> df.withColumn("newId", 'id).explain(extended = true)
== Parsed Logical Plan ==
'Project [*, 'id AS newId#16]
+- Range (0, 1, step=1, splits=Some(8))
== Analyzed Logical Plan ==
id: bigint, newId: bigint
Project [id#0L, id#0L AS newId#16L]
+- Range (0, 1, step=1, splits=Some(8))
== Optimized Logical Plan ==
Project [id#0L, id#0L AS newId#16L]
+- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Range (0, 1, step=1, splits=Some(8))
== Physical Plan ==
*Project [id#0L, id#0L AS newId#16L]
+- InMemoryTableScan [id#0L]
+- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Range (0, 1, step=1, splits=Some(8))
// Clear in-memory cache using SQL
// Equivalent to spark.catalog.clearCache
scala> sql("CLEAR CACHE").collect
res1: Array[org.apache.spark.sql.Row] = Array()
// Visit http://localhost:4040/storage to confirm the cleaning
Dataset Caching and Persistence
Operator | Description |
---|---|
cache | Caches the Dataset (lazily) with the default storage level (MEMORY_AND_DISK) |
persist | Persists the Dataset (lazily) with the default or a given storage level |
unpersist | Uncaches the Dataset |
Note
|
You can also use SQL’s CACHE TABLE [tableName] to cache the tableName table in memory. Unlike the cache and persist operators, CACHE TABLE is an eager operation. You could however use the LAZY keyword to make the caching lazy.
Use SQL’s UNCACHE TABLE [tableName] to remove a table from the in-memory cache. Use SQL’s CLEAR CACHE to remove all cached tables (as in the example above).
|
Note
|
Be careful what you cache, i.e. which exact Dataset you cache, as different Datasets result in different query plans being cached (and hence reused).
|
Tip
|
You can check whether a Dataset has been cached by inspecting its storage level or by asking the session’s CacheManager.
|
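For example, one way to check (a sketch to run in spark-shell; note that `sharedState.cacheManager` is an internal API and may change between Spark versions):

```scala
import org.apache.spark.storage.StorageLevel

val df = spark.range(1)

// Not cached yet: the storage level is StorageLevel.NONE
assert(df.storageLevel == StorageLevel.NONE)

df.cache

// After cache, storageLevel reports the default MEMORY_AND_DISK level
assert(df.storageLevel != StorageLevel.NONE)

// Alternatively, ask the CacheManager (internal API) directly
val cacheManager = spark.sharedState.cacheManager
assert(cacheManager.lookupCachedData(df.queryExecution.logical).isDefined)
```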
SQL’s CACHE TABLE
SQL’s CACHE TABLE corresponds to requesting the session-specific Catalog to cache the table.
Internally, CACHE TABLE becomes a CacheTableCommand runnable command that…FIXME
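As a quick illustration (a sketch to run in spark-shell; the view name `nums` is made up):

```scala
// Register a temporary view to cache
spark.range(5).createOrReplaceTempView("nums")

// CACHE TABLE is eager by default -- the table is materialized here
sql("CACHE TABLE nums")
assert(spark.catalog.isCached("nums"))

// The LAZY keyword defers materialization until the table is first used
sql("CACHE LAZY TABLE nums")

// Remove the table from the in-memory cache again
sql("UNCACHE TABLE nums")
assert(!spark.catalog.isCached("nums"))
```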
Caching Dataset — cache Method
cache(): this.type
cache merely executes the no-argument persist method.
val ds = spark.range(5).cache
Persisting Dataset — persist Method
persist(): this.type
persist(newLevel: StorageLevel): this.type
persist caches the Dataset using the default storage level MEMORY_AND_DISK or newLevel, and returns it.
Internally, persist requests CacheManager to cache the query (the CacheManager is accessible through the SharedState of the current SparkSession).
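For example (a sketch; the explicit storage level is only an illustrative choice):

```scala
import org.apache.spark.storage.StorageLevel

// No-argument persist uses the default MEMORY_AND_DISK storage level
val ds = spark.range(5).persist

// Or pick an explicit storage level up front -- changing the level of an
// already-persisted Dataset is not allowed and throws an exception
val ds2 = spark.range(5).persist(StorageLevel.MEMORY_ONLY)
```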
Caution
|
FIXME |
Unpersisting Dataset — unpersist Method
unpersist(blocking: Boolean): this.type
unpersist uncaches the Dataset, optionally blocking until all its cached blocks are removed.
Internally, unpersist requests CacheManager to uncache the query.
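A short self-contained example (a sketch to run in spark-shell):

```scala
val ds = spark.range(5).cache
ds.count  // trigger an action to materialize the cache

// blocking = true waits until all cached blocks are deleted
ds.unpersist(blocking = true)

// There is also a no-argument variant that does not block
ds.unpersist()
```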
Caution
|
FIXME |