// Define the schema using a case class
case class Person(name: String, age: Int)
// You could read people from a CSV file
// It's been a while since you saw RDDs, hasn't it?
// Excuse me for bringing up the past.
import org.apache.spark.rdd.RDD
val peopleRDD: RDD[Person] = sc.parallelize(Seq(Person("Jacek", 10)))
// Convert RDD[Person] to Dataset[Person] and run a query
// Automatic schema inference from existing RDDs
scala> val people = peopleRDD.toDS
people: org.apache.spark.sql.Dataset[Person] = [name: string, age: int]
// Query for teenagers using Scala Query DSL
scala> val teenagers = people.where('age >= 10).where('age <= 19).select('name).as[String]
teenagers: org.apache.spark.sql.Dataset[String] = [name: string]
scala> teenagers.show
+-----+
| name|
+-----+
|Jacek|
+-----+
// You could however want to use good ol' SQL, couldn't you?
// 1. Register people Dataset as a temporary view in Catalog
people.createOrReplaceTempView("people")
// 2. Run SQL query
val teenagers = sql("SELECT * FROM people WHERE age >= 10 AND age <= 19")
scala> teenagers.show
+-----+---+
| name|age|
+-----+---+
|Jacek| 10|
+-----+---+
Spark SQL — Batch and Streaming Queries Over Structured Data on Massive Scale
Like Apache Spark in general, Spark SQL in particular is all about distributed in-memory computations on massive scale.
The primary difference between Spark SQL's and the "bare" Spark Core's RDD computation models is the framework for loading, querying and persisting structured and semi-structured data using structured queries. These queries can be expressed in good ol' SQL, in HiveQL, or through the high-level, SQL-like, declarative and type-safe Dataset API (the Structured Query DSL).
Note
|
Semi- and structured datasets are collections of records that can be described using schema implicitly or explicitly, respectively. |
Spark SQL supports structured queries in batch and streaming modes (with the latter as a separate module of Spark SQL called Structured Streaming).
Note
|
Under the covers, structured queries are automatically compiled into corresponding RDD operations. |
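You can see this compilation for yourself. The sketch below assumes a spark-shell session (with spark and the implicits in scope); queryExecution exposes the planning stages and toRdd gives the underlying RDD the query compiles down to:

```scala
// A structured query over a local Dataset
val q = Seq(1, 2, 3).toDF("id").where($"id" > 1)

// The underlying RDD[InternalRow] the query is compiled to
val rdd = q.queryExecution.toRdd

// The physical plan shows the RDD-level execution steps
q.explain
```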
Regardless of the query language you choose, queries all end up as a tree of Catalyst expressions that undergo further optimizations on their way to your large distributed datasets.
As of Spark 2.0, Spark SQL is the de facto primary and feature-rich interface to Spark's underlying in-memory distributed platform (hiding Spark Core's RDDs behind higher-level abstractions).
When Hive support is enabled, Spark developers can read and write data located in existing Apache Hive deployments using HiveQL.
sql("CREATE OR REPLACE TEMPORARY VIEW v1 (key INT, value STRING) USING csv OPTIONS ('path'='people.csv', 'header'='true')")
// Queries are expressed in HiveQL
sql("FROM v1").show
scala> sql("desc EXTENDED v1").show(false)
+----------+---------+-------+
|col_name |data_type|comment|
+----------+---------+-------+
|# col_name|data_type|comment|
|key |int |null |
|value |string |null |
+----------+---------+-------+
Like relational and NoSQL databases, Spark SQL offers query optimizations through its logical query plan optimizer, code generation (which can often beat your own custom hand-written code!) and the Tungsten execution engine with its own internal binary row format.
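You can peek at all three of these at work from a spark-shell session. A sketch, assuming spark is in scope; debugCodegen comes from Spark's debug package:

```scala
// A small query to inspect
val q = spark.range(10).filter('id % 2 === 0)

// Parsed, analyzed, optimized logical plans and the physical plan
q.explain(extended = true)

// The Java code generated for the query (whole-stage code generation)
import org.apache.spark.sql.execution.debug._
q.debugCodegen
```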
Spark SQL introduces a tabular data abstraction called Dataset (which superseded DataFrame). The Dataset abstraction is designed to make processing large amounts of structured tabular data on Spark infrastructure simpler and faster.
Note
|
Quoting Apache Drill, which applies to Spark SQL perfectly:
|
The following snippet shows a batch ETL pipeline that processes JSON files and saves a subset of the data as CSV files.
spark.read
.format("json")
.load("input-json")
.select("name", "score")
.where($"score" > 15)
.write
.format("csv")
.save("output-csv")
With the Structured Streaming feature, however, the above static batch query becomes dynamic and continuous, paving the way for continuous applications.
import org.apache.spark.sql.types._
val schema = StructType(
StructField("id", LongType, nullable = false) ::
StructField("name", StringType, nullable = false) ::
StructField("score", DoubleType, nullable = false) :: Nil)
spark.readStream
.format("json")
.schema(schema)
.load("input-json")
.select("name", "score")
.where('score > 15)
.writeStream
.format("console")
.start
// -------------------------------------------
// Batch: 1
// -------------------------------------------
// +-----+-----+
// | name|score|
// +-----+-----+
// |Jacek| 20.5|
// +-----+-----+
As of Spark 2.0, the main data abstraction of Spark SQL is Dataset. It represents structured data, i.e. records with a known schema. The Dataset abstraction enables a compact binary representation using a compressed columnar format that is stored in managed memory off the JVM heap. It speeds computations up by reducing memory usage and GC pressure.
Spark SQL supports predicate pushdown to optimize performance of Dataset queries and can also generate optimized code at runtime.
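Predicate pushdown is visible in the physical plan. A sketch, assuming a spark-shell session and a writable working directory (the ids path is made up for the example):

```scala
import org.apache.spark.sql.functions.lit

// Write a small Parquet dataset
spark.range(100).withColumn("name", lit("x"))
  .write.mode("overwrite").parquet("ids")

// The id > 90 filter is pushed down to the Parquet reader —
// look for PushedFilters in the physical plan
spark.read.parquet("ids").where('id > 90).explain
```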
Spark SQL comes with several APIs to work with:
-
Dataset API (formerly DataFrame API) with a strongly-typed LINQ-like Query DSL that Scala programmers will likely find very appealing to use.
-
Structured Streaming API (aka Streaming Datasets) for continuous incremental execution of structured queries.
-
Non-programmers will likely use SQL as their query language through direct integration with Hive.
-
JDBC/ODBC fans can use JDBC interface (through Thrift JDBC/ODBC Server) and connect their tools to Spark’s distributed query engine.
Spark SQL comes with a uniform interface for data access in distributed storage systems like Cassandra or HDFS (Hive, Parquet, JSON) using specialized DataFrameReader and DataFrameWriter objects.
Spark SQL allows you to execute SQL-like queries on large volumes of data that can live in Hadoop HDFS or Hadoop-compatible file systems like S3. It can access data from different data sources, i.e. files or tables.
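The uniformity shows up in code: the same fluent reader/writer interface covers every data source, with only the format and its options changing. A sketch, assuming a spark-shell session and a people.csv file (the path is an assumption):

```scala
// Read CSV — the same interface works for json, parquet, jdbc, ...
val people = spark.read
  .option("header", "true")
  .csv("people.csv")

// Write the same data out in a different format with the same API
people.write
  .mode("overwrite")
  .parquet("people.parquet")
```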
Spark SQL defines the following types of functions:
-
standard functions or User-Defined Functions (UDFs) that take values from a single row as input to generate a single return value for every input row.
-
basic aggregate functions that operate on a group of rows and calculate a single return value per group.
-
window aggregate functions that operate on a group of rows and calculate a single return value for each row in a group.
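The three kinds of functions can be sketched in one spark-shell session (column and dataset names below are made up for the example):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val scores = Seq(("a", 1), ("a", 2), ("b", 3)).toDF("key", "value")

// A UDF: one input row in, one value out, for every row
val doubled = udf { n: Int => n * 2 }
scores.select('key, doubled('value) as "doubled")

// A basic aggregate: one value per group
scores.groupBy('key).agg(sum('value) as "total")

// A window aggregate: one value per row, computed over the row's group
scores.withColumn("groupTotal", sum('value) over Window.partitionBy('key))
```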
There are two supported catalog implementations, in-memory (the default) and hive, that you can set using the spark.sql.catalogImplementation property.
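spark.sql.catalogImplementation is a static configuration, so it has to be set when the SparkSession is created, not afterwards. A minimal sketch:

```scala
import org.apache.spark.sql.SparkSession

// enableHiveSupport sets spark.sql.catalogImplementation to hive
val spark = SparkSession.builder
  .enableHiveSupport()
  .getOrCreate()

// Check which catalog implementation is in use
spark.conf.get("spark.sql.catalogImplementation")
```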
From user@spark:
If you already loaded csv data into a dataframe, why not register it as a table, and use Spark SQL to find max/min or any other aggregates? SELECT MAX(column_name) FROM dftable_name … seems natural.
If you’re more comfortable with SQL, it might be worth registering this DataFrame as a table and generating a SQL query against it (generate a string with a series of min-max calls)
You can parse data from external data sources and let the schema inferencer deduce the schema.
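For file-based sources like CSV, schema inference is a reader option (it costs an extra pass over the data). A sketch, assuming a people.csv file with a header row (the path is an assumption):

```scala
// inferSchema makes Spark scan the data and deduce column types
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("people.csv")

// The inferred schema, e.g. name: string, age: int
df.printSchema
```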
// Example 1
val df = Seq(1 -> 2).toDF("i", "j")
val query = df.groupBy('i)
.agg(max('j).as("aggOrdering"))
.orderBy(sum('j))
.as[(Int, Int)]
query.collect.contains((1, 2)) // true
// Example 2
val df = Seq((1, 1), (-1, 1)).toDF("key", "value")
df.createOrReplaceTempView("src")
scala> sql("SELECT IF(a > 0, a, 0) FROM (SELECT key a FROM src) temp").show
+-------------------+
|(IF((a > 0), a, 0))|
+-------------------+
| 1|
| 0|
+-------------------+