DataFrameReader — Reading Datasets from External Data Sources

DataFrameReader is an interface to read datasets from external data sources, e.g. files, Hive tables, JDBC, or Dataset[String], into untyped DataFrames (mostly) or typed Datasets.

DataFrameReader is available using SparkSession.read.

import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...

import org.apache.spark.sql.DataFrameReader
val r: DataFrameReader = spark.read

DataFrameReader supports many file formats natively and offers the interface to define custom file formats.
Note
|
DataFrameReader assumes the parquet file format by default. You can change it using the spark.sql.sources.default property.
|
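For example, you could make JSON the default data source for a session (a quick sketch; people.json is just a placeholder path):

// Switch the default data source from parquet to JSON for this session
spark.conf.set("spark.sql.sources.default", "json")

// load with no explicit format now uses the JSON data source
val people = spark.read.load("people.json")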
Once you have described the reading pipeline for an external data source, you eventually trigger the loading using the format-agnostic load or a format-specific operator (e.g. json, csv), both of which create untyped DataFrames.
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...
import org.apache.spark.sql.DataFrame
// Using format-agnostic load operator
val csvs: DataFrame = spark
.read
.format("csv")
.option("header", true)
.option("inferSchema", true)
.load("*.csv")
// Using format-specific load operator
val jsons: DataFrame = spark
.read
.json("metrics/*.json")
(New in Spark 2.0) DataFrameReader can read text files using the textFile methods that return typed Datasets.
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...
import org.apache.spark.sql.Dataset
val lines: Dataset[String] = spark
.read
.textFile("README.md")
(New in Spark 2.2) DataFrameReader can load datasets from a Dataset[String] (with lines being complete "files") using the format-specific csv and json operators.
val csvLine = "0,Warsaw,Poland"

// toDS and the $"colName" notation below require spark.implicits._
// (imported automatically in spark-shell)
import spark.implicits._

import org.apache.spark.sql.Dataset
val cities: Dataset[String] = Seq(csvLine).toDS
scala> cities.show
+---------------+
| value|
+---------------+
|0,Warsaw,Poland|
+---------------+
// Define schema explicitly (as below)
// or
// option("header", true) + option("inferSchema", true)
import org.apache.spark.sql.types.StructType
val schema = new StructType()
.add($"id".long.copy(nullable = false))
.add($"city".string)
.add($"country".string)
scala> schema.printTreeString
root
|-- id: long (nullable = false)
|-- city: string (nullable = true)
|-- country: string (nullable = true)
import org.apache.spark.sql.DataFrame
val citiesDF: DataFrame = spark
.read
.schema(schema)
.csv(cities)
scala> citiesDF.show
+---+------+-------+
| id| city|country|
+---+------+-------+
| 0|Warsaw| Poland|
+---+------+-------+
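The json operator accepts a Dataset[String] the same way. Here is a short sketch that reuses the spark.implicits._ import from above; the sample JSON document simply mirrors the CSV line:

// Each element of the Dataset is a complete JSON document
val jsonLines = Seq("""{"id": 0, "city": "Warsaw", "country": "Poland"}""").toDS
val citiesFromJson = spark.read.json(jsonLines)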
Defining Data Format — format method

format(source: String): DataFrameReader

You use format to configure DataFrameReader to use the appropriate source format.
Supported data formats include csv, json, parquet, orc, text and jdbc (covered in the sections below).
Note
|
You can define your own custom file formats. |
Defining Schema — schema method

schema(schema: StructType): DataFrameReader

You can specify the schema of the input data source explicitly.
// The $"colName" notation requires spark.implicits._
// (imported automatically in spark-shell)
import spark.implicits._

import org.apache.spark.sql.types.StructType
val schema = new StructType()
.add($"id".long.copy(nullable = false))
.add($"city".string)
.add($"country".string)
scala> schema.printTreeString
root
|-- id: long (nullable = false)
|-- city: string (nullable = true)
|-- country: string (nullable = true)
import org.apache.spark.sql.DataFrameReader
val r: DataFrameReader = spark.read.schema(schema)
Tip
|
Read up on Schema. |
Defining Loading Options — option and options Methods

option(key: String, value: String): DataFrameReader
option(key: String, value: Boolean): DataFrameReader (1)
option(key: String, value: Long): DataFrameReader (1)
option(key: String, value: Double): DataFrameReader (1)

(1) Available as of Spark 2.0

You can also use the options method to specify multiple options at once in a single Map.
options(options: scala.collection.Map[String, String]): DataFrameReader
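The two are interchangeable. Here is a quick sketch (the header and inferSchema options and the people.csv path are just for illustration):

// Setting options one by one...
val withOption = spark.read
  .format("csv")
  .option("header", true)
  .option("inferSchema", true)
  .load("people.csv")

// ...or all at once using options and a Map[String, String]
val withOptions = spark.read
  .format("csv")
  .options(Map("header" -> "true", "inferSchema" -> "true"))
  .load("people.csv")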
Loading Data from Data Sources with Multiple Files Support — load Method

load(): DataFrame
load(path: String): DataFrame
load(paths: String*): DataFrame

load loads data from a data source that supports multiple paths and represents it as an untyped DataFrame.
Internally, load creates a DataSource (for the current SparkSession, a user-specified schema, a source format and options), immediately resolves it, and converts the resulting BaseRelation into a DataFrame.
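For example, you can pass several paths at once (a sketch with made-up directory names):

// Load two directories of CSV files in one go using load(paths: String*)
val twoYears = spark.read
  .format("csv")
  .option("header", true)
  .load("data/2016/*.csv", "data/2017/*.csv")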
Loading Datasets from Files (into DataFrames) Using Format-Specific Load Operators

DataFrameReader supports the following file formats: json, parquet, orc and text.
json method

json(path: String): DataFrame
json(paths: String*): DataFrame
json(jsonRDD: RDD[String]): DataFrame

New in 2.0.0: prefersDecimal option
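A quick sketch of reading JSON files (the path matches the earlier example; the prefersDecimal value is just for illustration):

// Infer floating-point numbers as decimals (falling back to doubles) while reading JSON
val metrics = spark.read
  .option("prefersDecimal", true)
  .json("metrics/*.json")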
parquet method

parquet(path: String): DataFrame
parquet(paths: String*): DataFrame

The supported options:

- compression (default: snappy)

New in 2.0.0: snappy is the default Parquet codec. See [SPARK-14482][SQL] Change default Parquet codec from gzip to snappy.

The available compression codecs are:

- none or uncompressed
- snappy - the default codec in Spark 2.0.0
- gzip - the default codec in Spark before 2.0.0
- lzo
import spark.implicits._ // for toDF (imported automatically in spark-shell)
val tokens = Seq("hello", "henry", "and", "harry")
.zipWithIndex
.map(_.swap)
.toDF("id", "token")
val parquetWriter = tokens.write
parquetWriter.option("compression", "none").save("hello-none")
// Requesting an unsupported codec throws an exception whose stack trace
// shows where the supported compression codecs are resolved
scala> parquetWriter.option("compression", "unsupported").save("hello-unsupported")
java.lang.IllegalArgumentException: Codec [unsupported] is not available. Available codecs are uncompressed, gzip, lzo, snappy, none.
at org.apache.spark.sql.execution.datasources.parquet.ParquetOptions.<init>(ParquetOptions.scala:43)
at org.apache.spark.sql.execution.datasources.parquet.DefaultSource.prepareWrite(ParquetRelation.scala:77)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$4.apply(InsertIntoHadoopFsRelation.scala:122)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$4.apply(InsertIntoHadoopFsRelation.scala:122)
at org.apache.spark.sql.execution.datasources.BaseWriterContainer.driverSideSetup(WriterContainer.scala:103)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:141)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:116)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:116)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:116)
at org.apache.spark.sql.execution.command.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:61)
at org.apache.spark.sql.execution.command.ExecutedCommand.sideEffectResult(commands.scala:59)
at org.apache.spark.sql.execution.command.ExecutedCommand.doExecute(commands.scala:73)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:137)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:134)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:117)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:65)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:65)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:390)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:230)
... 48 elided
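Reading the Parquet files back is a one-liner (reusing the hello-none directory written above):

// Read the uncompressed Parquet files back into a DataFrame
val tokensBack = spark.read.parquet("hello-none")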
orc method

orc(path: String): DataFrame
orc(paths: String*): DataFrame

The Optimized Row Columnar (ORC) file format is a highly efficient columnar format for storing Hive data, designed to improve performance for tables with more than 1,000 columns. The ORC format was introduced in Hive 0.11 to use and retain the type information from the table definition.
Tip
|
Read the ORC Files document to learn about the ORC file format. |
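A quick sketch (the events.orc path is made up):

// Read ORC files into a DataFrame
val events = spark.read.orc("events.orc")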
text method

text loads text files into a DataFrame with a single value column (as the example below shows).

text(path: String): DataFrame
text(paths: String*): DataFrame
Example

// .as[String] requires an implicit Encoder from spark.implicits._
// (imported automatically in spark-shell)
import spark.implicits._
val lines: Dataset[String] = spark.read.text("README.md").as[String]
scala> lines.show
+--------------------+
| value|
+--------------------+
| # Apache Spark|
| |
|Spark is a fast a...|
|high-level APIs i...|
|supports general ...|
|rich set of highe...|
|MLlib for machine...|
|and Spark Streami...|
| |
|<http://spark.apa...|
| |
| |
|## Online Documen...|
| |
|You can find the ...|
|guide, on the [pr...|
|and [project wiki...|
|This README file ...|
| |
| ## Building Spark|
+--------------------+
only showing top 20 rows
Loading Datasets from Tables (into DataFrames) — table Method

table(tableName: String): DataFrame

table loads the tableName table into an untyped DataFrame.
scala> spark.sql("SHOW TABLES").show(false)
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|dafa |false |
+---------+-----------+
scala> spark.read.table("dafa").show(false)
+---+-------+
|id |text |
+---+-------+
|1 |swiecie|
|0 |hello |
+---+-------+
Caution
|
FIXME The method uses spark.sessionState.sqlParser.parseTableIdentifier(tableName) and spark.sessionState.catalog.lookupRelation. Would be nice to learn a bit more on their internals, huh?
|
Loading Data From an External Table using JDBC — jdbc Method
jdbc(url: String, table: String, properties: Properties): DataFrame
jdbc(url: String,
table: String,
predicates: Array[String],
connectionProperties: Properties): DataFrame
jdbc(url: String,
table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
connectionProperties: Properties): DataFrame
jdbc loads data from an external table using JDBC and represents it as an untyped DataFrame.
Option | Description |
---|---|
dbtable | (required) Name of the table to load |
driver | (recommended) JDBC driver's class name. When defined, the class will get registered with Java's java.sql.DriverManager |
isolationLevel | One of the following: NONE, READ_UNCOMMITTED, READ_COMMITTED, REPEATABLE_READ, SERIALIZABLE |
lowerBound | Lower bound of the partition column |
numPartitions | Number of partitions |
partitionColumn | Name of the column used to partition the dataset. When defined, the lowerBound, upperBound and numPartitions options are required. When undefined, lowerBound and upperBound have to be undefined. |
truncate | (used only for writing) Enables table truncation. Defaults to false |
upperBound | Upper bound of the partition column |
url | (required) JDBC connection URL |
Internally, jdbc creates a JDBCOptions from the url, table and extraOptions with connectionProperties.

jdbc then creates one JDBCPartition per predicate.

In the end, jdbc requests the SparkSession to create a DataFrame for a JDBCRelation (given the JDBCPartitions and JDBCOptions created earlier).
Note
|
The jdbc method uses java.util.Properties (and so appears overly Java-centric). Use format("jdbc") instead.
|
Tip
|
Review the exercise Creating DataFrames from Tables using JDBC and PostgreSQL. |
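Here is a minimal sketch of the recommended format("jdbc") approach (the URL, table name and credentials are placeholders):

// Read a table over JDBC using the generic format("jdbc") API
val people = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/sparkdb")
  .option("dbtable", "people")
  .option("user", "user")
  .option("password", "pass")
  .load()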
Loading Datasets From Text Files — textFile Method

textFile(path: String): Dataset[String]
textFile(paths: String*): Dataset[String]

textFile loads one or many text files into a typed Dataset[String].
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...
import org.apache.spark.sql.Dataset
val lines: Dataset[String] = spark
.read
.textFile("README.md")
Note
|
The textFile methods are similar to the text family of methods in that they both read text files, but the text methods return an untyped DataFrame while the textFile methods return a typed Dataset[String].
|