Connect to Apache Kafka

This article describes how you can use Apache Kafka as either a source or a sink when running Structured Streaming workloads on Databricks.

For more information about Kafka, see the Apache Kafka documentation.

Read data from Kafka

Databricks provides the kafka keyword as a data format to configure connections to Kafka. The following example shows a streaming read:

```python
df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)
```

Databricks also supports batch read semantics, as shown in the following example:

```python
df = (spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)
```

For incremental batch loading, Databricks recommends using Kafka with Trigger.AvailableNow. See AvailableNow: Incremental batch processing.
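
The pattern recommended above can be sketched as follows. This is a minimal, hedged sketch: it assumes the Databricks-provided `spark` session, and the server, topic, checkpoint path, and table name are hypothetical placeholders.

```python
# Incremental batch loading sketch with Trigger.AvailableNow.
# `spark` is the session Databricks provides in notebooks and jobs;
# all angle-bracket values are placeholders.
kafka_options = {
    "kafka.bootstrap.servers": "<server:ip>",
    "subscribe": "<topic>",
    "startingOffsets": "earliest",
}

def run_incremental_batch():
    # Each run processes everything available when it starts, then stops.
    # The checkpoint lets the next run resume from the recorded offsets.
    return (spark.readStream
        .format("kafka")
        .options(**kafka_options)
        .load()
        .writeStream
        .format("delta")
        .option("checkpointLocation", "<checkpoint-path>")
        .trigger(availableNow=True)
        .toTable("<catalog.schema.table>")
    )
```

Scheduling this function as a periodic job gives batch-style cost with streaming-style exactly-once bookkeeping.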

In Databricks Runtime 13.3 LTS and above, Databricks also provides a SQL function for reading Kafka data. Streaming with SQL is supported only in Lakeflow Spark Declarative Pipelines or with streaming tables in Databricks SQL. See read_kafka table-valued function.
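
For a batch query, the SQL function can also be called from Python through spark.sql. The following is an untested sketch based on the read_kafka named-parameter syntax; the server and topic values are placeholders:

```python
# Sketch: batch read with the read_kafka table-valued function (DBR 13.3 LTS+).
# Assumes the Databricks-provided `spark` session; values are placeholders.
READ_KAFKA_QUERY = """
SELECT value :: string AS raw_value, topic, partition, offset
FROM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'earliest'
)
"""

def read_kafka_batch():
    return spark.sql(READ_KAFKA_QUERY)
```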

Configure Kafka Structured Streaming reader

The following option must be set for the Kafka source for both batch and streaming queries:

| Option | Value | Description |
| --- | --- | --- |
| `kafka.bootstrap.servers` | A comma-separated list of `host:port` | The Kafka cluster bootstrap servers |

Additionally, one of the following options is required to specify which topics to subscribe to:

| Option | Value | Description |
| --- | --- | --- |
| `subscribe` | A comma-separated list of topics. | The topic list to subscribe to. |
| `subscribePattern` | Java regex string. | The pattern used to subscribe to topic(s). |
| `assign` | JSON string `{"topicA":[0,1],"topicB":[2,4]}`. | Specific topicPartitions to consume. |

See the Options page for the full list of available options.
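
The three subscription options are mutually exclusive: set exactly one per query. A small configuration sketch, with hypothetical topic names:

```python
# Exactly one of these subscription styles should be used per query.
subscribe_by_list = {"subscribe": "events,clicks"}        # explicit topic list
subscribe_by_pattern = {"subscribePattern": "events-.*"}  # Java regex over topic names
subscribe_by_assignment = {                               # explicit topic-partitions
    "assign": '{"events":[0,1],"clicks":[2]}',
}
```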

Schema for Kafka records

The records returned by the Kafka Structured Streaming reader will have the following schema:

| Column | Type |
| --- | --- |
| key | binary |
| value | binary |
| topic | string |
| partition | int |
| offset | long |
| timestamp | long |
| timestampType | int |

The key and the value are always deserialized as byte arrays with the ByteArrayDeserializer. Use DataFrame operations (such as cast("string") or from_avro) to explicitly deserialize the keys and values.
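
A minimal sketch of that deserialization step, assuming a UTF-8 string key and a JSON-encoded value; the schema string is a hypothetical example:

```python
# Sketch: deserializing Kafka keys and values from binary.
# The value schema is a hypothetical example for a JSON payload.
VALUE_SCHEMA = "event_type STRING, event_ts TIMESTAMP"

def parse_kafka_df(raw_df):
    # Imported here so the sketch only needs PySpark when actually called.
    from pyspark.sql.functions import col, from_json

    return raw_df.select(
        col("key").cast("string").alias("key"),  # binary -> UTF-8 string
        from_json(col("value").cast("string"), VALUE_SCHEMA).alias("value"),
    )
```

For Avro or Protobuf payloads, substitute from_avro or from_protobuf for from_json.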

Write data to Kafka

The following example shows a streaming write to Kafka:

```python
(df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .option("checkpointLocation", "<checkpoint-path>")
  .start()
)
```

Databricks also supports batch write semantics to Kafka data sinks, as shown in the following example:

```python
(df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)
```

Configure the Kafka Structured Streaming writer

important

Databricks Runtime 13.3 LTS and above includes a newer version of the kafka-clients library that enables idempotent writes by default. If a Kafka sink uses version 2.8.0 or below with ACLs configured, but without IDEMPOTENT_WRITE enabled, the write fails with the error message org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state.

Resolve this error by upgrading to Kafka version 2.8.0 or above, or by setting .option("kafka.enable.idempotence", "false") while configuring your Structured Streaming writer.
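
A sketch of the second workaround, with placeholder server, topic, and checkpoint values:

```python
# Workaround sketch for Kafka brokers at version 2.8.0 or below with ACLs
# but without IDEMPOTENT_WRITE: disable idempotent writes on the writer.
writer_options = {
    "kafka.bootstrap.servers": "<server:ip>",
    "topic": "<topic>",
    "kafka.enable.idempotence": "false",
}

def start_writer(df):
    # Assumes the Databricks-provided `spark` session and a `value` column in df.
    return (df.writeStream
        .format("kafka")
        .options(**writer_options)
        .option("checkpointLocation", "<checkpoint-path>")
        .start()
    )
```

Note that disabling idempotence trades away the duplicate-protection the newer kafka-clients library enables by default, so prefer the broker upgrade when possible.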

The following are common options set while writing to Kafka:

| Option | Value | Default value | Description |
| --- | --- | --- | --- |
| `kafka.bootstrap.servers` | A comma-separated list of `<host:port>` | none | [Required] The Kafka `bootstrap.servers` configuration. |
| `topic` | STRING | not set | [Optional] Sets the topic for all rows to be written. This option overrides any topic column that exists in the data. |
| `includeHeaders` | BOOLEAN | `false` | [Optional] Whether to include the Kafka headers in the row. |

See the Options page for the full list of available options.

Schema for Kafka writer

When writing data to Kafka, the provided DataFrame may include the following fields:

| Column name | Required or optional | Type |
| --- | --- | --- |
| key | optional | STRING or BINARY |
| value | required | STRING or BINARY |
| headers | optional | ARRAY |
| topic | optional (ignored if topic is set as writer option) | STRING |
| partition | optional | INT |
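
A sketch of shaping a DataFrame to this schema before writing; the source column names are hypothetical:

```python
# Sketch: projecting a DataFrame onto the Kafka writer schema.
# `value` is required; `key` and `topic` are optional. `user_id` is a
# hypothetical source column.
def to_kafka_payload(df):
    # Imported here so the sketch only needs PySpark when actually called.
    from pyspark.sql.functions import col, lit, struct, to_json

    return df.select(
        col("user_id").alias("key"),          # optional message key
        to_json(struct("*")).alias("value"),  # required payload, serialized as JSON
        lit("<topic>").alias("topic"),        # ignored if the writer sets `topic`
    )
```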

Authentication

Databricks supports multiple authentication methods for Kafka, including Unity Catalog service credentials, SASL/SSL, and cloud-specific options for AWS MSK, Azure Event Hubs, and Google Cloud Managed Kafka. See Authentication.

Retrieve Kafka metrics

You can monitor how far a streaming query is lagging behind Kafka using the avgOffsetsBehindLatest, maxOffsetsBehindLatest, and minOffsetsBehindLatest metrics. These report the average, maximum, and minimum offset lag across all subscribed topic partitions, relative to the latest offsets in Kafka. See Reading Metrics Interactively.

To estimate how much data the query has not yet consumed, use the estimatedTotalBytesBehindLatest metric. This metric estimates the total number of bytes remaining across all subscribed partitions based on the batches processed in the last 300 seconds. You can modify the time window used for this estimate by setting the bytesEstimateWindowLength option. For example, to set it to 10 minutes:

```python
df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("bytesEstimateWindowLength", "10m") # "m" for minutes; "600s" for 600 seconds also works
  .load()
)
```

If you are running the stream in a notebook, you can see these metrics under the Raw Data tab in the streaming query progress dashboard:

```json
{
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic]]",
      "metrics": {
        "avgOffsetsBehindLatest": "4.0",
        "maxOffsetsBehindLatest": "4",
        "minOffsetsBehindLatest": "4",
        "estimatedTotalBytesBehindLatest": "80.0"
      }
    }
  ]
}
```

See Monitoring Structured Streaming queries on Databricks for more information.
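
The same payload is available programmatically from StreamingQuery.lastProgress. A minimal sketch of pulling the lag out of a progress dictionary shaped like the JSON above (note that metric values arrive as strings):

```python
# Extract per-source Kafka offset lag from a streaming progress payload.
def kafka_lag(progress):
    lags = {}
    for source in progress.get("sources", []):
        metrics = source.get("metrics", {})
        if "maxOffsetsBehindLatest" in metrics:
            # Metric values are reported as strings; convert before comparing.
            lags[source["description"]] = int(metrics["maxOffsetsBehindLatest"])
    return lags

progress = {
    "sources": [
        {
            "description": "KafkaV2[Subscribe[topic]]",
            "metrics": {
                "avgOffsetsBehindLatest": "4.0",
                "maxOffsetsBehindLatest": "4",
                "minOffsetsBehindLatest": "4",
                "estimatedTotalBytesBehindLatest": "80.0",
            },
        }
    ]
}

print(kafka_lag(progress))  # {'KafkaV2[Subscribe[topic]]': 4}
```

In practice you would pass `query.lastProgress` in place of the hand-built `progress` dictionary, for example from an alerting job that pages when the lag exceeds a threshold.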

Code example: Kafka to Delta

The following example demonstrates a complete workflow for continuously streaming data from Kafka to a Delta table. This pattern is ideal for near-real-time data ingestion workloads.

This example uses a fixed JSON schema. For other formats like Avro or Protobuf, use from_avro or from_protobuf. You can also integrate with a schema registry. See Example with Schema Registry.

```python
from pyspark.sql.functions import from_json, col

# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"

# Configure Kafka options with service credentials
kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9092",
  "subscribe": "<topic-name>",
  "databricks.serviceCredential": "<service-credential-name>",
}

# Read from Kafka and parse JSON
parsed_df = (spark.readStream
  .format("kafka")
  .options(**kafka_options)
  .load()
  .select(
    from_json(col("key").cast("string"), key_schema).alias("key"),
    from_json(col("value").cast("string"), value_schema).alias("value")
  )
  .select("key.*", "value.*")
)

# Write to Delta table
query = (parsed_df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(processingTime="10 seconds")
  .toTable("catalog.schema.events_table")
)

query.awaitTermination()
```