KafkaWriter — Writing Dataset to Kafka

KafkaWriter is used to write the result of a batch or structured streaming query to Apache Kafka (with a new execution id attached so you can see the execution in web UI’s SQL tab).

spark sql KafkaWriter write webui.png
Figure 1. KafkaWriter (write) in web UI

KafkaWriter makes sure that the schema of the Dataset to write records of contains:

  1. Required topic as a field of type StringType or specified explicitly

  2. Required value as a field of type StringType or BinaryType

  3. Optional key as a field of type StringType or BinaryType

// KafkaWriter is a private `kafka010` package object
// and so the code to use it should also be in the same package
// BEGIN: Use `:paste -raw` in spark-shell
package org.apache.spark.sql.kafka010

object PublicKafkaWriter {
  import org.apache.spark.sql.execution.QueryExecution
  def validateQuery(
      queryExecution: QueryExecution,
      kafkaParameters: Map[String, Object],
      topic: Option[String] = None): Unit = {
    import scala.collection.JavaConversions.mapAsJavaMap
    KafkaWriter.validateQuery(queryExecution, kafkaParameters, topic)
  }
}
// END

import org.apache.spark.sql.kafka010.{PublicKafkaWriter => PKW}

val spark: SparkSession = ...
val q = spark.range(1).select('id)
scala> PKW.validateQuery(
  queryExecution = q.queryExecution,
  kafkaParameters = Map.empty[String, Object])
org.apache.spark.sql.AnalysisException: topic option required when no 'topic' attribute is present. Use the topic option for setting a topic.;
  at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$2.apply(KafkaWriter.scala:53)
  at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$2.apply(KafkaWriter.scala:52)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.sql.kafka010.KafkaWriter$.validateQuery(KafkaWriter.scala:51)
  at org.apache.spark.sql.kafka010.PublicKafkaWriter$.validateQuery(<pastie>:10)
  ... 50 elided

Writing Query Results to Kafka — write Method

write(
  sparkSession: SparkSession,
  queryExecution: QueryExecution,
  kafkaParameters: ju.Map[String, Object],
  topic: Option[String] = None): Unit

write creates and executes a KafkaWriteTask per partition of the QueryExecution's RDD (with a new execution id attached so you can see the execution in web UI’s SQL tab).

Note

write is used when:

  • KafkaSourceProvider creates a BaseRelation (after writing the result of a structure query)

  • Structured Streaming’s KafkaSink commits a batch

Validating QueryExecution — validateQuery Method

validateQuery(
  queryExecution: QueryExecution,
  kafkaParameters: java.util.Map[String, Object],
  topic: Option[String] = None): Unit

validateQuery validates the schema of the input analyzed QueryExecution, i.e.

  1. Whether the required topic is available as a field of type StringType in the schema or as the input topic

  2. Whether the optional key is available as a field of type StringType or BinaryType in the schema

  3. Whether the required value is available as a field of type StringType or BinaryType in the schema

Note
validateQuery is used exclusively when KafkaWriter writes the result of a query to Kafka.

results matching ""

    No results matching ""