KafkaSourceProvider

KafkaSourceProvider is the entry point that registers Apache Kafka as a data source in Spark SQL.

KafkaSourceProvider is a CreatableRelationProvider and a RelationProvider (for batch queries) as well as a StreamSourceProvider (for Structured Streaming).

KafkaSourceProvider is registered under the kafka alias.

// start Spark application like spark-shell with the following package
// --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0-SNAPSHOT
scala> val fromKafkaTopic1 = spark.
  read.
  format("kafka").
  option("subscribe", "topic1").  // exactly one of subscribe, subscribePattern, or assign
  option("kafka.bootstrap.servers", "localhost:9092").
  load()  // topics are selected with the options above, not with a path

KafkaSourceProvider uses a fixed schema and rejects any user-specified schema with an AnalysisException.

import org.apache.spark.sql.types.StructType
val schema = new StructType().add($"id".int)
scala> spark
  .read
  .format("kafka")
  .option("subscribe", "topic1")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .schema(schema) // <-- defining a custom schema is not supported
  .load
org.apache.spark.sql.AnalysisException: kafka does not allow user-specified schemas.;
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:307)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
  ... 48 elided

createRelation Method (from RelationProvider)

createRelation(
  sqlContext: SQLContext,
  parameters: Map[String, String]): BaseRelation
Caution
FIXME
Note
createRelation is part of the RelationProvider Contract.

createRelation Method (from CreatableRelationProvider)

createRelation(
  sqlContext: SQLContext,
  mode: SaveMode,
  parameters: Map[String, String],
  df: DataFrame): BaseRelation
Caution
FIXME
Note
createRelation is part of the CreatableRelationProvider Contract.
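
The CreatableRelationProvider side is what a batch write with format("kafka") goes through. The following is a minimal sketch, not the provider's own code: writeToKafka is a hypothetical helper, and the column names follow the Kafka writer's convention of a required value column and an optional key column. The helper is only defined, not called, since calling it needs a reachable broker.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// In spark-shell this returns the session that already exists
val spark = SparkSession.builder().master("local[*]").appName("kafka-write-sketch").getOrCreate()
import spark.implicits._

// The Kafka writer expects a value column (string or binary); key is optional
val records: DataFrame = Seq(("k1", "hello"), ("k2", "world")).toDF("key", "value")

// Hypothetical helper (not part of Spark): a batch write to a Kafka topic
def writeToKafka(df: DataFrame, servers: String, topic: String): Unit =
  df.write
    .format("kafka")
    .option("kafka.bootstrap.servers", servers)
    .option("topic", topic)  // target topic for every row
    .save()
```

With a broker listening on localhost:9092, the call would be writeToKafka(records, "localhost:9092", "topic1"). The default save mode is used on purpose here, since the Kafka provider is known to reject some save modes (such as Overwrite).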

createSource Method

createSource(
  sqlContext: SQLContext,
  metadataPath: String,
  schema: Option[StructType],
  providerName: String,
  parameters: Map[String, String]): Source
Caution
FIXME
Note
createSource is part of Structured Streaming’s StreamSourceProvider Contract.
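
From the user's side, the StreamSourceProvider path is reached through readStream rather than read. The sketch below assumes the spark-sql-kafka package is on the classpath and reuses the topic and broker address from the batch example; the name fromKafkaStream is made up for illustration. Defining the streaming Dataset should not need a running broker, because a Source is only needed once a streaming query is started.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell this returns the session that already exists
val spark = SparkSession.builder().master("local[*]").appName("kafka-stream-sketch").getOrCreate()

// Streaming counterpart of spark.read.format("kafka")
val fromKafkaStream = spark
  .readStream
  .format("kafka")
  .option("subscribe", "topic1")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .load()
```

Starting a query on it, for example with fromKafkaStream.writeStream.format("console").start(), is the step that requires the broker to be reachable.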

sourceSchema Method

sourceSchema(
  sqlContext: SQLContext,
  schema: Option[StructType],
  providerName: String,
  parameters: Map[String, String]): (String, StructType)
Caution
FIXME
val fromKafka = spark.read.format("kafka")...
scala> fromKafka.printSchema
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
Note
sourceSchema is part of Structured Streaming’s StreamSourceProvider Contract.
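
Since key and value arrive as binary in the schema above, a query usually casts them before doing anything else. The sketch below uses a small local DataFrame as a stand-in for records loaded from Kafka, so that it runs without a broker; the names records and decoded are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell this returns the session that already exists
val spark = SparkSession.builder().master("local[*]").appName("kafka-cast-sketch").getOrCreate()
import spark.implicits._

// Stand-in for Kafka records: key and value are binary columns
val records = Seq(("k1".getBytes("UTF-8"), "hello".getBytes("UTF-8"))).toDF("key", "value")

// Cast the binary columns to strings
val decoded = records.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
```

The same selectExpr call can be applied to a DataFrame loaded with format("kafka"), where the remaining columns (topic, partition, offset, timestamp, timestampType) can be selected as they are.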
