Apache Kafka Support

Requirements

The Apache Kafka 0.10 connector for Structured Streaming is pre-packaged in Spark images starting with 2.0.2-db1. The Kafka 0.8 connector (experimental) is available since Spark 2.1.0-db2. Structured Streaming cannot read from Kafka on older versions.

Quickstart

Let’s start with a quick example: WordCount. The following notebook is all that it takes to run WordCount using Structured Streaming with Kafka.

Note

This notebook example uses Kafka 0.10. To use Kafka 0.8, simply change the format to kafka08 (i.e., .format("kafka08")).
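
For reference, here is a minimal Scala sketch of the same WordCount idea. The broker address (host1:9092) and topic name (words) are placeholders, not values from the notebook:

  import org.apache.spark.sql.functions._
  import spark.implicits._  // $-column syntax; `spark` is the notebook's SparkSession

  // Read from Kafka 0.10; change the format to "kafka08" for Kafka 0.8.
  val lines = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:9092")  // placeholder broker
    .option("subscribe", "words")                      // placeholder topic
    .load()
    .select($"value".cast("string").as("line"))

  // Split each line into words and maintain a running count per word.
  val wordCounts = lines
    .select(explode(split($"line", " ")).as("word"))
    .groupBy("word")
    .count()

  // Write the running counts to the console sink for inspection.
  val query = wordCounts.writeStream
    .outputMode("complete")
    .format("console")
    .start()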

Schema

The schema of the records is:

| Column        | Type   |
|---------------|--------|
| key           | binary |
| value         | binary |
| topic         | string |
| partition     | int    |
| offset        | long   |
| timestamp     | long   |
| timestampType | int    |

The key and the value are always deserialized as byte arrays with the ByteArrayDeserializer. Use DataFrame operations (cast("string"), UDFs) to explicitly deserialize the keys and values.
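
For example, if the keys and values are UTF-8 strings, a cast is enough. A sketch, with a placeholder broker address and topic name:

  // Cast the binary key and value columns to strings while reading.
  val df = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:9092")  // placeholder broker
    .option("subscribe", "topic1")                    // placeholder topic
    .load()
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)",
                "topic", "partition", "offset")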

Configuration

Refer to the Spark documentation for the comprehensive list of configurations. To get you started, here is a subset of configurations.

Note

As Structured Streaming is still under heavy development, this list may not be up to date.

There are multiple ways to specify which topics to consume from. Provide exactly one of these parameters:

| Option           | Value                                           | Supported Kafka Versions | Description                               |
|------------------|-------------------------------------------------|--------------------------|-------------------------------------------|
| subscribe        | A comma-separated list of topics                | 0.8, 0.10                | The topic list to subscribe to            |
| subscribePattern | Java regex string                               | 0.10                     | The pattern used to subscribe to topic(s) |
| assign           | JSON string, e.g. {"topicA":[0,1],"topicB":[2,4]} | 0.8, 0.10              | Specific TopicPartitions to consume       |
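
For illustration, the three styles look like this in Scala; the broker address, topic names, and partition numbers are placeholders, and each query should use exactly one of them:

  // 1. Subscribe to an explicit, comma-separated list of topics.
  val byList = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "host1:9092")
    .option("subscribe", "topicA,topicB")
    .load()

  // 2. Subscribe to every topic matching a Java regex (Kafka 0.10 only).
  val byPattern = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "host1:9092")
    .option("subscribePattern", "topic.*")
    .load()

  // 3. Assign specific TopicPartitions as a JSON string.
  val byAssignment = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "host1:9092")
    .option("assign", """{"topicA":[0,1],"topicB":[2,4]}""")
    .load()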

Notable configurations (a combined example follows the footnotes):

| Option                  | Value                             | Default Value | Supported Kafka Versions | Description |
|-------------------------|-----------------------------------|---------------|--------------------------|-------------|
| kafka.bootstrap.servers | Comma-separated list of host:port | empty         | 0.8, 0.10                | [Required] The Kafka bootstrap.servers configuration. [1] |
| failOnDataLoss          | true or false                     | true          | 0.10                     | [Optional] Whether to fail the query when it's possible that data was lost. [2] |
| minPartitions           | Integer >= 0 (0 = disabled)       | 0 (disabled)  | 0.10                     | [Optional] Minimum number of partitions to read from Kafka. [3] |
| kafka.group.id          | A Kafka consumer group id         | not set       | 0.10                     | [Optional] Group id to use while reading from Kafka. Supported in Spark 2.2+. Use with caution. [4] |

[1] If you find there is no data from Kafka, check the broker address list first. If the broker address list is wrong, there might not be any errors, because the Kafka client assumes the brokers will eventually become available and retries forever on network errors.
[2] Queries may permanently fail to read data from Kafka in many scenarios, such as deleted topics or topic truncation before processing. We try to estimate conservatively whether data was possibly lost, which can sometimes cause false alarms. Set this option to false if it does not work as expected, or if you want the query to continue processing despite data loss.
[3] With Spark 2.1.0-db2 and later clusters, you can configure Spark to use an arbitrary minimum number of partitions to read from Kafka with the minPartitions option. Normally Spark has a 1:1 mapping of Kafka TopicPartitions to Spark partitions consuming from Kafka. If you set minPartitions to a value greater than your number of Kafka TopicPartitions, Spark divides large Kafka partitions into smaller pieces. You can set this option at times of peak load, when data is skewed, or when your stream is falling behind, to increase the processing rate. It comes at the cost of initializing Kafka consumers at each trigger, which may impact performance if you use SSL when connecting to Kafka. This feature is only available in Databricks.
[4] By default, each query generates a unique group id for reading data. This ensures that each query has its own consumer group that does not face interference from any other consumer, and can therefore read all the partitions of its subscribed topics. However, in some scenarios (for example, Kafka group-based authorization) you may want to use a specific authorized group id to read data. Since Spark 2.2, you can optionally set the group id. Use it with extreme caution, as it may cause unexpected behavior:

  • Concurrently running queries (both batch and streaming) with the same group id are likely to interfere with each other, causing each query to read only part of the data.
  • This may also occur when queries are started or restarted in quick succession. To minimize such issues, set the Kafka consumer configuration session.timeout.ms to be very small.
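
The combined example referenced above, as a sketch; the broker addresses, topic name, and option values are placeholders rather than recommendations:

  // Placeholder values throughout; tune them for your workload.
  val df = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:9092,host2:9092")  // [Required]
    .option("subscribe", "topic1")
    .option("failOnDataLoss", "false")  // keep running even if data may have been lost
    .option("minPartitions", "64")      // split Kafka partitions into at least 64 Spark partitions
    // .option("kafka.group.id", "authorized-group")  // Spark 2.2+ only; use with caution
    .load()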

See Structured Streaming Kafka Integration Guide for other optional configurations.

Note that you cannot set the following Kafka parameters for the Kafka 0.10 connector; doing so throws an exception:

  • group.id: Setting this is not allowed for Spark versions < 2.2.
  • auto.offset.reset: Set the source option startingOffsets to specify where to start instead (see the sketch after this list). To maintain consistency, Structured Streaming (as opposed to the Kafka consumer) manages the consumption of offsets internally. This ensures that you don’t miss any data after dynamically subscribing to new topics/partitions. Note that startingOffsets applies only when you start a new streaming query; resuming from a checkpoint always picks up from where the query left off.
  • key.deserializer: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the keys.
  • value.deserializer: Values are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the values.
  • enable.auto.commit: Setting this is not allowed. Spark keeps track of Kafka offsets internally and doesn’t commit any offset.
  • interceptor.classes: The Kafka source always reads keys and values as byte arrays. It’s not safe to use ConsumerInterceptor, as it may break the query.
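
As a sketch of the startingOffsets alternative mentioned in the list above; the broker address and topic name are placeholders:

  // Start from the earliest available offsets when the query first starts.
  // When the query is restarted from a checkpoint, it resumes from where it
  // left off and this option is ignored.
  val df = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:9092")
    .option("subscribe", "topic1")
    .option("startingOffsets", "earliest")  // or "latest", or a per-partition JSON map
    .load()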