Connect to Apache Kafka

This page describes how you can use Apache Kafka as either a source or a sink when running Structured Streaming workloads on Databricks.

For more information about Kafka, see the Apache Kafka documentation.

Read data from Kafka

Use the kafka format to configure connections to Kafka. The following example shows a streaming read:

Python
df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<host:port>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Databricks also supports batch reads from Kafka, as in the following example:

Python
df = (spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<host:port>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

For incremental batch loading, Databricks recommends using Kafka with Trigger.AvailableNow. See AvailableNow: Incremental batch processing.
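
As a minimal sketch of that recommendation, the function below wires a Kafka streaming read to a table write that uses the availableNow trigger, so the query processes the records available when it starts and then stops. The function name, its parameters, and the choice to pass in the spark session are illustrative assumptions, not a Databricks API. The checkpoint location is what lets each run resume where the previous one ended.

```python
def run_incremental_kafka_load(spark, bootstrap_servers, topic, checkpoint_path, target_table):
    """Hypothetical helper: ingest the currently available Kafka records, then stop."""
    source_df = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", topic)
        # Applies only to the first run; later runs resume from the checkpoint.
        .option("startingOffsets", "earliest")
        .load()
    )
    return (
        source_df.writeStream
        .option("checkpointLocation", checkpoint_path)
        # Process everything available now, then terminate the query.
        .trigger(availableNow=True)
        .toTable(target_table)
    )
```

Scheduling this function as a periodic job gives batch-style runs without tracking offsets by hand.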

In Databricks Runtime 13.3 LTS and above, Databricks also provides a SQL function for reading Kafka data. Streaming with SQL is supported only in Lakeflow Spark Declarative Pipelines or with streaming tables in Databricks SQL. See read_kafka table-valued function.

Configure the Kafka Structured Streaming reader

For both batch and streaming queries, you must set the bootstrap servers for the Kafka source with the following option:

| Key | Value | Description |
| --- | --- | --- |
| kafka.bootstrap.servers | A comma-separated list of host:port | The Kafka cluster bootstrap servers |

To set subscription topics, you must specify one of the following options:

| Option | Value | Description |
| --- | --- | --- |
| subscribe | A comma-separated list of topics. | The topic list to subscribe to. |
| subscribePattern | Java regex string. | The pattern used to subscribe to topic(s). |
| assign | JSON string {"topicA":[0,1],"topicB":[2,4]}. | Specific topicPartitions to consume. |

See Kafka for the full list of available options.
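
Because exactly one of these three options is set per reader, it can help to build the option in one place. The helper below is a hypothetical sketch (its name and parameters are not part of any Kafka or Databricks API) that returns a one-entry dictionary and serializes the assign value to the compact JSON form shown in the table.

```python
import json


def kafka_subscription_option(topics=None, pattern=None, partitions=None):
    """Hypothetical helper: return exactly one of subscribe, subscribePattern, or assign."""
    provided = [arg for arg in (topics, pattern, partitions) if arg is not None]
    if len(provided) != 1:
        raise ValueError("Specify exactly one of topics, pattern, or partitions")
    if topics is not None:
        # subscribe takes a comma-separated list of topic names.
        return {"subscribe": ",".join(topics)}
    if pattern is not None:
        # subscribePattern takes a Java regex string.
        return {"subscribePattern": pattern}
    # assign takes a JSON object mapping each topic to a list of partition numbers.
    return {"assign": json.dumps(partitions, separators=(",", ":"))}
```

For example, kafka_subscription_option(partitions={"topicA": [0, 1], "topicB": [2, 4]}) returns {"assign": '{"topicA":[0,1],"topicB":[2,4]}'}, which can be passed to the reader with .options(**...).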

Schema for Kafka rows

The Kafka Structured Streaming reader returns rows with the following schema:

| Column | Type |
| --- | --- |
| key | binary |
| value | binary |
| topic | string |
| partition | int |
| offset | long |
| timestamp | timestamp |
| timestampType | int |

The key and the value are always deserialized as byte arrays with the ByteArrayDeserializer. Use DataFrame operations (such as cast("string") or from_avro) to explicitly deserialize the keys and values.
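
For payloads that are plain UTF-8 text, the cast can be applied with SQL expressions. The following is a minimal sketch, assuming both key and value hold UTF-8 strings; the function name is hypothetical, and kafka_df stands for any DataFrame loaded with the kafka format.

```python
def decode_kafka_strings(kafka_df):
    """Hypothetical helper: cast the binary key and value columns to strings."""
    return kafka_df.selectExpr(
        "CAST(key AS STRING) AS key",      # binary -> string, interpreted as UTF-8
        "CAST(value AS STRING) AS value",  # binary -> string, interpreted as UTF-8
        "topic",                           # keep the source topic for context
    )
```

Binary formats such as Avro need from_avro instead, because casting their bytes to a string does not produce readable data.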

Write data to Kafka

The following example shows a streaming write to Kafka:

Python
(df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<host:port>")
  .option("topic", "<topic>")
  .start()
)

Databricks also supports batch write semantics to Kafka data sinks, as shown in the following example:

Python
(df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<host:port>")
  .option("topic", "<topic>")
  .save()
)

Configure the Kafka Structured Streaming writer

Important

Databricks Runtime 13.3 LTS and above includes a newer version of the kafka-clients library that enables idempotent writes by default. If a Kafka sink runs a version earlier than 2.8.0 with ACLs configured but without IDEMPOTENT_WRITE enabled, the write fails with the error message org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state.

Resolve this error by upgrading to Kafka version 2.8.0 or above, or by setting .option("kafka.enable.idempotence", "false") while configuring your Structured Streaming writer.
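
The workaround can be captured in a small option builder. This is a sketch under stated assumptions: the helper name and the broker_version parameter are hypothetical, the caller is assumed to know the broker version, and the cluster is assumed to use ACLs without IDEMPOTENT_WRITE. Following the upgrade advice above, it treats 2.8.0 as the first version that does not need the workaround.

```python
def kafka_writer_options(bootstrap_servers, topic, broker_version):
    """Hypothetical helper: build writer options, disabling idempotence for older brokers."""
    options = {"kafka.bootstrap.servers": bootstrap_servers, "topic": topic}
    # Compare only major.minor, for example "2.7.1" -> (2, 7).
    major_minor = tuple(int(part) for part in broker_version.split(".")[:2])
    if major_minor < (2, 8):
        # Assumption: ACLs are configured without IDEMPOTENT_WRITE on this cluster.
        options["kafka.enable.idempotence"] = "false"
    return options
```

The resulting dictionary can be applied to a writer with .options(**kafka_writer_options("<host:port>", "<topic>", "2.7.1")).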

The following are common options for writes to Kafka:

| Key | Value | Default value | Description |
| --- | --- | --- | --- |
| kafka.bootstrap.servers | A comma-separated list of <host:port> | none | Required. The Kafka bootstrap.servers configuration. |
| topic | STRING | not set | Optional. Sets the topic for all rows to be written. This option overrides any topic column that exists in the data. |
| includeHeaders | BOOLEAN | false | Optional. Whether to include the Kafka headers in the row. |

See Kafka sink for the full list of available options.

Schema for Kafka writer

When writing data to Kafka, the provided DataFrame may include the following fields:

| Column name | Required or optional | Type |
| --- | --- | --- |
| key | optional | STRING or BINARY |
| value | required | STRING or BINARY |
| headers | optional | ARRAY |
| topic | optional (ignored if topic is set as writer option) | STRING |
| partition | optional | INT |
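
A common way to satisfy this schema is to project the DataFrame down to key and value columns before writing. The sketch below assumes the rows should be serialized as JSON and that one existing column serves as the message key; the function name and the key_column parameter are hypothetical.

```python
def to_kafka_rows(df, key_column):
    """Hypothetical helper: shape a DataFrame into key and value columns for the Kafka writer."""
    return df.selectExpr(
        # Optional key column, cast to STRING.
        f"CAST({key_column} AS STRING) AS key",
        # Required value column: the whole row serialized as a JSON string.
        "to_json(struct(*)) AS value",
    )
```

For example, to_kafka_rows(df, "user_id") yields a two-column DataFrame that can be passed to .write.format("kafka") or .writeStream.format("kafka").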

Authentication

Databricks supports multiple authentication methods for Kafka, including Unity Catalog service credentials, SASL/SSL, and cloud-specific options for AWS MSK, Azure Event Hubs, and Google Cloud Managed Kafka. See Authentication.

Retrieve Kafka metrics

To monitor lag behind Kafka for a streaming query, use the avgOffsetsBehindLatest, maxOffsetsBehindLatest, and minOffsetsBehindLatest metrics. These metrics report the average, maximum, and minimum offset lag across all subscribed topic partitions, relative to the latest offsets in Kafka. See Reading Metrics Interactively.

Note

In Databricks Runtime 17.1 and above, the latest Kafka offsets are fetched after each micro-batch completes. On topics that continuously receive data, backlog metrics may show small, persistent non-zero values. This is expected behavior and does not indicate that the stream is falling behind.

In Databricks Runtime 17.0 and below, the latest Kafka offsets are fetched at micro-batch start time. Backlog metrics may return 0 when streaming queries consistently consume all records available at the start of the micro-batch.

To estimate the remaining data for a query to read, use the estimatedTotalBytesBehindLatest metric. This metric estimates the total number of bytes remaining across all subscribed partitions based on the batches processed in the last 300 seconds. You can modify the time window used for this estimate by setting the bytesEstimateWindowLength option.

For example, to set the window length to 10 minutes:

Python
df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<host:port>")
  .option("subscribe", "<topic>")
  .option("bytesEstimateWindowLength", "10m")  # m for minutes; "600s" is equivalent
  .load()
)

If you are running the stream in a notebook, you can see these metrics under the Raw Data tab in the streaming query progress dashboard:

JSON
{
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic]]",
      "metrics": {
        "avgOffsetsBehindLatest": "4.0",
        "maxOffsetsBehindLatest": "4",
        "minOffsetsBehindLatest": "4",
        "estimatedTotalBytesBehindLatest": "80.0"
      }
    }
  ]
}
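
The metric values in this payload are strings, so they need converting before they can be compared against a threshold. The following is a minimal sketch that extracts the lag metrics from a progress payload supplied as a JSON string; the function name is hypothetical and the sample simply repeats the values shown above.

```python
import json


def kafka_lag_metrics(progress_json):
    """Hypothetical helper: return (source description, lag metrics as floats) pairs."""
    progress = json.loads(progress_json)
    results = []
    for source in progress.get("sources", []):
        metrics = source.get("metrics", {})
        # Keep only the lag metrics and convert their string values to numbers.
        lag = {
            name: float(value)
            for name, value in metrics.items()
            if name.endswith("BehindLatest")
        }
        if lag:
            results.append((source["description"], lag))
    return results


sample_progress = """
{"sources": [{"description": "KafkaV2[Subscribe[topic]]",
  "metrics": {"avgOffsetsBehindLatest": "4.0", "maxOffsetsBehindLatest": "4",
              "minOffsetsBehindLatest": "4", "estimatedTotalBytesBehindLatest": "80.0"}}]}
"""

for description, lag in kafka_lag_metrics(sample_progress):
    print(description, lag["maxOffsetsBehindLatest"])
```

A monitoring job could alert when maxOffsetsBehindLatest stays above a chosen threshold across several consecutive progress events.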

See Monitoring Structured Streaming queries on Databricks for more information.

Example for Kafka to Delta Lake

The following example shows a complete workflow for continuously streaming data from Kafka to a Delta Lake table. You can use this approach for near-real-time data ingestion workloads.

This example uses a fixed JSON schema. For other formats like Avro or Protobuf, use from_avro or from_protobuf. You can also integrate with a schema registry. See Example with Schema Registry.

Python
from pyspark.sql.functions import from_json, col

# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"

# Configure Kafka options with service credentials
kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9092",
  "subscribe": "<topic-name>",
  "databricks.serviceCredential": "<service-credential-name>",
}

# Read from Kafka and parse JSON
parsed_df = (spark.readStream
  .format("kafka")
  .options(**kafka_options)
  .load()
  .select(
    from_json(col("key").cast("string"), key_schema).alias("key"),
    from_json(col("value").cast("string"), value_schema).alias("value")
  )
  .select("key.*", "value.*")
)

# Write to Delta table
query = (parsed_df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(processingTime="10 seconds")
  .toTable("catalog.schema.events_table")
)

query.awaitTermination()