import org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession.builder
  .appName("My Spark Application")  // optional and will be autogenerated if not specified
  .master("local[*]")               // avoid hardcoding the deployment environment
  .enableHiveSupport()              // self-explanatory, isn't it?
  .config("spark.sql.warehouse.dir", "target/spark-warehouse")
  .getOrCreate
SparkSession — The Entry Point to Spark SQL
SparkSession is the entry point to Spark SQL. It is the very first object you have to create while developing Spark SQL applications using the fully-typed Dataset (or untyped Row-based DataFrame) data abstractions.
Note
|
SparkSession merged SQLContext and HiveContext into one object as of Spark 2.0.
|
You use the SparkSession.builder method to create an instance of SparkSession.
You stop the current SparkSession using the stop method.
spark.stop
You can have as many SparkSessions as you want in a single Spark application. The common use case is to keep relational entities separate per SparkSession (see catalog attribute).
scala> spark.catalog.listTables.show
+------------------+--------+-----------+---------+-----------+
| name|database|description|tableType|isTemporary|
+------------------+--------+-----------+---------+-----------+
|my_permanent_table| default| null| MANAGED| false|
| strs| null| null|TEMPORARY| true|
+------------------+--------+-----------+---------+-----------+
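The separation is easy to see with temporary views, which belong to the SparkSession that created them. The following is a short sketch (not part of the original example) that registers a temporary view in one session and shows that a session created with newSession does not see it:
// assumes a running `spark` session, e.g. in spark-shell
val spark2 = spark.newSession()
spark.range(3).createOrReplaceTempView("numbers")
spark.catalog.listTables.show   // lists the numbers temporary view
spark2.catalog.listTables.show  // numbers is absent, as each session keeps its own temporary views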
Internally, SparkSession requires a SparkContext and an optional SharedState (that represents the shared state across SparkSession instances).
Method | Description |
---|---|
builder | "Opens" a builder to get or create a SparkSession instance |
version | Returns the current version of Spark. |
implicits | Use import spark.implicits._ to import the implicit conversions and create Datasets from (almost arbitrary) Scala objects. |
emptyDataset | Creates an empty Dataset[T]. |
range | Creates a Dataset[Long]. |
sql | Executes a SQL query (and returns a DataFrame). |
udf | Access to user-defined functions (UDFs). |
table | Creates a DataFrame from a table. |
catalog | Access to the catalog of the entities of structured queries |
read | Access to DataFrameReader to read a DataFrame from external files and storage systems. |
conf | Access to the current runtime configuration. |
readStream | Access to DataStreamReader to read streaming datasets. |
streams | Access to StreamingQueryManager to manage structured streaming queries. |
newSession | Creates a new SparkSession. |
stop | Stops the SparkSession. |
Tip
|
Use the spark.sql.warehouse.dir Spark property to change the location of Hive’s warehouse directory (the hive.metastore.warehouse.dir Hive property). Refer to SharedState to learn about (the low-level details of) Spark SQL support for Apache Hive. See also the official Hive Metastore Administration document. |
Note
|
baseRelationToDataFrame acts as a mechanism to plug the BaseRelation object hierarchy into the LogicalPlan object hierarchy that SparkSession uses to bridge them.
|
Creating SparkSession Instance
Caution
|
FIXME |
Creating SparkSession Using Builder Pattern — builder Method
builder(): Builder
builder creates a new Builder that you use to build a fully-configured SparkSession using a fluent API.
import org.apache.spark.sql.SparkSession
val builder = SparkSession.builder
Tip
|
Read about Fluent interface design pattern in Wikipedia, the free encyclopedia. |
Accessing Version of Spark — version Method
version: String
version returns the version of Apache Spark in use.
Internally, version uses the spark.SPARK_VERSION value that is the version property in the spark-version-info.properties properties file on the CLASSPATH.
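For example (a tiny sketch, assuming a spark session is in scope):
println(spark.version)
// the same value is also exposed as a constant in the spark package
println(org.apache.spark.SPARK_VERSION)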
Implicit Conversions — implicits object
The implicits object is a helper class with the Scala implicit methods (aka conversions) to convert Scala objects to Datasets, DataFrames and Columns. It also defines Encoders for Scala’s "primitive" types, e.g. Int, Double, String, and their products and collections.
Note
|
Import the implicits with import spark.implicits._ (where spark is a value of type SparkSession).
|
implicits object offers support for creating Dataset from RDD of any type (for which an encoder exists in scope), or case classes or tuples, and Seq.
implicits object also offers conversions from Scala’s Symbol or $ to Column.
It also offers conversions from RDD or Seq of Product types (e.g. case classes or tuples) to DataFrame. It has direct conversions from RDD of Int, Long and String to DataFrame with a single column name _1.
Note
|
It is only possible to call toDF methods on RDD objects of Int, Long, and String "primitive" types.
|
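As a minimal sketch of the conversions above (intended for spark-shell, where a spark value is in scope; the Person case class is made up for the example):
import spark.implicits._
// case classes (Products) become Datasets / DataFrames
case class Person(id: Long, name: String)
val people = Seq(Person(0, "Jacek"), Person(1, "Agata")).toDS
val peopleDF = Seq((0L, "Jacek"), (1L, "Agata")).toDF("id", "name")
// Scala Symbols and the $ string interpolator become Columns
people.select('name).show
peopleDF.select($"id" + 1).show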
Creating Empty Dataset — emptyDataset method
emptyDataset[T: Encoder]: Dataset[T]
emptyDataset creates an empty Dataset (assuming that future records will be of type T).
scala> val strings = spark.emptyDataset[String]
strings: org.apache.spark.sql.Dataset[String] = [value: string]
scala> strings.printSchema
root
|-- value: string (nullable = true)
emptyDataset creates a LocalRelation logical query plan.
Creating Dataset from Local Collections and RDDs — createDataset methods
createDataset[T : Encoder](data: Seq[T]): Dataset[T]
createDataset[T : Encoder](data: RDD[T]): Dataset[T]
createDataset is an experimental API to create a Dataset from a local Scala collection, i.e. Seq[T], Java’s List[T], or a distributed RDD[T].
scala> val one = spark.createDataset(Seq(1))
one: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> one.show
+-----+
|value|
+-----+
| 1|
+-----+
createDataset creates a LocalRelation logical query plan (for the input data collection) or LogicalRDD (for the input RDD[T]).
Tip
|
You’d be better off using Scala implicits and the toDS method instead (it does this conversion automatically for you).
|
Internally, createDataset first looks up the implicit expression encoder in scope to access the AttributeReferences (of the schema).
Note
|
Only unresolved expression encoders are currently supported. |
The expression encoder is then used to map elements (of the input Seq[T]) into a collection of InternalRows. With the references and rows, createDataset returns a Dataset with a LocalRelation logical query plan.
Creating Dataset With Single Long Column — range methods
range(end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]
The range family of methods creates a Dataset of Long numbers.
scala> spark.range(start = 0, end = 4, step = 2, numPartitions = 5).show
+---+
| id|
+---+
| 0|
| 2|
+---+
Note
|
The first three variants (that do not specify numPartitions explicitly) use SparkContext.defaultParallelism for the number of partitions (numPartitions).
|
Internally, range creates a new Dataset[Long] with a Range logical plan and the Encoders.LONG encoder.
Creating Empty DataFrame — emptyDataFrame method
emptyDataFrame: DataFrame
emptyDataFrame creates an empty DataFrame (with no rows and columns).
It calls createDataFrame with an empty RDD[Row] and an empty schema StructType(Nil).
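A quick way to see it (a minimal sketch, assuming a spark session in scope):
val empty = spark.emptyDataFrame
empty.printSchema   // prints only the root node as there are no columns
empty.count         // 0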
Creating DataFrames from RDDs with Explicit Schema — createDataFrame method
createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
createDataFrame creates a DataFrame using RDD[Row] and the input schema. It is assumed that the rows in rowRDD all match the schema.
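Here is a minimal sketch of createDataFrame with a hand-built schema (the column names and values are made up for the example):
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
// an RDD of Rows plus a schema that describes them
val rowRDD = spark.sparkContext.parallelize(Seq(Row(0L, "zero"), Row(1L, "one")))
val schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = true)))
val df = spark.createDataFrame(rowRDD, schema)
df.show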
Executing SQL Queries (aka SQL Mode) — sql Method
sql(sqlText: String): DataFrame
sql executes the sqlText SQL statement and creates a DataFrame.
scala> sql("SHOW TABLES")
res0: org.apache.spark.sql.DataFrame = [tableName: string, isTemporary: boolean]
scala> sql("DROP TABLE IF EXISTS testData")
res1: org.apache.spark.sql.DataFrame = []
// Let's create a table to SHOW it
spark.range(10).write.option("path", "/tmp/test").saveAsTable("testData")
scala> sql("SHOW TABLES").show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
| testdata| false|
+---------+-----------+
Internally, sql requests the current ParserInterface to execute a SQL query that gives a LogicalPlan.
Note
|
sql uses SessionState to access the current ParserInterface.
|
sql then creates a DataFrame using the current SparkSession (itself) and the LogicalPlan.
Tip
|
spark-sql is the main SQL environment in Spark to work with pure SQL statements (where you do not have to use Scala to execute them).
|
Accessing UDF Registration Interface — udf Attribute
udf: UDFRegistration
udf attribute gives access to UDFRegistration that allows registering user-defined functions for SQL-based queries.
val spark: SparkSession = ...
spark.udf.register("myUpper", (s: String) => s.toUpperCase)
val strs = ('a' to 'c').map(_.toString).toDS
strs.createOrReplaceTempView("strs")
scala> sql("SELECT *, myUpper(value) UPPER FROM strs").show
+-----+-----+
|value|UPPER|
+-----+-----+
| a| A|
| b| B|
| c| C|
+-----+-----+
Internally, it is simply an alias for SessionState.udfRegistration.
Creating DataFrame for Table — table method
table(tableName: String): DataFrame
table creates a DataFrame from records in the tableName table (if it exists).
val df = spark.table("mytable")
Accessing Metastore — catalog Attribute
catalog: Catalog
catalog attribute is a (lazy) interface to the current metastore, i.e. data catalog (of relational entities like databases, tables, functions, table columns, and temporary views).
Tip
|
All methods in Catalog return Datasets.
|
scala> spark.catalog.listTables.show
+------------------+--------+-----------+---------+-----------+
| name|database|description|tableType|isTemporary|
+------------------+--------+-----------+---------+-----------+
|my_permanent_table| default| null| MANAGED| false|
| strs| null| null|TEMPORARY| true|
+------------------+--------+-----------+---------+-----------+
Internally, catalog creates a CatalogImpl (that uses the current SparkSession).
Accessing DataFrameReader — read method
read: DataFrameReader
read method returns a DataFrameReader that is used to read data from external storage systems and load it into a DataFrame.
val spark: SparkSession = // create instance
val dfReader: DataFrameReader = spark.read
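For example, reading a CSV file with a header (a short sketch; the path below is hypothetical):
val cities = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("data/cities.csv")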
Runtime Configuration — conf attribute
conf: RuntimeConfig
conf returns the current runtime configuration (as RuntimeConfig) that wraps SQLConf.
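A couple of typical uses (a minimal sketch; the value in the first comment assumes the default has not been changed elsewhere):
spark.conf.get("spark.sql.shuffle.partitions")      // "200" unless overridden
spark.conf.set("spark.sql.shuffle.partitions", 8L)
spark.conf.getOption("spark.sql.warehouse.dir")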
Caution
|
FIXME |
streams Attribute
streams: StreamingQueryManager
streams attribute gives access to StreamingQueryManager (through SessionState).
val spark: SparkSession = ...
spark.streams.active.foreach(println)
experimentalMethods Attribute
experimental: ExperimentalMethods
experimental is an extension point with ExperimentalMethods that is a per-session collection of extra strategies and Rule[LogicalPlan]s.
Note
|
experimental is used in SparkPlanner and SparkOptimizer. Hive and Structured Streaming use it for their own extra strategies and optimization rules.
|
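For illustration, here is a minimal sketch that registers a do-nothing optimization rule with the current session (the rule name is made up for the example):
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
// a no-op Rule[LogicalPlan]; a real rule would transform the plan
object NoopOptimization extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}
spark.experimental.extraOptimizations = Seq(NoopOptimization)
// extraStrategies can be assigned the same way for extra physical planning strategies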
newSession method
newSession(): SparkSession
newSession creates (starts) a new SparkSession (with the current SparkContext and SharedState).
scala> println(sc.version)
2.0.0-SNAPSHOT
scala> val newSession = spark.newSession
newSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@122f58a
Stopping SparkSession — stop Method
stop(): Unit
stop stops the SparkSession, i.e. stops the underlying SparkContext.
Create DataFrame from BaseRelation — baseRelationToDataFrame Method
baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame
Internally, baseRelationToDataFrame creates a DataFrame from the input BaseRelation wrapped inside LogicalRelation.
Note
|
LogicalRelation is a logical plan adapter for BaseRelation (so BaseRelation can be part of a logical plan).
|
Building SessionState — instantiateSessionState Internal Method
instantiateSessionState(className: String, sparkSession: SparkSession): SessionState
instantiateSessionState finds the className that is then used to create and immediately build a BaseSessionStateBuilder.
instantiateSessionState reports an IllegalArgumentException while constructing a SessionState:
Error while instantiating '[className]'
Note
|
instantiateSessionState is used exclusively when SparkSession is requested for SessionState (and one is not available yet).
|