FAQ
Commonly asked questions about using Kafka with Databricks.
Why do I get an error that a Kafka option is not supported or not recognized?
A common mistake is forgetting the kafka. prefix when setting Kafka-native configuration options. All options passed directly to the Kafka client must be prefixed with kafka.:
# Incorrect - missing the kafka. prefix
.option("security.protocol", "SASL_SSL")
.option("sasl.mechanism", "PLAIN")
# Correct - using the kafka. prefix
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
Options specific to the Spark Kafka connector (like subscribe, startingOffsets, maxOffsetsPerTrigger) do not need the prefix. See Options for the complete list.
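As a sketch, a reader that mixes both kinds of options might look like the following (the broker address and topic name are placeholders):

```python
# Sketch of a Structured Streaming reader mixing Kafka-native options
# (kafka. prefix) with Spark connector options (no prefix).
# Broker address and topic name are placeholders.
df = (spark.readStream
    .format("kafka")
    # Connector options: no prefix
    .option("subscribe", "events")
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 10000)
    # Kafka-native options: kafka. prefix required
    .option("kafka.bootstrap.servers", "broker1:9093")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .load()
)
```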
Why do I get an error about shaded Kafka classes?
Databricks requires using shaded Kafka classes (prefixed with kafkashaded. or shadedmskiam.). If you see errors like RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED, you must use the shaded class names:
- org.apache.kafka.* classes require the kafkashaded. prefix. For example: kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule
- software.amazon.msk.* classes require the shadedmskiam. prefix. For example: shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule
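For example, a SASL/PLAIN configuration that references the shaded login module class might look like this sketch (broker address, username, and password are placeholders):

```python
# Sketch: SASL/PLAIN JAAS config referencing the shaded login module
# class. Broker address and credentials are placeholders.
jaas = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
    'required username="<user>" password="<password>";'
)

df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:port>")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", jaas)
    .option("subscribe", "<topic>")
    .load()
)
```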
Why am I getting a TimeoutException when connecting to Kafka?
Common causes include:
- Network connectivity: The compute cluster cannot reach the Kafka brokers. Check firewall rules, security groups, and VPC configurations.
- Wrong bootstrap servers: Verify the kafka.bootstrap.servers hostname and port are correct.
- DNS resolution: Ensure the Kafka broker hostnames can be resolved from the Databricks network.
- SSL/TLS issues: If using SSL, verify certificates are correctly configured.
For Private Link or VPC peering setups, ensure the correct network routes are in place.
Should I use batch or streaming mode for Kafka?
It depends on your use case:
- Streaming mode (spark.readStream): Use when you need continuous data processing or low-latency ingestion.
- Batch mode (spark.read): Use for one-time data loads, backfills, or debugging. Requires both startingOffsets and endingOffsets.
See Configure Structured Streaming trigger intervals for details on configuring trigger intervals such as AvailableNow, ProcessingTime, and Real-time mode.
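A minimal batch-mode read over a bounded offset range can be sketched as follows (broker and topic are placeholders):

```python
# Sketch of a one-time batch read over a bounded offset range.
# Batch mode requires both startingOffsets and endingOffsets.
# Broker address and topic name are placeholders.
df = (spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:port>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)
```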
Can I read from multiple Kafka topics in a single stream?
Yes, you can use:
- subscribe: Provide a comma-separated list of topics, for example .option("subscribe", "topic1,topic2").
- subscribePattern: Use a Java regex pattern to match topic names, for example .option("subscribePattern", "topic-.*").
How do I use Kafka with Lakeflow Spark Declarative Pipelines?
Lakeflow Spark Declarative Pipelines provides native support for Kafka sources. You can define a streaming table that reads from Kafka:
- Python
- SQL
import dlt
@dlt.table
def kafka_bronze():
return (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.load()
)
CREATE OR REFRESH STREAMING TABLE kafka_bronze AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:port>',
subscribe => '<topic>'
);
See Load data in pipelines for more details on streaming sources in Lakeflow Spark Declarative Pipelines.
How do I deserialize the Kafka key and value columns?
The key and value columns are returned as binary (BINARY type). Use DataFrame operations to deserialize them based on your data format:
- String data: Use cast("string") to convert binary to string.
- JSON data: Use from_json() after casting to string. See from_json function.
- Avro data: Use from_avro() to deserialize Avro-encoded data. See Read and write streaming Avro data.
- Protocol buffers: Use from_protobuf() to deserialize protobuf data. See Read and write protocol buffers.
Why am I getting an idempotent write error?
Databricks Runtime 13.3 LTS and above includes a newer version of the kafka-clients library that enables idempotent writes by default. If your Kafka cluster uses version 2.8.0 or below with ACLs configured but without IDEMPOTENT_WRITE enabled, the write fails with: org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state.
Resolve this error by upgrading to Kafka version 2.8.0 or above, or by setting .option("kafka.enable.idempotence", "false") while configuring your Structured Streaming writer.
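The second workaround can be sketched as follows (broker, topic, and checkpoint path are placeholders):

```python
# Sketch of a Structured Streaming writer to Kafka with idempotent
# producer writes disabled. Broker, topic, and checkpoint path are
# placeholders.
query = (df.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:port>")
    .option("topic", "<topic>")
    .option("kafka.enable.idempotence", "false")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)
```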
What is KAFKA_DATA_LOSS_ERROR and how do I resolve it?
This error occurs when the Kafka source detects that offsets stored in the checkpoint are no longer available in Kafka, typically because:
- The stream was paused longer than the Kafka retention period.
- Kafka topic data was deleted or the topic was recreated.
- Kafka broker experienced data loss.
To resolve:
- If data loss is acceptable: Set .option("failOnDataLoss", "false") to allow the stream to continue from the earliest available offset.
- If data loss is not acceptable: Reset the checkpoint and reprocess from earliest offsets, or restore the missing Kafka data.
See KAFKA_DATA_LOSS error condition for more information.
How do I control the rate at which data is read from Kafka?
Use the maxOffsetsPerTrigger option to limit the number of offsets (approximately the number of records) processed per micro-batch. This helps prevent large batches that could overwhelm downstream processing or cause memory issues when catching up on a backlog.
- Python
- Scala
- SQL
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.option("maxOffsetsPerTrigger", 10000)
.load()
)
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.option("maxOffsetsPerTrigger", 10000)
.load()
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:port>',
subscribe => '<topic>',
maxOffsetsPerTrigger => '10000'
);
Alternatively, use options like minPartitions or maxRecordsPerPartition to control how many Spark partitions are created for each batch.
How can I monitor how far behind my stream is from the latest Kafka offsets?
Use the avgOffsetsBehindLatest, maxOffsetsBehindLatest, and minOffsetsBehindLatest metrics available in the streaming query progress. These report how many offsets behind the latest available offset your stream is across all subscribed topic partitions. See Monitoring Structured Streaming queries on Databricks.
You can also use estimatedTotalBytesBehindLatest to estimate the total bytes of data that haven't been processed yet.
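As a sketch, these metrics can be read off a running query's latest progress report (here `query` is an assumed active StreamingQuery handle):

```python
# Sketch: inspect backlog metrics from a running query's latest
# progress. `query` is an assumed active StreamingQuery handle.
progress = query.lastProgress
if progress:
    for source in progress["sources"]:
        metrics = source.get("metrics", {})
        print(metrics.get("avgOffsetsBehindLatest"),
              metrics.get("maxOffsetsBehindLatest"),
              metrics.get("minOffsetsBehindLatest"),
              metrics.get("estimatedTotalBytesBehindLatest"))
```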
Why is my Kafka stream initialization slow?
Kafka streams require time to:
- Connect to the Kafka cluster and fetch metadata.
- Discover topic partitions.
- Fetch initial offsets.
For on-premises or remote Kafka clusters, network latency can significantly impact initialization time. If you're running triggered/scheduled pipelines with frequent restarts, consider using continuous streaming mode to avoid repeated initialization overhead.
Why isn't adding more Spark executors increasing my Kafka throughput?
Once the Kafka brokers become saturated, adding more Spark executors increases cost without increasing throughput.
Signs that Kafka is the bottleneck:
- Throughput plateaus despite adding more cores.
- Kafka broker CPU or network utilization is high.
- Spark tasks complete quickly but wait for new data.
To resolve this, scale your Kafka cluster by adding brokers or increasing partition counts to distribute load.
How can I optimize cost and compute utilization for Kafka streaming?
For micro-batch and AvailableNow modes:
- Right-size your cluster: Monitor metrics and set an appropriate fixed cluster size for peak load.
- Use maxOffsetsPerTrigger: Limit batch sizes to control resource usage during load spikes.
- Avoid autoscaling: Streaming jobs run continuously, and adding or removing nodes causes task rebalancing overhead.
- Reduce data skew: Skewed partitions cause some tasks to process significantly more data than others, leading to stragglers that slow down overall batch completion and waste compute resources on idle tasks. Use the minPartitions option to split large Kafka partitions into smaller Spark partitions for more balanced processing.
For real-time mode, right-sizing is especially important because tasks can stay idle while waiting for data. Key considerations:
- Set maxPartitions so each task handles multiple Kafka partitions to reduce overhead.
- Tune spark.sql.shuffle.partitions for shuffle-heavy jobs.
See Real-time mode in Structured Streaming for guidance on sizing clusters for real-time mode.
Why is my stream returning no records even though data exists in the topic?
Common causes include:
- Wrong startingOffsets setting: The default value is latest, which only reads new data arriving after the stream starts. Set startingOffsets to earliest to read existing data.
- Wrong topic name: Verify you're subscribing to the correct topic.
- Authentication issues: Your stream may have connected successfully but lacks permissions to read from the topic. Check your Kafka ACLs.
- Offset expiration: If your stream was stopped for a long time and the offsets in the checkpoint have expired (been deleted by Kafka retention), you may need to reset the checkpoint or adjust failOnDataLoss.
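For the first cause, a reader that picks up data already in the topic (rather than only new arrivals) can be sketched as follows, with placeholder broker and topic names:

```python
# Sketch: read existing topic data from the beginning rather than only
# records arriving after the stream starts. Broker and topic are
# placeholders.
df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:port>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "earliest")
    .load()
)
```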