Apache Kafka

The Apache Kafka connectors for Structured Streaming are packaged in Databricks Runtime. You use the kafka connector to connect to Kafka 0.10+ and the kafka08 connector to connect to Kafka 0.8+ (deprecated).

Schema

The schema of the records is:

Column

Type

key

binary

value

binary

topic

string

partition

int

offset

long

timestamp

long

timestampType

int

The key and the value are always deserialized as byte arrays with the ByteArrayDeserializer. Use DataFrame operations (cast("string"), udfs) to explicitly deserialize the keys and values.

Quickstart

Let’s start with a the canonical WordCount example. The following notebook demonstrates how to run WordCount using Structured Streaming with Kafka.

Note

This notebook example uses Kafka 0.10. To use Kafka 0.8, change the format to kafka08 (that is, .format("kafka08")).

Kafka WordCount with Structured Streaming notebook

Open notebook in new tab

Configuration

For the comphensive list of configuration options, see the Spark Structured Streaming + Kafka Integration Guide. To get you started, here is a subset of the most common configuration options.

Note

As Structured Streaming is still under development, this list may not be up to date.

There are multiple ways of specifying which topics to subscribe to. You should provide only one of these parameters:

Option

Value

Supported Kafka Versions

Description

subscribe

A comma-separated list of topics.

0.8, 0.10

The topic list to subscribe to.

subscribePattern

Java regex string.

0.10

The pattern used to subscribe to topic(s).

assign

JSON string {"topicA":[0,1],"topic":[2,4]}.

0.8, 0.10

Specific topicPartitions to consume.

Other notable configurations:

Option

Value

Default Value

Supported Kafka Versions

Description

kafka.bootstrap.servers

Comma-separated list of host:port.

empty

0.8, 0.10

[Required] The Kafka bootstrap.servers configuration. If you find there is no data from Kafka, check the broker address list first. If the broker address list is incorrect, there might not be any errors. This is because Kafka client assumes the brokers will become available eventually and in the event of network errors retry forever.

failOnDataLoss

true or false.

true

0.10

[Optional] Whether to fail the query when it’s possible that data was lost. Queries can permanently fail to read data from Kafka due to many scenarios such as deleted topics, topic truncation before processing, and so on. We try to estimate conservatively whether data was possibly lost or not. Sometimes this can cause false alarms. Set this option to false if it does not work as expected, or you want the query to continue processing despite data loss.

minPartitions

Integer >= 0, 0 = disabled.

0 (disabled)

0.10

[Optional] Minimum number of partitions to read from Kafka. With Spark 2.1.0-db2 and above, you can configure Spark to use an arbitrary minimum of partitions to read from Kafka using the minPartitions option. Normally Spark has a 1-1 mapping of Kafka topicPartitions to Spark partitions consuming from Kafka. If you set the minPartitions option to a value greater than your Kafka topicPartitions, Spark will divvy up large Kafka partitions to smaller pieces. This option can be set at times of peak loads, data skew, and as your stream is falling behind to increase processing rate. It comes at a cost of initializing Kafka consumers at each trigger, which may impact performance if you use SSL when connecting to Kafka.

kafka.group.id

A Kafka consumer group ID.

not set

0.10

[Optional] Group ID to use while reading from Kafka. Supported in Spark 2.2+. Use this with caution. By default, each query generates a unique group ID for reading data. This ensures that each query has its own consumer group that does not face interference from any other consumer, and therefore can read all of the partitions of its subscribed topics. In some scenarios (for example, Kafka group-based authorization), you may want to use specific authorized group IDs to read data. You can optionally set the group ID. However, do this with extreme caution as it can cause unexpected behavior.

  • Concurrently running queries (both, batch and streaming) with the same group ID are likely interfere with each other causing each query to read only part of the data.

  • This may also occur when queries are started/restarted in quick succession. To minimize such issues, set the Kafka consumer configuration session.timeout.ms to be very small.

startingOffsets

earliest , latest

latest

0.10

[Optional] The start point when a query is started, either “earliest” which is from the earliest offsets, or a json string specifying a starting offset for each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest. Note: For batch queries, latest (either implicitly or by using -1 in json) is not allowed. For streaming queries, this only applies when a new query is started, and that resuming will always pick up from where the query left off. Newly discovered partitions during a query will start at earliest.

See Structured Streaming Kafka Integration Guide for other optional configurations.

Important

You should not set the following Kafka parameters for the Kafka 0.10 connector as it will throw an exception:

  • group.id: Setting this parameter is not allowed for Spark versions below 2.2.

  • auto.offset.reset: Instead, set the source option startingOffsets to specify where to start. To maintain consistency, Structured Streaming (as opposed to the Kafka Consumer) manages the consumption of offsets internally. This ensures that you don’t miss any data after dynamically subscribing to new topics/partitions. startingOffsets applies only when you start a new Streaming query, and that resuming from a checkpoint always picks up from where the query left off.

  • key.deserializer: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the keys.

  • value.deserializer: Values are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the values.

  • enable.auto.commit: Setting this parameter is not allowed. Spark keeps track of Kafka offsets internally and doesn’t commit any offset.

  • interceptor.classes: Kafka source always read keys and values as byte arrays. It’s not safe to use ConsumerInterceptor as it may break the query.

Production Structured Streaming with Kafka notebook

Open notebook in new tab

Metrics

Note

Available in Databricks Runtime 8.1 and above.

You can get the average, min, and max of the number of offsets that the streaming query is behind the latest available offset among all the subscribed topics with the avgOffsetsBehindLatest, maxOffsetsBehindLatest, and minOffsetsBehindLatest metrics. See Reading Metrics Interactively.

Note

Available in Databricks Runtime 9.1 and above.

Get the estimated total number of bytes that the query process has not consumed from the subscribed topics by examining the value of estimatedTotalBytesBehindLatest. This estimate is based on the batches that were processed in the last 300 seconds. The timeframe that the estimate is based on can be changed by setting the option bytesEstimateWindowLength to a different value. For example, to set it to 10 minutes:

df = spark.readStream \
  .format("kafka") \
  .option("bytesEstimateWindowLength", "10m") // m for minutes, you can also use "600s" for 600 seconds

If you are running the stream in a notebook, you can see these metrics under the Raw Data tab in the streaming query progress dashboard:

{
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic]]",
    "metrics" : {
      "avgOffsetsBehindLatest" : "4.0",
      "maxOffsetsBehindLatest" : "4",
      "minOffsetsBehindLatest" : "4",
      "estimatedTotalBytesBehindLatest" : "80.0"
    },
  } ]
}

Use SSL

To enable SSL connections to Kafka, follow the instructions in the Confluent documentation Encryption and Authentication with SSL. You can provide the configurations described there, prefixed with kafka., as options. For example, you specify the trust store location in the property kafka.ssl.truststore.location.

We recommend that you:

Once paths are mounted and secrets stored, you can do the following:

df = spark.readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", ...) \
  .option("kafka.security.protocol", "SASL_SSL") \
  .option("kafka.ssl.truststore.location", <dbfs-truststore-location>) \
  .option("kafka.ssl.keystore.location", <dbfs-keystore-location>) \
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>)) \
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))