// Start a Spark application, e.g. spark-shell, with the following package
// --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0-SNAPSHOT
scala> val fromKafkaTopic1 = spark.
  read.
  format("kafka").
  option("subscribe", "topic1"). // <-- subscribe, subscribepattern, or assign
  option("kafka.bootstrap.servers", "localhost:9092").
  load // <-- the kafka source does not accept a path argument
KafkaSourceProvider

KafkaSourceProvider is an interface to register Apache Kafka as a data source. It is a CreatableRelationProvider and a RelationProvider, and is registered under the kafka alias.
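The kafka alias is resolved through Spark's DataSourceRegister mechanism. A minimal, simplified sketch of how a provider announces its short name (the real KafkaSourceProvider also mixes in the relation- and stream-provider interfaces listed below):

```scala
import org.apache.spark.sql.sources.DataSourceRegister

// Simplified sketch only: the real KafkaSourceProvider also extends
// RelationProvider, CreatableRelationProvider and StreamSourceProvider.
class KafkaSourceProvider extends DataSourceRegister {
  // format("kafka") resolves to this provider through the short name
  override def shortName(): String = "kafka"
}
```

Spark discovers such providers via the Java service-loader facility, so format("kafka") works without spelling out the fully-qualified class name.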
KafkaSourceProvider uses a fixed schema (and makes sure that a user did not set a custom one).
import org.apache.spark.sql.types.StructType
val schema = new StructType().add($"id".int)
scala> spark
.read
.format("kafka")
.option("subscribe", "topic1")
.option("kafka.bootstrap.servers", "localhost:9092")
.schema(schema) // <-- defining a custom schema is not supported
.load
org.apache.spark.sql.AnalysisException: kafka does not allow user-specified schemas.;
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:307)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
... 48 elided
createRelation Method (from RelationProvider)

createRelation(
  sqlContext: SQLContext,
  parameters: Map[String, String]): BaseRelation

Caution: FIXME

Note: createRelation is a part of the RelationProvider contract.
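This createRelation is what a batch (non-streaming) Kafka read resolves to. A sketch of a bounded batch read, assuming a local broker at localhost:9092 and a topic1 topic (startingOffsets and endingOffsets are Kafka source options that bound the batch):

```scala
// Batch read: spark.read with the kafka format goes through
// RelationProvider.createRelation
val records = spark.
  read.
  format("kafka").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("subscribe", "topic1").
  option("startingOffsets", "earliest"). // batch-only lower bound
  option("endingOffsets", "latest").     // batch-only upper bound
  load
```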
createRelation Method (from CreatableRelationProvider)

createRelation(
  sqlContext: SQLContext,
  mode: SaveMode,
  parameters: Map[String, String],
  df: DataFrame): BaseRelation

Caution: FIXME

Note: createRelation is a part of the CreatableRelationProvider contract.
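The CreatableRelationProvider variant backs batch writes, i.e. DataFrame.write with the kafka format. A sketch, assuming the same local broker and a DataFrame df with an id column; the Kafka sink expects a value column (key and topic columns are optional):

```scala
// Batch write: df.write with the kafka format goes through
// CreatableRelationProvider.createRelation
df.selectExpr("CAST(id AS STRING) AS value").
  write.
  format("kafka").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("topic", "topic1"). // target topic for all rows
  save
```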
createSource Method

createSource(
  sqlContext: SQLContext,
  metadataPath: String,
  schema: Option[StructType],
  providerName: String,
  parameters: Map[String, String]): Source

Caution: FIXME

Note: createSource is a part of Structured Streaming’s StreamSourceProvider contract.
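createSource is what a streaming Kafka read resolves to. A sketch of a streaming read, assuming a local broker at localhost:9092 and a topic1 topic:

```scala
// Streaming read: spark.readStream with the kafka format goes through
// StreamSourceProvider.createSource
val stream = spark.
  readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("subscribe", "topic1").
  load
```

The resulting streaming Dataset has the fixed Kafka schema shown under sourceSchema below.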
sourceSchema Method

sourceSchema(
  sqlContext: SQLContext,
  schema: Option[StructType],
  providerName: String,
  parameters: Map[String, String]): (String, StructType)

Caution: FIXME
val fromKafka = spark.read.format("kafka")...
scala> fromKafka.printSchema
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
Note: sourceSchema is a part of Structured Streaming’s StreamSourceProvider contract.