Stream processing with Apache Kafka and Databricks

This article describes how you can use Apache Kafka as either a source or a sink when running Structured Streaming workloads on Databricks.

For more Kafka, see the Kafka documentation.

Read data from Kafka

The following is an example for a streaming read from Kafka:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Databricks also supports batch read semantics for Kafka data sources, as shown in the following example:

df = (spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

For incremental batch loading, Databricks recommends using Kafka with Trigger.AvailableNow. See Configuring incremental batch processing.

Configure Kafka Structured Streaming reader

Databricks provides the kafka keyword as a data format to configure connections to Kafka 0.10+.

The following are the most common configurations for Kafka:

There are multiple ways of specifying which topics to subscribe to. You should provide only one of these parameters:

Option

Value

Description

subscribe

A comma-separated list of topics.

The topic list to subscribe to.

subscribePattern

Java regex string.

The pattern used to subscribe to topic(s).

assign

JSON string {"topicA":[0,1],"topic":[2,4]}.

Specific topicPartitions to consume.

Other notable configurations:

Option

Value

Default Value

Description

kafka.bootstrap.servers

Comma-separated list of host:port.

empty

[Required] The Kafka bootstrap.servers configuration. If you find there is no data from Kafka, check the broker address list first. If the broker address list is incorrect, there might not be any errors. This is because Kafka client assumes the brokers will become available eventually and in the event of network errors retry forever.

failOnDataLoss

true or false.

true

[Optional] Whether to fail the query when it’s possible that data was lost. Queries can permanently fail to read data from Kafka due to many scenarios such as deleted topics, topic truncation before processing, and so on. We try to estimate conservatively whether data was possibly lost or not. Sometimes this can cause false alarms. Set this option to false if it does not work as expected, or you want the query to continue processing despite data loss.

minPartitions

Integer >= 0, 0 = disabled.

0 (disabled)

[Optional] Minimum number of partitions to read from Kafka. You can configure Spark to use an arbitrary minimum of partitions to read from Kafka using the minPartitions option. Normally Spark has a 1-1 mapping of Kafka topicPartitions to Spark partitions consuming from Kafka. If you set the minPartitions option to a value greater than your Kafka topicPartitions, Spark will divvy up large Kafka partitions to smaller pieces. This option can be set at times of peak loads, data skew, and as your stream is falling behind to increase processing rate. It comes at a cost of initializing Kafka consumers at each trigger, which may impact performance if you use SSL when connecting to Kafka.

kafka.group.id

A Kafka consumer group ID.

not set

[Optional] Group ID to use while reading from Kafka. Use this with caution. By default, each query generates a unique group ID for reading data. This ensures that each query has its own consumer group that does not face interference from any other consumer, and therefore can read all of the partitions of its subscribed topics. In some scenarios (for example, Kafka group-based authorization), you may want to use specific authorized group IDs to read data. You can optionally set the group ID. However, do this with extreme caution as it can cause unexpected behavior.

  • Concurrently running queries (both, batch and streaming) with the same group ID are likely interfere with each other causing each query to read only part of the data.

  • This may also occur when queries are started/restarted in quick succession. To minimize such issues, set the Kafka consumer configuration session.timeout.ms to be very small.

startingOffsets

earliest , latest

latest

[Optional] The start point when a query is started, either “earliest” which is from the earliest offsets, or a json string specifying a starting offset for each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest. Note: For batch queries, latest (either implicitly or by using -1 in json) is not allowed. For streaming queries, this only applies when a new query is started, and that resuming will always pick up from where the query left off. Newly discovered partitions during a query will start at earliest.

See Structured Streaming Kafka Integration Guide for other optional configurations.

Schema for Kafka records

The schema of Kafka records is:

Column

Type

key

binary

value

binary

topic

string

partition

int

offset

long

timestamp

long

timestampType

int

The key and the value are always deserialized as byte arrays with the ByteArrayDeserializer. Use DataFrame operations (such as cast("string")) to explicitly deserialize the keys and values.

Write data to Kafka

The following is an example for a streaming write to Kafka:

(df
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Databricks also supports batch write semantics to Kafka data sinks, as shown in the following example:

(df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Configure Kafka Structured Streaming writer

Important

In Databricks Runtime 13.1 and above, a newer version of the kafka-clients library is used that enables idempotent writes by default. If a Kafka sink uses version 2.8.0 or below with ACLs configured but without IDEMPOTENT_WRITE enabled, the write fails with the error message org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state.

You can resolve this error by upgrading to Kafka version 2.8.0 or above or by setting .option(“kafka.enable.idempotence”, “false”) while configuring your Structured Streaming writer.

The schema provided to the DataStreamWriter interacts with the Kafka sink. You can use the following fields:

Column name

Required or optional

Type

key

optional

STRING or BINARY

value

required

STRING or BINARY

headers

optional

ARRAY

topic

optional (ignored if topic is set as writer option)

STRING

partition

optional

INT

The following are common options set while writing to Kafka:

Option

Value

Default value

Description

kafka.boostrap.servers

A comma-separated list of <host:port>

none

[Required] The Kafka bootstrap.servers configuration.

topic

STRING

not set

[Optional] Sets the topic for all rows to be written. This option overrides any topic column that exists in the data.

includeHeaders

BOOLEAN

false

[Optional] Whether to include the Kafka headers in the row.

See Structured Streaming Kafka Integration Guide for other optional configurations.

Retrieve Kafka metrics

Note

Available in Databricks Runtime 8.1 and above.

You can get the average, min, and max of the number of offsets that the streaming query is behind the latest available offset among all the subscribed topics with the avgOffsetsBehindLatest, maxOffsetsBehindLatest, and minOffsetsBehindLatest metrics. See Reading Metrics Interactively.

Note

Available in Databricks Runtime 9.1 and above.

Get the estimated total number of bytes that the query process has not consumed from the subscribed topics by examining the value of estimatedTotalBytesBehindLatest. This estimate is based on the batches that were processed in the last 300 seconds. The timeframe that the estimate is based on can be changed by setting the option bytesEstimateWindowLength to a different value. For example, to set it to 10 minutes:

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

If you are running the stream in a notebook, you can see these metrics under the Raw Data tab in the streaming query progress dashboard:

{
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic]]",
    "metrics" : {
      "avgOffsetsBehindLatest" : "4.0",
      "maxOffsetsBehindLatest" : "4",
      "minOffsetsBehindLatest" : "4",
      "estimatedTotalBytesBehindLatest" : "80.0"
    },
  } ]
}

Use SSL to connect Databricks to Kafka

To enable SSL connections to Kafka, follow the instructions in the Confluent documentation Encryption and Authentication with SSL. You can provide the configurations described there, prefixed with kafka., as options. For example, you specify the trust store location in the property kafka.ssl.truststore.location.

Databricks recommends that you:

  • Store your certificates in cloud object storage. You can restrict access to the certificates only to clusters that can access Kafka. See Data governance guide.

  • Store your certificate passwords as secrets in a secret scope.

The following example uses object storage locations and Databricks secrets to enable an SSL connection:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)