Stream processing with Apache Kafka and Databricks

This article describes how you can use Apache Kafka as either a source or a sink when running Structured Streaming workloads on Databricks.

For more information about Kafka, see the Kafka documentation.

Read data from Kafka

The following is an example of a streaming read from Kafka:

Python
df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Databricks also supports batch read semantics for Kafka data sources, as shown in the following example:

Python
df = (spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

For incremental batch loading, Databricks recommends using Kafka with Trigger.AvailableNow. See Configuring incremental batch processing.
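
For example, a minimal sketch of an incremental batch pipeline that uses Trigger.AvailableNow with a Kafka source might look like the following; the Delta output table, checkpoint path, and connection placeholders are illustrative assumptions, not values from this article:

Python
# Read incrementally from Kafka and write to a Delta table, processing all
# data available at start and then stopping (Trigger.AvailableNow).
df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .load()
)

(df.writeStream
  .format("delta")
  .option("checkpointLocation", "<checkpoint-path>")  # tracks Kafka offsets between runs
  .trigger(availableNow=True)
  .toTable("<catalog>.<schema>.<table>")
)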

In Databricks Runtime 13.3 LTS and above, Databricks provides a SQL function for reading Kafka data. Streaming with SQL is supported only in Lakeflow Spark Declarative Pipelines or with streaming tables in Databricks SQL. See read_kafka table-valued function.

Configure Kafka Structured Streaming reader

Databricks provides the kafka keyword as a data format to configure connections to Kafka 0.10+.

The following are the most common configurations for Kafka:

There are multiple ways of specifying which topics to subscribe to. You should provide only one of these parameters:

| Option | Value | Description |
| --- | --- | --- |
| subscribe | A comma-separated list of topics. | The topic list to subscribe to. |
| subscribePattern | Java regex string. | The pattern used to subscribe to topic(s). |
| assign | JSON string {"topicA":[0,1],"topic":[2,4]}. | Specific topicPartitions to consume. |
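
For example, a minimal sketch of subscribing by pattern rather than by an explicit topic list; the regex and server placeholder are illustrative:

Python
# Subscribe to every topic whose name matches a Java regex.
df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribePattern", "events-.*")
  .load()
)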

Other notable configurations:

| Option | Value | Default value | Description |
| --- | --- | --- | --- |
| kafka.bootstrap.servers | Comma-separated list of host:port. | empty | [Required] The Kafka bootstrap.servers configuration. If you find there is no data from Kafka, check the broker address list first. If the broker address list is incorrect, there might not be any errors, because the Kafka client assumes the brokers will eventually become available and, in the event of network errors, retries forever. |
| failOnDataLoss | true or false. | true | [Optional] Whether to fail the query when it's possible that data was lost. Queries can permanently fail to read data from Kafka due to many scenarios such as deleted topics, topic truncation before processing, and so on. We try to estimate conservatively whether data was possibly lost or not. Sometimes this can cause false alarms. Set this option to false if it does not work as expected, or if you want the query to continue processing despite data loss. |
| minPartitions | Integer >= 0, 0 = disabled. | 0 (disabled) | [Optional] Minimum number of partitions to read from Kafka. You can configure Spark to use an arbitrary minimum of partitions to read from Kafka using the minPartitions option. Normally Spark has a 1-1 mapping of Kafka topicPartitions to Spark partitions consuming from Kafka. If you set the minPartitions option to a value greater than your Kafka topicPartitions, Spark divides large Kafka partitions into smaller pieces. Set this option at times of peak load, when data is skewed, or when your stream is falling behind, to increase the processing rate. It comes at the cost of initializing Kafka consumers at each trigger, which may impact performance if you use SSL when connecting to Kafka. |
| kafka.group.id | A Kafka consumer group ID. | not set | [Optional] Group ID to use while reading from Kafka. Use this with caution. By default, each query generates a unique group ID for reading data. This ensures that each query has its own consumer group that does not face interference from any other consumer, and therefore can read all of the partitions of its subscribed topics. In some scenarios (for example, Kafka group-based authorization), you may want to use specific authorized group IDs to read data. You can optionally set the group ID. However, do this with extreme caution as it can cause unexpected behavior. Concurrently running queries (both batch and streaming) with the same group ID are likely to interfere with each other, causing each query to read only part of the data. This can also occur when queries are started or restarted in quick succession. To minimize such issues, set the Kafka consumer configuration session.timeout.ms to be very small. |
| startingOffsets | earliest, latest | latest | [Optional] The start point when a query is started: either "earliest", which is from the earliest offsets, or a JSON string specifying a starting offset for each TopicPartition. In the JSON, -2 as an offset can be used to refer to earliest, -1 to latest. Note: For batch queries, latest (either implicitly or by using -1 in JSON) is not allowed. For streaming queries, this only applies when a new query is started; resuming always picks up from where the query left off. Newly discovered partitions during a query start at earliest. |
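
For example, a minimal sketch of starting a read from explicit per-partition offsets; the topic name and offset values are illustrative:

Python
# Start partition 0 of "events" at offset 100 and partition 1 at the earliest offset (-2).
df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "events")
  .option("startingOffsets", """{"events":{"0":100,"1":-2}}""")
  .load()
)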

See Structured Streaming Kafka Integration Guide for other optional configurations.

Schema for Kafka records

The schema of Kafka records is:

| Column | Type |
| --- | --- |
| key | binary |
| value | binary |
| topic | string |
| partition | int |
| offset | long |
| timestamp | long |
| timestampType | int |

The key and the value are always deserialized as byte arrays with the ByteArrayDeserializer. Use DataFrame operations (such as cast("string")) to explicitly deserialize the keys and values.
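
For example, a minimal sketch of decoding the key and value columns to strings, using the record schema above:

Python
from pyspark.sql.functions import col

# Cast the binary key and value columns to strings for downstream processing.
decoded_df = df.select(
  col("key").cast("string").alias("key"),
  col("value").cast("string").alias("value"),
  "topic", "partition", "offset", "timestamp"
)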

Write data to Kafka

The following is an example of a streaming write to Kafka:

Python
(df
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .option("checkpointLocation", "<checkpoint-path>")  # required for streaming writes
  .start()
)

Databricks also supports batch write semantics to Kafka data sinks, as shown in the following example:

Python
(df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Configure the Kafka Structured Streaming writer

important

Databricks Runtime 13.3 LTS and above includes a newer version of the kafka-clients library that enables idempotent writes by default. If a Kafka sink uses version 2.8.0 or below with ACLs configured, but without IDEMPOTENT_WRITE enabled, the write fails with the error message org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state.

Resolve this error by upgrading to Kafka version 2.8.0 or above, or by setting .option("kafka.enable.idempotence", "false") while configuring your Structured Streaming writer.
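
For example, a streaming writer with idempotence disabled might look like the following sketch; the server, topic, and checkpoint placeholders are illustrative:

Python
(df
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .option("kafka.enable.idempotence", "false")  # workaround for Kafka sinks on 2.8.0 or below with ACLs
  .option("checkpointLocation", "<checkpoint-path>")
  .start()
)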

The schema of the DataFrame you provide to the DataStreamWriter determines which fields are written to the Kafka sink. You can use the following fields:

| Column name | Required or optional | Type |
| --- | --- | --- |
| key | optional | STRING or BINARY |
| value | required | STRING or BINARY |
| headers | optional | ARRAY |
| topic | optional (ignored if topic is set as writer option) | STRING |
| partition | optional | INT |
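
For example, a minimal sketch that shapes a DataFrame into this schema before writing; source_df and its columns (user_id, payload) are hypothetical:

Python
from pyspark.sql.functions import col, struct, to_json

# Build the key and value columns the Kafka sink expects.
kafka_ready_df = source_df.select(
  col("user_id").cast("string").alias("key"),            # optional key
  to_json(struct("user_id", "payload")).alias("value")   # required value
)

(kafka_ready_df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)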

The following are common options set while writing to Kafka:

| Option | Value | Default value | Description |
| --- | --- | --- | --- |
| kafka.bootstrap.servers | A comma-separated list of <host:port> | none | [Required] The Kafka bootstrap.servers configuration. |
| topic | STRING | not set | [Optional] Sets the topic for all rows to be written. This option overrides any topic column that exists in the data. |
| includeHeaders | BOOLEAN | false | [Optional] Whether to include the Kafka headers in the row. |

See Structured Streaming Kafka Integration Guide for other optional configurations.

Retrieve Kafka metrics

You can get the average, min, and max of the number of offsets that the streaming query is behind the latest available offset among all the subscribed topics with the avgOffsetsBehindLatest, maxOffsetsBehindLatest, and minOffsetsBehindLatest metrics. See Reading Metrics Interactively.
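
For example, assuming query is the StreamingQuery handle returned by writeStream.start(), a minimal sketch of reading these metrics from the most recent progress report:

Python
# lastProgress is None until the first batch completes.
progress = query.lastProgress
if progress:
    for source in progress["sources"]:
        metrics = source["metrics"]
        print(metrics.get("avgOffsetsBehindLatest"),
              metrics.get("minOffsetsBehindLatest"),
              metrics.get("maxOffsetsBehindLatest"))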

note

Available in Databricks Runtime 9.1 and above.

Get the estimated total number of bytes that the query process has not consumed from the subscribed topics by examining the value of estimatedTotalBytesBehindLatest. This estimate is based on the batches that were processed in the last 300 seconds. The timeframe that the estimate is based on can be changed by setting the option bytesEstimateWindowLength to a different value. For example, to set it to 10 minutes:

Python
df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("bytesEstimateWindowLength", "10m")  # m for minutes; you can also use "600s" for 600 seconds
  .load()
)

If you are running the stream in a notebook, you can see these metrics under the Raw Data tab in the streaming query progress dashboard:

JSON
{
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic]]",
      "metrics": {
        "avgOffsetsBehindLatest": "4.0",
        "maxOffsetsBehindLatest": "4",
        "minOffsetsBehindLatest": "4",
        "estimatedTotalBytesBehindLatest": "80.0"
      }
    }
  ]
}

Use SSL to connect Databricks to Kafka

To enable SSL connections to Kafka, follow the instructions in the Confluent documentation Encryption and Authentication with SSL. You can provide the configurations described there, prefixed with kafka., as options. For example, you specify the trust store location in the property kafka.ssl.truststore.location.

Databricks recommends that you:

  • Store your certificates in cloud object storage, and restrict access to them to only the clusters that need to connect to Kafka.
  • Store your certificate passwords as secrets in a secret scope.

The following example uses object storage locations and Databricks secrets to enable an SSL connection:

Python
df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("subscribe", "<topic>")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>, key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>, key=<truststore-password-key-name>))
  .load()
)

Kafka Authentication

Unity Catalog service credential authentication

In Databricks Runtime 16.1 and above, Databricks supports Unity Catalog service credentials for authenticating access to AWS Managed Streaming for Apache Kafka (MSK) and Azure Event Hubs. Support for Google Cloud Managed Service for Apache Kafka was added in Databricks Runtime 18.0. Databricks recommends this approach for running Kafka streaming on shared clusters and, where supported, when using serverless compute.

To use a Unity Catalog service credential for authentication, perform the following steps:

  • Create a new Unity Catalog service credential. If you are not familiar with this process, see Create service credentials for instructions on creating one.
  • Provide the name of your Unity Catalog service credential as a source option in your Kafka configuration. Set the option databricks.serviceCredential to the name of your service credential.

Databricks supports Unity Catalog service credentials for authenticating access to Google Cloud Managed Service for Apache Kafka in Databricks Runtime 18.0 and above. Google Cloud Managed Service for Apache Kafka clusters run in a VPC, so your Databricks compute must run in a VPC that can reach the managed Kafka cluster. Serverless compute isn't supported because it runs in the Databricks-managed control plane. To enable connectivity, configure VPC peering or Private Service Connect. See VPC Network Peering or Private Service Connect in the Google Cloud documentation.

For authentication, grant the Google Cloud service account created for your Unity Catalog service credential the following role in the Google Cloud project that contains your managed Kafka cluster:

  • roles/managedkafka.client

For instructions, see Granting, changing, and revoking access in the Google Cloud documentation.

The following example configures Kafka as a source using a service credential:

Python
kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
  "subscribe": "<topic>",
  "databricks.serviceCredential": "<service-credential-name>",
  # Optional: set this only if Databricks can't infer the scope for your Kafka service.
  # "databricks.serviceCredential.scope": "https://www.googleapis.com/auth/cloud-platform",
}

df = spark.read.format("kafka").options(**kafka_options).load()

When you provide a Unity Catalog service credential to Kafka, do not specify the following options:

  • kafka.sasl.mechanism
  • kafka.sasl.jaas.config
  • kafka.security.protocol
  • kafka.sasl.client.callback.handler.class
  • kafka.sasl.login.callback.handler.class
  • kafka.sasl.oauthbearer.token.endpoint.url

The options databricks.serviceCredential and databricks.serviceCredential.scope are Spark source options. The options kafka.databricks.serviceCredential and kafka.databricks.serviceCredential.scope are Kafka client options consumed by the login callback handler, so they must be prefixed with kafka..

You can authenticate to Google Cloud Managed Service for Apache Kafka without using the databricks.serviceCredential source option by specifying the SASL/OAUTHBEARER options directly. Use this approach only if you need to set Kafka SASL options explicitly (for example, to use a specific callback handler).

This approach requires a Unity Catalog service credential:

  • The service credential's Google Cloud service account must have roles/managedkafka.client in the Google Cloud project that contains your managed Kafka cluster.
  • Use the Databricks-provided login callback handler org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandler (not com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler, which isn't included in Databricks Runtime).

Authenticating with Google service accounts outside of Unity Catalog isn't supported.

The following example configures Kafka as a source by specifying SASL options directly:

Python
kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
  "subscribe": "<topic>",
  "startingOffsets": "earliest",
  "kafka.security.protocol": "SASL_SSL",
  "kafka.sasl.mechanism": "OAUTHBEARER",
  "kafka.sasl.jaas.config":
    "kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
  "kafka.sasl.login.callback.handler.class":
    "org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandler",
  # Required by the login callback handler:
  "kafka.databricks.serviceCredential": "<service-credential-name>",
  # Optional:
  # "kafka.databricks.serviceCredential.scope": "https://www.googleapis.com/auth/cloud-platform",
}

df = spark.read.format("kafka").options(**kafka_options).load()

Don't set databricks.serviceCredential when you specify these SASL options. If databricks.serviceCredential is set, Databricks configures Kafka authentication automatically and disallows specifying kafka.sasl.* options.

Handling potential errors

  • Exception missing gcp_options when authenticating

    This exception is thrown if the callback handler can't infer the scope from the bootstrap URL. Set databricks.serviceCredential.scope manually. For most use cases, set it to https://www.googleapis.com/auth/cloud-platform.

  • No resolvable bootstrap URLs found

    This means the compute cluster is unable to resolve the bootstrap hostname. Check your VPC configuration to ensure the compute cluster is able to reach the managed Kafka cluster.

  • Permission issues

    • Ensure that the service account has the roles/managedkafka.client IAM role binding on the project that the Kafka cluster belongs to.
    • Ensure that the Kafka cluster has the right ACLs defined for the topic and service accounts. See Access control with Kafka ACLs in the Google Cloud documentation.