DataFrameReader — Reading Datasets from External Data Sources

DataFrameReader is an interface to read datasets from external data sources, e.g. files, Hive tables, JDBC or a Dataset[String], into (mostly) untyped DataFrames or typed Datasets.

DataFrameReader is available using SparkSession.read.

import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...

import org.apache.spark.sql.DataFrameReader
val r: DataFrameReader = spark.read

DataFrameReader supports many file formats natively and offers the interface to define custom file formats.

Note
DataFrameReader assumes the parquet file format by default, which you can change using the spark.sql.sources.default property.
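
A minimal sketch of changing the default format through the property (the SparkSession setup and the input path are just for illustration):

import org.apache.spark.sql.SparkSession

// Switch the default data source format from parquet to json
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("default-format-demo")
  .config("spark.sql.sources.default", "json")
  .getOrCreate()

// With the default changed, format-agnostic load reads JSON
// (metrics/input.json is a hypothetical path)
val df = spark.read.load("metrics/input.json")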

After you have described the reading pipeline to read datasets from an external data source, you eventually trigger the loading using the format-agnostic load operator or a format-specific operator (e.g. json, csv) that creates an untyped DataFrame.

import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...

import org.apache.spark.sql.DataFrame

// Using format-agnostic load operator
val csvs: DataFrame = spark
  .read
  .format("csv")
  .option("header", true)
  .option("inferSchema", true)
  .load("*.csv")

// Using format-specific load operator
val jsons: DataFrame = spark
  .read
  .json("metrics/*.json")

(New in Spark 2.0) DataFrameReader can read text files using textFile methods that return typed Datasets.

import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...

import org.apache.spark.sql.Dataset
val lines: Dataset[String] = spark
  .read
  .textFile("README.md")

(New in Spark 2.2) DataFrameReader can load datasets from Dataset[String] (with lines being complete "files") using format-specific csv and json operators.

val csvLine = "0,Warsaw,Poland"

import org.apache.spark.sql.Dataset
import spark.implicits._ // for toDS and the $-based schema DSL below
val cities: Dataset[String] = Seq(csvLine).toDS
scala> cities.show
+---------------+
|          value|
+---------------+
|0,Warsaw,Poland|
+---------------+

// Define schema explicitly (as below)
// or
// option("header", true) + option("inferSchema", true)
import org.apache.spark.sql.types.StructType
val schema = new StructType()
  .add($"id".long.copy(nullable = false))
  .add($"city".string)
  .add($"country".string)
scala> schema.printTreeString
root
 |-- id: long (nullable = false)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)

import org.apache.spark.sql.DataFrame
val citiesDF: DataFrame = spark
  .read
  .schema(schema)
  .csv(cities)
scala> citiesDF.show
+---+------+-------+
| id|  city|country|
+---+------+-------+
|  0|Warsaw| Poland|
+---+------+-------+

Defining Data Format — format method

format(source: String): DataFrameReader

You use format to configure DataFrameReader to use the appropriate source format.

Supported data formats:

  • json

  • csv (since 2.0.0)

  • parquet (see Parquet)

  • orc

  • text

  • jdbc

  • libsvm — only when used in format("libsvm")
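
A minimal sketch of format in action with the libsvm data source, assuming spark-mllib is on the classpath and a hypothetical input file in LIBSVM format:

// Loads a LIBSVM file into a DataFrame with label and features columns
val libsvm = spark
  .read
  .format("libsvm")
  .load("data/sample_libsvm_data.txt")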

Defining Schema — schema method

schema(schema: StructType): DataFrameReader

You can specify the schema of the input data source.

import org.apache.spark.sql.types.StructType
import spark.implicits._ // for the $-based column syntax
val schema = new StructType()
  .add($"id".long.copy(nullable = false))
  .add($"city".string)
  .add($"country".string)

scala> schema.printTreeString
root
 |-- id: long (nullable = false)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)

import org.apache.spark.sql.DataFrameReader
val r: DataFrameReader = spark.read.schema(schema)
Note
Some formats can infer schema from datasets, e.g. csv, using options.
Tip
Read up on Schema.

Defining Loading Options — option and options Methods

option(key: String, value: String): DataFrameReader
option(key: String, value: Boolean): DataFrameReader  (1)
option(key: String, value: Long): DataFrameReader     (1)
option(key: String, value: Double): DataFrameReader   (1)
  1. Available as of Spark 2.0

You can also use the options method to specify multiple options at once in a single Map.

options(options: scala.collection.Map[String, String]): DataFrameReader
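
A minimal sketch showing the same configuration given as individual option calls and as a single options Map (people.csv is a hypothetical file):

// Individual options
val withOption = spark
  .read
  .option("header", true)
  .option("inferSchema", true)
  .csv("people.csv")

// The same options as a single Map (note the String values)
val withOptions = spark
  .read
  .options(Map("header" -> "true", "inferSchema" -> "true"))
  .csv("people.csv")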

Loading Data from Data Sources with Multiple Files Support — load Method

load(): DataFrame
load(path: String): DataFrame
load(paths: String*): DataFrame

load loads a dataset from a data source that supports multiple paths and represents it as an untyped DataFrame.

Internally, load creates a DataSource (for the current SparkSession, a user-specified schema, a source format and options). It then immediately resolves it and converts the resulting BaseRelation into a DataFrame.
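
A minimal sketch of load with more than one path (the paths are hypothetical):

// A single load over multiple paths
val events = spark
  .read
  .format("parquet")
  .load("events/2017/01/part-0.parquet", "events/2017/02/part-0.parquet")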

Loading Datasets from Files (into DataFrames) Using Format-Specific Load Operators

DataFrameReader supports the following file formats:

json method

json(path: String): DataFrame
json(paths: String*): DataFrame
json(jsonRDD: RDD[String]): DataFrame

New in 2.0.0: prefersDecimal
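
A minimal sketch of the json operator with the prefersDecimal option (the metrics directory is hypothetical):

// prefersDecimal infers floating-point values as decimals (falling back to doubles)
val metrics = spark
  .read
  .option("prefersDecimal", true)
  .json("metrics/*.json")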

csv method

csv(path: String): DataFrame
csv(paths: String*): DataFrame
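
A minimal sketch of the csv operator with a header line and schema inference (cities.csv is a hypothetical file):

val cities = spark
  .read
  .option("header", true)
  .option("inferSchema", true)
  .csv("cities.csv")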

parquet method

parquet(path: String): DataFrame
parquet(paths: String*): DataFrame

The supported options include compression.

New in 2.0.0: snappy is the default Parquet codec. See [SPARK-14482][SQL] Change default Parquet codec from gzip to snappy.

The supported compression codecs:

  • none or uncompressed

  • snappy - the default codec since Spark 2.0.0

  • gzip - the default codec in Spark before 2.0.0

  • lzo

import spark.implicits._ // for toDF

val tokens = Seq("hello", "henry", "and", "harry")
  .zipWithIndex
  .map(_.swap)
  .toDF("id", "token")

val parquetWriter = tokens.write
parquetWriter.option("compression", "none").save("hello-none")

// The exception below is included mostly for learning purposes:
// the stack trace shows where the supported compression codecs
// are validated (ParquetOptions)
scala> parquetWriter.option("compression", "unsupported").save("hello-unsupported")
java.lang.IllegalArgumentException: Codec [unsupported] is not available. Available codecs are uncompressed, gzip, lzo, snappy, none.
  at org.apache.spark.sql.execution.datasources.parquet.ParquetOptions.<init>(ParquetOptions.scala:43)
  at org.apache.spark.sql.execution.datasources.parquet.DefaultSource.prepareWrite(ParquetRelation.scala:77)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$4.apply(InsertIntoHadoopFsRelation.scala:122)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$4.apply(InsertIntoHadoopFsRelation.scala:122)
  at org.apache.spark.sql.execution.datasources.BaseWriterContainer.driverSideSetup(WriterContainer.scala:103)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:141)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:116)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:116)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:116)
  at org.apache.spark.sql.execution.command.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:61)
  at org.apache.spark.sql.execution.command.ExecutedCommand.sideEffectResult(commands.scala:59)
  at org.apache.spark.sql.execution.command.ExecutedCommand.doExecute(commands.scala:73)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:137)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:134)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:65)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:65)
  at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:390)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:230)
  ... 48 elided

orc method

orc(path: String): DataFrame
orc(paths: String*): DataFrame

The Optimized Row Columnar (ORC) file format is a highly efficient columnar format to store Hive data (even with more than 1,000 columns) and improve performance. The ORC format was introduced in Hive 0.11 to use and retain the type information from the table definition.

Tip
Read ORC Files document to learn about the ORC file format.
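
A minimal sketch of the orc operator (logs.orc is a hypothetical path; depending on the Spark version, the ORC data source may require a SparkSession with Hive support enabled):

// Reads ORC files into an untyped DataFrame
val logs = spark.read.orc("logs.orc")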

text method

The text method loads text files into an untyped DataFrame with a single value column (of type string).

text(path: String): DataFrame
text(paths: String*): DataFrame
Example
import org.apache.spark.sql.Dataset
import spark.implicits._ // for as[String]

val lines: Dataset[String] = spark.read.text("README.md").as[String]

scala> lines.show
+--------------------+
|               value|
+--------------------+
|      # Apache Spark|
|                    |
|Spark is a fast a...|
|high-level APIs i...|
|supports general ...|
|rich set of highe...|
|MLlib for machine...|
|and Spark Streami...|
|                    |
|<http://spark.apa...|
|                    |
|                    |
|## Online Documen...|
|                    |
|You can find the ...|
|guide, on the [pr...|
|and [project wiki...|
|This README file ...|
|                    |
|   ## Building Spark|
+--------------------+
only showing top 20 rows

Loading Datasets from Tables (into DataFrames) — table Method

table(tableName: String): DataFrame

table loads the tableName table into an untyped DataFrame.

scala> spark.sql("SHOW TABLES").show(false)
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|dafa     |false      |
+---------+-----------+

scala> spark.read.table("dafa").show(false)
+---+-------+
|id |text   |
+---+-------+
|1  |swiecie|
|0  |hello  |
+---+-------+
Caution
FIXME The method uses spark.sessionState.sqlParser.parseTableIdentifier(tableName) and spark.sessionState.catalog.lookupRelation. Would be nice to learn a bit more on their internals, huh?

Loading Data From External Table using JDBC — jdbc Method

jdbc(url: String, table: String, properties: Properties): DataFrame
jdbc(url: String,
  table: String,
  predicates: Array[String],
  connectionProperties: Properties): DataFrame
jdbc(url: String,
  table: String,
  columnName: String,
  lowerBound: Long,
  upperBound: Long,
  numPartitions: Int,
  connectionProperties: Properties): DataFrame

jdbc loads data from an external table using JDBC and represents it as an untyped DataFrame.

Table 1. Options for JDBC Data Source (in alphabetical order)

  • batchsize - The minimum value is 1. Defaults to 1000.

  • createTableColumnTypes

  • createTableOptions

  • dbtable - (required)

  • driver - (recommended) The JDBC driver’s class name. When defined, the class will get registered with Java’s java.sql.DriverManager.

  • fetchsize - Defaults to 0.

  • isolationLevel - One of NONE, READ_UNCOMMITTED (default), READ_COMMITTED, REPEATABLE_READ or SERIALIZABLE.

  • lowerBound - Lower bound of the partition column.

  • numPartitions - Number of partitions.

  • partitionColumn - Name of the column used to partition the dataset (using a JDBCPartitioningInfo). Used in JdbcRelationProvider to create a JDBCRelation (with proper JDBCPartitions with a WHERE clause). When defined, the lowerBound, upperBound and numPartitions options are required; when undefined, lowerBound and upperBound have to be undefined as well.

  • truncate - (used only for writing) Enables table truncation. Defaults to false.

  • upperBound - Upper bound of the partition column.

  • url - (required)
Internally, jdbc creates a JDBCOptions from url, table and extraOptions with connectionProperties.

jdbc then creates one JDBCPartition per predicate (in predicates).

In the end, jdbc requests the SparkSession to create a DataFrame for a JDBCRelation (given JDBCPartitions and JDBCOptions created earlier).
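
A sketch of the column-partitioned variant against a hypothetical PostgreSQL database (the JDBC driver is assumed to be on the classpath):

import java.util.Properties

val props = new Properties()
props.setProperty("user", "demo")
props.setProperty("password", "demo")
props.setProperty("driver", "org.postgresql.Driver")

// 8 partitions over the id column, each reading its own slice (WHERE clause)
val people = spark.read.jdbc(
  url = "jdbc:postgresql://localhost:5432/demo",
  table = "public.people",
  columnName = "id",
  lowerBound = 0L,
  upperBound = 1000000L,
  numPartitions = 8,
  connectionProperties = props)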

Note

jdbc does not support a custom schema and reports an AnalysisException if defined:

User specified schema not supported with `[jdbc]`
Note
jdbc method uses java.util.Properties (and appears overly Java-centric). Use format("jdbc") instead.
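
A minimal sketch of the recommended format("jdbc") style (same hypothetical database as above):

val people = spark
  .read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/demo")
  .option("dbtable", "public.people")
  .option("user", "demo")
  .option("password", "demo")
  .option("driver", "org.postgresql.Driver")
  .load()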

Loading Datasets From Text Files — textFile Method

textFile(path: String): Dataset[String]
textFile(paths: String*): Dataset[String]

textFile loads one or many text files into a typed Dataset[String].

import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...

import org.apache.spark.sql.Dataset
val lines: Dataset[String] = spark
  .read
  .textFile("README.md")
Note
The textFile methods are similar to the text family of methods in that they both read text files, but text methods return an untyped DataFrame while textFile methods return a typed Dataset[String].

Internally, textFile passes the call on to the text method, selects the only value column and applies the Encoders.STRING encoder.
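
A sketch of what that amounts to when expressed with the public API:

import org.apache.spark.sql.{Dataset, Encoders}

// Conceptually equivalent to textFile: text + the value column + Encoders.STRING
val viaText: Dataset[String] = spark
  .read
  .text("README.md")
  .select("value")
  .as[String](Encoders.STRING)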

Creating DataFrameReader Instance

DataFrameReader takes the following when created:

  • SparkSession
