SQLContext
Caution
|
As of Spark 2.0.0, SQLContext is kept for backward compatibility only; SparkSession is the new entry point to Spark SQL. |
In the pre-Spark 2.0 era, SQLContext was the entry point for Spark SQL. Whatever you did in Spark SQL had to start with creating an instance of SQLContext.
A SQLContext object requires a SparkContext, a CacheManager, and a SQLListener. They are all transient and do not participate in serializing a SQLContext.
You should use SQLContext for the following (each is described in its own section below):
- Creating DataFrames (including empty ones)
- Creating Datasets
- Registering user-defined functions (UDFs)
- Caching DataFrames in in-memory cache
- Setting configuration properties
- Accessing DataFrameReader and StreamingQueryManager
- Creating external tables and dropping temporary tables
- Listing existing tables
- Executing SQL queries
Creating SQLContext Instance
You can create a SQLContext using the following constructors:
- SQLContext(sc: SparkContext)
- SQLContext.getOrCreate(sc: SparkContext)
- SQLContext.newSession() allows for creating a new instance of SQLContext with a separate SQL configuration (through a shared SparkContext).
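Below is a minimal, hedged sketch of the three ways to obtain a SQLContext; the application name and local master URL are assumptions for running it outside spark-shell.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumed local setup -- any existing SparkContext would do
val conf = new SparkConf().setAppName("SQLContextDemo").setMaster("local[*]")
val sc = new SparkContext(conf)

val sqlContext = new SQLContext(sc)            // primary constructor
val sameInstance = SQLContext.getOrCreate(sc)  // reuses the active SQLContext if one exists
val separateSession = sqlContext.newSession()  // shares sc but has its own SQL configuration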
Setting Configuration Properties
You can set Spark SQL configuration properties using:
- setConf(props: Properties): Unit
- setConf(key: String, value: String): Unit
You can get the current value of a configuration property by key using:
- getConf(key: String): String
- getConf(key: String, defaultValue: String): String
- getAllConfs: immutable.Map[String, String]
Note
|
Properties that start with spark.sql are reserved for Spark SQL. |
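As a quick sketch (reusing the sqlContext instance created above; the property values are arbitrary examples):
// Set a property and read it back
sqlContext.setConf("spark.sql.shuffle.partitions", "8")
sqlContext.getConf("spark.sql.shuffle.partitions")          // "8"

// Fall back to a default when the key is not set
sqlContext.getConf("spark.sql.sources.default", "parquet")

// Inspect all Spark SQL properties currently set
sqlContext.getAllConfs.foreach { case (k, v) => println(s"$k -> $v") }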
Creating DataFrames
emptyDataFrame
emptyDataFrame: DataFrame
emptyDataFrame creates an empty DataFrame. It calls createDataFrame with an empty RDD[Row] and an empty schema StructType(Nil).
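For example (reusing the sqlContext instance from above):
val empty = sqlContext.emptyDataFrame
empty.count()   // 0
empty.schema    // an empty StructType, i.e. no columns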
createDataFrame for RDD and Seq
createDataFrame[A <: Product](rdd: RDD[A]): DataFrame
createDataFrame[A <: Product](data: Seq[A]): DataFrame
The createDataFrame family of methods can create a DataFrame from an RDD of Scala's Product types (e.g. case classes or tuples) or a Seq thereof.
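A short sketch with a hypothetical Token case class (the sqlContext and sc instances come from the setup above):
case class Token(name: String, productId: Int, score: Double)
val data = Seq(Token("aaa", 100, 0.12), Token("bbb", 200, 0.29))

// From a Seq of case class instances
val dfFromSeq = sqlContext.createDataFrame(data)

// From an RDD of the same case class
val dfFromRDD = sqlContext.createDataFrame(sc.parallelize(data))

dfFromSeq.printSchema()   // name, productId, score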
createDataFrame for RDD of Row with Explicit Schema
createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
This variant of createDataFrame creates a DataFrame from an RDD of Row and an explicit schema.
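A sketch of the explicit-schema variant (again assuming the sqlContext and sc from above):
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("text", StringType, nullable = false),
  StructField("id", IntegerType, nullable = false)))

val rows = sc.parallelize(Seq(Row("hello", 0), Row("world!", 1)))
val df = sqlContext.createDataFrame(rows, schema)
df.show()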
Registering User-Defined Functions (UDF)
udf: UDFRegistration
The udf method gives you access to UDFRegistration to manipulate user-defined functions. Functions registered using udf are available for Hive queries only.
Tip
|
Read up on UDFs in UDFs — User-Defined Functions document. |
// Create a DataFrame
val df = Seq("hello", "world!").zip(0 to 1).toDF("text", "id")
// Register the DataFrame as a temporary table in Hive
df.registerTempTable("texts")
scala> sql("SHOW TABLES").show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
| texts| true|
+---------+-----------+
scala> sql("SELECT * FROM texts").show
+------+---+
| text| id|
+------+---+
| hello| 0|
|world!| 1|
+------+---+
// Just a Scala function
val my_upper: String => String = _.toUpperCase
// Register the function as UDF
spark.udf.register("my_upper", my_upper)
scala> sql("SELECT *, my_upper(text) AS MY_UPPER FROM texts").show
+------+---+--------+
| text| id|MY_UPPER|
+------+---+--------+
| hello| 0| HELLO|
|world!| 1| WORLD!|
+------+---+--------+
Caching DataFrames in In-Memory Cache
isCached(tableName: String): Boolean
The isCached method asks CacheManager whether the tableName table is cached in memory or not. It simply requests CacheManager for the CachedData and, when it exists, assumes the table is cached.
cacheTable(tableName: String): Unit
You can cache a table in memory using cacheTable.
Caution
|
Why would I want to cache a table? |
uncacheTable(tableName: String)
clearCache(): Unit
uncacheTable and clearCache remove one or all in-memory cached tables.
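A short, hedged walk-through using the texts temporary table registered earlier:
sqlContext.isCached("texts")      // false
sqlContext.cacheTable("texts")    // marks the texts table for in-memory caching
sqlContext.isCached("texts")      // true

sqlContext.uncacheTable("texts")  // removes this single table from the cache
sqlContext.clearCache()           // removes all in-memory cached tables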
Implicits — SQLContext.implicits
The implicits object is a helper class with methods to convert objects into Datasets and DataFrames, and also comes with many Encoders for "primitive" types as well as collections thereof.
Note
|
Import the implicits by import sqlContext.implicits._ (where sqlContext is an instance of SQLContext). |
It holds Encoders for Scala "primitive" types like Int, Double, String, and their collections.
It offers support for creating Dataset from RDD of any type (for which an encoder exists in scope), or case classes or tuples, and Seq.
It also offers conversions from Scala's Symbol or $ to Column.
It also offers conversions from RDD or Seq of Product types (e.g. case classes or tuples) to DataFrame. It has direct conversions from RDD of Int, Long and String to DataFrame with a single column name _1.
Note
|
It is not possible to call toDF methods on RDD objects of other "primitive" types except Int, Long, and String.
|
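A minimal sketch of the conversions described above (assuming the sqlContext instance from the setup earlier; column names in the "primitive" RDD case may differ across Spark versions):
import sqlContext.implicits._

// Seq of tuples (a Product type) to DataFrame
val df = Seq(("hello", 1), ("world", 2)).toDF("word", "n")

// Seq of a "primitive" type to Dataset
val ds = Seq(1, 2, 3).toDS()

// RDD of a "primitive" type to a single-column DataFrame
val fromRDD = sc.parallelize(Seq("a", "b", "c")).toDF()

// Symbol and $ to Column
df.select('word, $"n" + 1).show()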
Creating Datasets
createDataset[T: Encoder](data: Seq[T]): Dataset[T]
createDataset[T: Encoder](data: RDD[T]): Dataset[T]
The createDataset family of methods creates a Dataset from a collection of elements of type T, be it a regular Scala Seq or Spark's RDD.
It requires that there is an encoder in scope.
Note
|
Importing SQLContext.implicits brings many encoders available in scope. |
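A sketch with a hypothetical Person case class; the required encoders come from sqlContext.implicits._:
import sqlContext.implicits._

case class Person(name: String, age: Int)

// From a regular Scala Seq
val fromSeq = sqlContext.createDataset(Seq(Person("Ann", 30), Person("Bob", 25)))

// From an RDD
val fromRDD = sqlContext.createDataset(sc.parallelize(1 to 5))

fromSeq.show()
fromRDD.map(_ * 2).collect()   // Array(2, 4, 6, 8, 10)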
Accessing DataFrameReader (read method)
read: DataFrameReader
The experimental read method returns a DataFrameReader that is used to read data from external storage systems and load it into a DataFrame.
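For instance (the JSON path below is a placeholder):
// Generic form: pick a format and load a path
val people = sqlContext.read.format("json").load("/path/to/people.json")

// Equivalent shortcut for JSON
val samePeople = sqlContext.read.json("/path/to/people.json")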
Creating External Tables
createExternalTable(tableName: String, path: String): DataFrame
createExternalTable(tableName: String, path: String, source: String): DataFrame
createExternalTable(tableName: String, source: String, options: Map[String, String]): DataFrame
createExternalTable(tableName: String, source: String, schema: StructType, options: Map[String, String]): DataFrame
The experimental createExternalTable family of methods is used to create an external table tableName and return a corresponding DataFrame.
Caution
|
FIXME What is an external table? |
It assumes parquet as the default data source format, which you can change using the spark.sql.sources.default setting.
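A hedged sketch (the table names and paths below are placeholders):
// Parquet is assumed when no source is given
val events = sqlContext.createExternalTable("events", "/data/events.parquet")

// Explicit data source and options
val logs = sqlContext.createExternalTable("logs", "json", Map("path" -> "/data/logs.json"))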
Dropping Temporary Tables
dropTempTable(tableName: String): Unit
The dropTempTable method drops a temporary table tableName.
Caution
|
FIXME What is a temporary table? |
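For example, registering and then dropping a throwaway temporary table:
import sqlContext.implicits._

// Register a throwaway temporary table...
Seq(("a", 1)).toDF("k", "v").registerTempTable("tmp_kv")
sqlContext.tableNames().contains("tmp_kv")   // true

// ...and drop it again
sqlContext.dropTempTable("tmp_kv")
sqlContext.tableNames().contains("tmp_kv")   // false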
Creating Dataset[Long] (range method)
range(end: Long): Dataset[Long]
range(start: Long, end: Long): Dataset[Long]
range(start: Long, end: Long, step: Long): Dataset[Long]
range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[Long]
The range family of methods creates a Dataset[Long] with the sole id column of LongType for the given start, end, and step.
Note
|
The first three variants use SparkContext.defaultParallelism for the number of partitions (numPartitions).
|
scala> spark.range(5)
res0: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> .show
+---+
| id|
+---+
| 0|
| 1|
| 2|
| 3|
| 4|
+---+
Creating DataFrames for Table
table(tableName: String): DataFrame
The table method returns the tableName table as a DataFrame.
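For example, reading back the texts temporary table registered earlier:
val texts = sqlContext.table("texts")
texts.show()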
Listing Existing Tables
tables(): DataFrame
tables(databaseName: String): DataFrame
The tables methods return a DataFrame that holds the names of the existing tables in a database.
scala> spark.tables.show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
| t| true|
| t2| true|
+---------+-----------+
The schema consists of two columns - tableName of StringType and isTemporary of BooleanType.
Note
|
tables is a result of SHOW TABLES [IN databaseName].
|
tableNames(): Array[String]
tableNames(databaseName: String): Array[String]
The tableNames methods are similar to tables with the only difference that they return an Array[String], which is a collection of table names.
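For example, continuing the session above where t and t2 are registered:
sqlContext.tableNames().foreach(println)   // prints t and t2 here
sqlContext.tableNames("default")           // table names in the default database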
Accessing StreamingQueryManager
streams: StreamingQueryManager
The streams method returns a StreamingQueryManager that is used to…TK
Caution
|
FIXME |
Managing Active SQLContext for JVM
SQLContext.getOrCreate(sparkContext: SparkContext): SQLContext
The SQLContext.getOrCreate method returns an active SQLContext object for the JVM or creates a new one using a given sparkContext.
Note
|
It is a factory-like method that works on the SQLContext class.
|
Interestingly, there are two helper methods to set and clear the active SQLContext object - setActive and clearActive respectively.
setActive(spark: SQLContext): Unit
clearActive(): Unit
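A sketch of managing the active instance with the helpers listed above (note that in some Spark versions setActive and clearActive are restricted to the org.apache.spark.sql package):
// Returns the JVM-wide active SQLContext or creates one from sc
val ctx = SQLContext.getOrCreate(sc)

// Mark a different instance as active and then clear it again
SQLContext.setActive(ctx.newSession())
SQLContext.clearActive()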
Executing SQL Queries
sql(sqlText: String): DataFrame
sql executes the sqlText SQL query.
Note
|
It supports Hive statements through HiveContext. |
scala> sql("set spark.sql.hive.version").show(false)
16/04/10 15:19:36 INFO HiveSqlParser: Parsing command: set spark.sql.hive.version
+----------------------+-----+
|key |value|
+----------------------+-----+
|spark.sql.hive.version|1.2.1|
+----------------------+-----+
scala> sql("describe database extended default").show(false)
16/04/10 15:21:14 INFO HiveSqlParser: Parsing command: describe database extended default
+-------------------------+--------------------------+
|database_description_item|database_description_value|
+-------------------------+--------------------------+
|Database Name |default |
|Description |Default Hive database |
|Location |file:/user/hive/warehouse |
|Properties | |
+-------------------------+--------------------------+
// Create temporary table
scala> spark.range(10).registerTempTable("t")
16/04/14 23:34:31 INFO HiveSqlParser: Parsing command: t
scala> sql("CREATE temporary table t2 USING PARQUET OPTIONS (PATH 'hello') AS SELECT * FROM t")
16/04/14 23:34:38 INFO HiveSqlParser: Parsing command: CREATE temporary table t2 USING PARQUET OPTIONS (PATH 'hello') AS SELECT * FROM t
scala> spark.tables.show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
| t| true|
| t2| true|
+---------+-----------+
sql parses sqlText using a dialect that can be set up using the spark.sql.dialect setting.
Tip
|
You may also use spark-sql shell script to interact with Hive. |
Internally, it uses the SessionState.sqlParser.parsePlan(sql) method to create a LogicalPlan.
Caution
|
FIXME Review |
scala> sql("show tables").show(false)
16/04/09 13:05:32 INFO HiveSqlParser: Parsing command: show tables
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|dafa |false |
+---------+-----------+
Tip
|
Enable INFO logging level (e.g. for the HiveSqlParser logger seen in the transcripts above) by adding the appropriate logger entry to conf/log4j.properties.
Refer to Logging. |
Creating New Session
newSession(): SQLContext
You can use the newSession method to create a new session without the cost of instantiating a new SQLContext from scratch.
newSession returns a new SQLContext that shares SparkContext, CacheManager, SQLListener, and ExternalCatalog.
Caution
|
FIXME Why would I need that? |