SparkSession — The Entry Point to Spark SQL

SparkSession is the entry point to Spark SQL. It is the very first object you have to create while developing Spark SQL applications using the fully-typed Dataset (or untyped Row-based DataFrame) data abstractions.

Note
In Spark 2.0, SparkSession merged SQLContext and HiveContext into one object.

You use the SparkSession.builder method to create an instance of SparkSession.

import org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession.builder
  .appName("My Spark Application")  // optional and will be autogenerated if not specified
  .master("local[*]")               // avoid hardcoding the deployment environment
  .enableHiveSupport()              // self-explanatory, isn't it?
  .config("spark.sql.warehouse.dir", "target/spark-warehouse")
  .getOrCreate

Stop the current SparkSession using the stop method.

spark.stop

You can have as many SparkSessions as you want in a single Spark application. The common use case is to keep relational entities separate per SparkSession (see the catalog attribute and the sketch after the listing below).

scala> spark.catalog.listTables.show
+------------------+--------+-----------+---------+-----------+
|              name|database|description|tableType|isTemporary|
+------------------+--------+-----------+---------+-----------+
|my_permanent_table| default|       null|  MANAGED|      false|
|              strs|    null|       null|TEMPORARY|       true|
+------------------+--------+-----------+---------+-----------+
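
As a quick illustration (a minimal sketch for spark-shell; the view name demo_strs is just an example), a temporary view registered in one SparkSession is not visible to another:

import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder.getOrCreate()
import spark.implicits._

// register a temporary view in this session
Seq("a", "b", "c").toDS.createOrReplaceTempView("demo_strs")

// the view shows up in this session's catalog...
spark.catalog.listTables.show

// ...but not in a new session's catalog (temporary views are session-scoped,
// while permanent tables in the shared metastore remain visible)
spark.newSession.catalog.listTables.show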

Internally, SparkSession requires a SparkContext and an optional SharedState (that represents the shared state across SparkSession instances).

Table 1. SparkSession's Class and Instance Methods

Method           Description
builder          "Opens" a builder to get or create a SparkSession instance
version          Returns the current version of Spark
implicits        Use import spark.implicits._ to import the implicit conversions and create Datasets from (almost arbitrary) Scala objects
emptyDataset[T]  Creates an empty Dataset[T]
range            Creates a Dataset[Long]
sql              Executes a SQL query (and returns a DataFrame)
udf              Access to user-defined functions (UDFs)
table            Creates a DataFrame from a table
catalog          Access to the catalog of the entities of structured queries
read             Access to DataFrameReader to read a DataFrame from external files and storage systems
conf             Access to the current runtime configuration
readStream       Access to DataStreamReader to read streaming datasets
streams          Access to StreamingQueryManager to manage structured streaming queries
newSession       Creates a new SparkSession
stop             Stops the SparkSession

Tip

Use the spark.sql.warehouse.dir Spark property to change the location of Hive's hive.metastore.warehouse.dir property, i.e. the location of the Hive local/embedded metastore database (using Derby).

Refer to SharedState to learn about (the low-level details of) Spark SQL support for Apache Hive.

See also the official Hive Metastore Administration document.

Table 2. SparkSession's (Lazily-Initialized) Attributes (in alphabetical order)

sessionState (SessionState)

Internally, sessionState clones the optional parent SessionState (if given when creating the SparkSession) or creates a new SessionState using a BaseSessionStateBuilder as defined by the spark.sql.catalogImplementation property:

  • in-memory (default) for org.apache.spark.sql.internal.SessionStateBuilder

  • hive for org.apache.spark.sql.hive.HiveSessionStateBuilder

sharedState (SharedState)

Represents the shared state across SparkSession instances.

Note
baseRelationToDataFrame acts as a mechanism to plug the BaseRelation object hierarchy into the LogicalPlan object hierarchy (SparkSession uses it to bridge the two).

Creating SparkSession Instance

Caution
FIXME

Creating SparkSession Using Builder Pattern — builder Method

builder(): Builder

builder creates a new Builder that you use to build a fully-configured SparkSession using a fluent API.

import org.apache.spark.sql.SparkSession
val builder = SparkSession.builder
Tip
Read about the Fluent interface design pattern on Wikipedia, the free encyclopedia.

Accessing Version of Spark — version Method

version: String

version returns the version of Apache Spark in use.

Internally, version uses the spark.SPARK_VERSION value, i.e. the version property in the spark-version-info.properties properties file on the CLASSPATH.

Implicit Conversions — implicits object

The implicits object is a helper class with the Scala implicit methods (aka conversions) to convert Scala objects to Datasets, DataFrames and Columns. It also defines Encoders for Scala’s "primitive" types, e.g. Int, Double, String, and their products and collections.

Note

Import the implicits by import spark.implicits._.

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

The implicits object offers support for creating a Dataset from an RDD of any type (for which an encoder exists in scope), from case classes or tuples, and from Seq collections.

The implicits object also offers conversions from Scala's Symbol or $ to Column.

It also offers conversions from RDD or Seq of Product types (e.g. case classes or tuples) to DataFrame, and direct conversions from RDD of Int, Long and String to DataFrame with a single column name _1.

Note
It is only possible to call toDF methods on RDD objects of Int, Long, and String "primitive" types.
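
Here is a minimal sketch of the conversions in action (meant for spark-shell; the Person case class and the column names are just examples):

import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder.getOrCreate()
import spark.implicits._

// a local Seq of case class instances to a Dataset
case class Person(name: String, age: Int)
val people = Seq(Person("Alice", 30), Person("Bob", 25)).toDS

// a Seq of tuples to a DataFrame with custom column names
val df = Seq((1, "one"), (2, "two")).toDF("id", "name")

// Scala's Symbol and $ converted to Column
df.select('id, $"name").show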

Creating Empty Dataset — emptyDataset method

emptyDataset[T: Encoder]: Dataset[T]

emptyDataset creates an empty Dataset (assuming that future records will be of type T).

scala> val strings = spark.emptyDataset[String]
strings: org.apache.spark.sql.Dataset[String] = [value: string]

scala> strings.printSchema
root
 |-- value: string (nullable = true)

emptyDataset creates a LocalRelation logical query plan.
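
If you want to see the LocalRelation for yourself, you can peek at the Dataset's logical plan (queryExecution is a developer-level API, so treat this as an exploratory check only):

// the logical plan behind the empty Dataset is a LocalRelation
assert(strings.queryExecution.logical
  .isInstanceOf[org.apache.spark.sql.catalyst.plans.logical.LocalRelation])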

Creating Dataset from Local Collections and RDDs — createDataset methods

createDataset[T : Encoder](data: Seq[T]): Dataset[T]
createDataset[T : Encoder](data: RDD[T]): Dataset[T]

createDataset is an experimental API to create a Dataset from a local Scala collection, i.e. Seq[T], Java’s List[T], or a distributed RDD[T].

scala> val one = spark.createDataset(Seq(1))
one: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> one.show
+-----+
|value|
+-----+
|    1|
+-----+

createDataset creates a LocalRelation logical query plan (for the input data collection) or LogicalRDD (for the input RDD[T]).

Tip

You’d be better off using Scala implicits and the toDS method instead (it does the conversion automatically for you).

val spark: SparkSession = ...
import spark.implicits._

scala> val one = Seq(1).toDS
one: org.apache.spark.sql.Dataset[Int] = [value: int]

Internally, createDataset first looks up the implicit expression encoder in scope to access the AttributeReferences (of the schema).

Note
Only unresolved expression encoders are currently supported.

The expression encoder is then used to map elements (of the input Seq[T]) into a collection of InternalRows. With the references and rows, createDataset returns a Dataset with a LocalRelation logical query plan.
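
For completeness, a minimal sketch of the RDD[T] variant (which is planned as a LogicalRDD):

import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder.getOrCreate()
import spark.implicits._  // brings Encoder[Int] into scope

// a Dataset over a distributed RDD[Int]
val rdd = spark.sparkContext.parallelize(Seq(1, 2, 3))
val ints = spark.createDataset(rdd)
ints.show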

Creating Dataset With Single Long Column — range methods

range(end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]

The range family of methods creates a Dataset of Long numbers.

scala> spark.range(start = 0, end = 4, step = 2, numPartitions = 5).show
+---+
| id|
+---+
|  0|
|  2|
+---+
Note
The first three variants (that do not specify numPartitions explicitly) use SparkContext.defaultParallelism as the number of partitions (numPartitions).

Internally, range creates a new Dataset[Long] with Range logical plan and Encoders.LONG encoder.
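
A quick way to check the note above (a minimal sketch; the resulting numbers of partitions depend on your master setting):

// without an explicit numPartitions, range falls back on SparkContext.defaultParallelism
assert(spark.range(10).rdd.getNumPartitions == spark.sparkContext.defaultParallelism)

// with numPartitions given explicitly
assert(spark.range(0, 10, 1, numPartitions = 3).rdd.getNumPartitions == 3)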

Creating Empty DataFrame —  emptyDataFrame method

emptyDataFrame: DataFrame

emptyDataFrame creates an empty DataFrame (with no rows and no columns).

It calls createDataFrame with an empty RDD[Row] and an empty schema StructType(Nil).
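
A minimal check of both properties:

// no rows and no columns
assert(spark.emptyDataFrame.count == 0)
assert(spark.emptyDataFrame.schema.isEmpty)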

Creating DataFrames from RDDs with Explicit Schema — createDataFrame method

createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

createDataFrame creates a DataFrame using RDD[Row] and the input schema. It is assumed that the rows in rowRDD all match the schema.
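
A minimal sketch (the schema and the values are just examples):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val rows = spark.sparkContext.parallelize(Seq(Row(1, "one"), Row(2, "two")))
val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true)))

val df = spark.createDataFrame(rows, schema)
df.show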

Executing SQL Queries (aka SQL Mode) — sql Method

sql(sqlText: String): DataFrame

sql executes the sqlText SQL statement and creates a DataFrame.

Note

sql is imported in spark-shell so you can execute SQL statements as if sql were a part of the environment.

scala> spark.version
res0: String = 2.2.0-SNAPSHOT

scala> :imports
 1) import spark.implicits._       (72 terms, 43 are implicit)
 2) import spark.sql               (1 terms)
scala> sql("SHOW TABLES")
res0: org.apache.spark.sql.DataFrame = [tableName: string, isTemporary: boolean]

scala> sql("DROP TABLE IF EXISTS testData")
res1: org.apache.spark.sql.DataFrame = []

// Let's create a table to SHOW it
spark.range(10).write.option("path", "/tmp/test").saveAsTable("testData")

scala> sql("SHOW TABLES").show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
| testdata|      false|
+---------+-----------+

Internally, sql requests the current ParserInterface to execute a SQL query that gives a LogicalPlan.

Note
sql uses SessionState to access the current ParserInterface.

sql then creates a DataFrame using the current SparkSession (itself) and the LogicalPlan.

Tip

spark-sql is the main SQL environment in Spark to work with pure SQL statements (where you do not have to use Scala to execute them).

spark-sql> show databases;
default
Time taken: 0.028 seconds, Fetched 1 row(s)

Accessing UDF Registration Interface — udf Attribute

udf: UDFRegistration

udf attribute gives access to UDFRegistration that allows registering user-defined functions for SQL-based queries.

val spark: SparkSession = ...
spark.udf.register("myUpper", (s: String) => s.toUpperCase)

import spark.implicits._  // for the toDS conversion
val strs = ('a' to 'c').map(_.toString).toDS
strs.createOrReplaceTempView("strs")

scala> sql("SELECT *, myUpper(value) UPPER FROM strs").show
+-----+-----+
|value|UPPER|
+-----+-----+
|    a|    A|
|    b|    B|
|    c|    C|
+-----+-----+

Internally, it is simply an alias for SessionState.udfRegistration.

Creating DataFrame for Table — table method

table(tableName: String): DataFrame

table creates a DataFrame from records in the tableName table (if it exists).

val df = spark.table("mytable")

Accessing Metastore — catalog Attribute

catalog: Catalog

catalog attribute is a (lazy) interface to the current metastore, i.e. data catalog (of relational entities like databases, tables, functions, table columns, and temporary views).

Tip
All methods in Catalog return Datasets.
scala> spark.catalog.listTables.show
+------------------+--------+-----------+---------+-----------+
|              name|database|description|tableType|isTemporary|
+------------------+--------+-----------+---------+-----------+
|my_permanent_table| default|       null|  MANAGED|      false|
|              strs|    null|       null|TEMPORARY|       true|
+------------------+--------+-----------+---------+-----------+

Internally, catalog creates a CatalogImpl (that uses the current SparkSession).

Accessing DataFrameReader — read method

read: DataFrameReader

read method returns a DataFrameReader that is used to read data from external storage systems and load it into a DataFrame.

val spark: SparkSession = // create instance
val dfReader: DataFrameReader = spark.read
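
A minimal sketch of the usual pattern (the options and the path are just examples):

// load a CSV file with a header line into a DataFrame
val people = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/tmp/people.csv")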

Runtime Configuration — conf attribute

conf: RuntimeConfig

conf returns the current runtime configuration (as RuntimeConfig) that wraps SQLConf.
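
A minimal sketch of reading and changing a runtime option (the option is just an example):

val spark: SparkSession = ...

// read a runtime option
spark.conf.get("spark.sql.shuffle.partitions")

// change it for this session only
spark.conf.set("spark.sql.shuffle.partitions", "8")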

Caution
FIXME

readStream method

readStream: DataStreamReader

readStream returns a new DataStreamReader.
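
A minimal sketch using the built-in rate source (available as of Spark 2.2; purely for demonstration):

// a streaming Dataset that produces rows at a fixed rate
val rates = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "1")
  .load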

streams Attribute

streams: StreamingQueryManager

streams attribute gives access to StreamingQueryManager (through SessionState).

val spark: SparkSession = ...
spark.streams.active.foreach(println)

streamingQueryManager Attribute

streamingQueryManager is…​

listenerManager Attribute

listenerManager is…​

ExecutionListenerManager

ExecutionListenerManager is…​

functionRegistry Attribute

functionRegistry is…​

experimentalMethods Attribute

experimental: ExperimentalMethods

experimental is an extension point with ExperimentalMethods, a per-session collection of extra strategies and Rule[LogicalPlan]s.

Note
experimental is used in SparkPlanner and SparkOptimizer. Hive and Structured Streaming use it for their own extra strategies and optimization rules.
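
As a minimal sketch of the extension point, the following registers a do-nothing strategy (purely for illustration; returning Nil tells the planner to fall back on the other strategies):

import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// a strategy that never plans anything
object DoNothingStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

// register it with the current session
spark.experimental.extraStrategies = DoNothingStrategy :: Nil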

newSession method

newSession(): SparkSession

newSession creates (starts) a new SparkSession (with the current SparkContext and SharedState).

scala> println(sc.version)
2.0.0-SNAPSHOT

scala> val newSession = spark.newSession
newSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@122f58a

Stopping SparkSession — stop Method

stop(): Unit

stop stops the SparkSession, i.e. stops the underlying SparkContext.

Create DataFrame from BaseRelation — baseRelationToDataFrame Method

baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame

Internally, baseRelationToDataFrame creates a DataFrame from the input BaseRelation wrapped inside LogicalRelation.

Note
LogicalRelation is a logical plan adapter for BaseRelation (so BaseRelation can be part of a logical plan).
Note

baseRelationToDataFrame is used when:

Building SessionState — instantiateSessionState Internal Method

instantiateSessionState(className: String, sparkSession: SparkSession): SessionState

instantiateSessionState finds the className that is then used to create and immediately build a BaseSessionStateBuilder.

instantiateSessionState reports an IllegalArgumentException if an error occurs while constructing the SessionState:

Error while instantiating '[className]'
Note
instantiateSessionState is used exclusively when SparkSession is requested for SessionState (and one is not available yet).
