Options
This page describes configuration options for reading from and writing to Apache Kafka using Structured Streaming on Databricks.
The Databricks Kafka connector is built on top of the Apache Spark Kafka connector and supports all standard Kafka configuration options. Any option prefixed with kafka. is passed through directly to the underlying Kafka client. For example, .option("kafka.max.poll.records", "500") sets the Kafka consumer's max.poll.records property. See the Kafka configuration documentation for the full list of available Kafka properties.
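The prefix convention can be sketched in a few lines of plain Python. The dictionary and the stripping logic below are illustrative, not connector internals; the option values are made up:

```python
# Any option starting with "kafka." is forwarded verbatim (minus the prefix)
# to the underlying Kafka consumer or producer; other options are consumed
# by the Spark connector itself.
options = {
    "kafka.bootstrap.servers": "broker1:9092",  # becomes bootstrap.servers
    "kafka.max.poll.records": "500",            # becomes max.poll.records
    "subscribe": "events",                      # handled by the connector
}

kafka_props = {
    key[len("kafka."):]: value
    for key, value in options.items()
    if key.startswith("kafka.")
}
# kafka_props == {"bootstrap.servers": "broker1:9092", "max.poll.records": "500"}
```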
For additional Structured Streaming source and sink options not listed on this page, see the Structured Streaming + Kafka Integration Guide.
Required options
The following option is required for both reading and writing:
| Option | Value | Description |
|---|---|---|
| `kafka.bootstrap.servers` | A comma-separated list of `host:port` | The Kafka bootstrap servers to connect to. |
When reading from Kafka, you must also specify one of the following options to identify which topics to consume:
| Option | Value | Description |
|---|---|---|
| `subscribe` | A comma-separated list of topics | The topic list to subscribe to. |
| `subscribePattern` | Java regex string | The pattern used to subscribe to topic(s). |
| `assign` | JSON string | Specific topic partitions to consume. |
When writing to Kafka, you can optionally set the topic option to specify a destination topic for all rows. If not set, the DataFrame must include a topic column.
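The reader requirements above can be summarized with a small validation sketch. The `validate_reader_options` helper is hypothetical (the connector performs its own validation), and the broker addresses and topic names are placeholders; the commented line shows where the options would be passed to Spark:

```python
def validate_reader_options(opts):
    """Hypothetical helper mirroring the reader requirements above."""
    if "kafka.bootstrap.servers" not in opts:
        raise ValueError("kafka.bootstrap.servers is required")
    # Exactly one topic selector must be present.
    selectors = {"subscribe", "subscribePattern", "assign"} & opts.keys()
    if len(selectors) != 1:
        raise ValueError("set exactly one of subscribe, subscribePattern, assign")
    return opts

reader_opts = validate_reader_options({
    "kafka.bootstrap.servers": "broker1:9092,broker2:9092",
    "subscribe": "events,clicks",  # comma-separated topic list
})
# In a notebook: spark.readStream.format("kafka").options(**reader_opts).load()
```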
Common reader options
The following options are commonly used when reading from Kafka:
| Option | Value | Default | Description |
|---|---|---|---|
| `minPartitions` | Integer | none | Minimum number of partitions to read from Kafka. Normally, Spark creates one partition per Kafka topic-partition. Setting this higher splits large Kafka partitions into smaller Spark partitions for increased parallelism. Useful for handling data skew or peak loads. Note: Enabling this reinitializes Kafka consumers at each trigger, which may impact performance when using SSL. |
| `maxRecordsPerPartition` | Integer | none | Maximum number of records per Spark partition. When set, Spark splits Kafka partitions so each Spark partition has at most this many records. Can be used with `minPartitions`. |
| `failOnDataLoss` | `true` or `false` | `true` | Whether to fail the query when it's possible that data was lost. Queries can permanently fail to read data from Kafka due to many scenarios such as deleted topics, topic truncation before processing, and so on. We try to estimate conservatively whether data was possibly lost or not. Sometimes this can cause false alarms. Set this option to `false` if it does not work as expected. |
| `maxOffsetsPerTrigger` | Long | none | [Streaming only] Rate limit on maximum number of offsets processed per trigger interval. The total number of offsets is proportionally split across topic partitions. For more advanced flow control, you can also use `minOffsetsPerTrigger` and `maxTriggerDelay`. |
| `startingOffsets` | `earliest`, `latest`, or a JSON string | `latest` for streaming, `earliest` for batch | Determines where to start reading when a query begins. Use `earliest` to start from the earliest offsets, `latest` to start from the latest offsets, or a JSON string specifying a starting offset for each TopicPartition (in JSON, `-2` refers to earliest and `-1` to latest). For streaming queries, this only applies when a new query is started; resuming always picks up from where the query left off. Newly discovered partitions start at `earliest`. Note: For batch queries, `latest` is not allowed. |
| `endingOffsets` | `latest` or a JSON string | `latest` | [Batch only] The end point when a batch query is ended. Use `latest` to refer to the latest offsets, or a JSON string specifying an ending offset for each TopicPartition (in JSON, `-1` refers to latest; `-2` (earliest) is not allowed). |
| `groupIdPrefix` | String | `spark-kafka-source` | Prefix for the auto-generated consumer group ID. The connector automatically generates a unique group ID for each query; this option sets the prefix of those generated IDs. |
| `kafka.group.id` | String | none | Group ID to use while reading from Kafka. Use this with caution. By default, each query generates a unique group ID for reading data. This ensures that each query has its own consumer group that does not face interference from any other consumer, and therefore can read all of the partitions of its subscribed topics. In some scenarios (for example, Kafka group-based authorization), you may want to use specific authorized group IDs to read data. You can optionally set the group ID. However, do this with extreme caution as it can cause unexpected behavior. |
| `includeHeaders` | `true` or `false` | `false` | Whether to include Kafka message headers in the output. |
| `bytesEstimateWindowLength` | Duration string | | [Streaming only] Time window used to estimate remaining bytes via the `estimatedTotalBytesBehindLatest` streaming metric. |
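Putting a few of the standard Spark reader options (`startingOffsets`, `maxOffsetsPerTrigger`, `failOnDataLoss`) together; topic names, partitions, and limits below are illustrative, and the commented line shows the Spark call:

```python
import json

reader_opts = {
    "kafka.bootstrap.servers": "broker1:9092",
    "subscribe": "events",
    # Per-partition starting offsets as JSON: -2 means earliest, -1 means latest.
    "startingOffsets": json.dumps({"events": {"0": 23, "1": -2}}),
    "maxOffsetsPerTrigger": "10000",  # cap offsets processed per trigger
    "failOnDataLoss": "false",        # tolerate deleted or truncated topics
}
# spark.readStream.format("kafka").options(**reader_opts).load()
```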
Common writer options
The following options are commonly used when writing to Kafka:
| Option | Value | Default | Description |
|---|---|---|---|
| `topic` | String | none | Sets the topic for all rows. This overrides any `topic` column that exists in the data. |
| `includeHeaders` | `true` or `false` | `false` | Whether to include Kafka headers in the row. |
Databricks Runtime 13.3 LTS and above includes a newer version of the kafka-clients library that enables idempotent writes by default. If your Kafka sink uses a broker version of 2.8.0 or below with ACLs configured but without IDEMPOTENT_WRITE enabled, writes fail. Resolve this by upgrading to Kafka 2.8.0 or above, or by setting .option("kafka.enable.idempotence", "false").
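A writer sketch showing the workaround; the broker address, topic, and checkpoint path are placeholders, and the commented lines show how the options would be passed to a streaming write:

```python
writer_opts = {
    "kafka.bootstrap.servers": "broker1:9092",
    "topic": "events_out",  # destination topic for all rows
    # Workaround for Kafka brokers <= 2.8.0 with ACLs but no IDEMPOTENT_WRITE:
    "kafka.enable.idempotence": "false",
}
# (df.selectExpr("CAST(value AS STRING) AS value")
#    .writeStream.format("kafka").options(**writer_opts)
#    .option("checkpointLocation", "/tmp/checkpoints/events_out")
#    .start())
```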
Authentication options
Databricks supports multiple authentication methods for Kafka, including Unity Catalog service credentials, SASL/SSL, and cloud-specific options for AWS MSK, Azure Event Hubs, and Google Cloud Managed Kafka.
Databricks recommends using Unity Catalog service credentials for authentication to cloud-managed Kafka services:
| Option | Value | Description |
|---|---|---|
| `databricks.serviceCredential` | String | The name of a Unity Catalog service credential for authenticating to cloud-managed Kafka services (AWS MSK, Azure Event Hubs, or Google Cloud Managed Kafka). Available in Databricks Runtime 16.1 and above. |
| `databricks.serviceCredential.scope` | String | The OAuth scope for the service credential. Only set this if Databricks cannot automatically infer the scope for your Kafka service. |
When you use a Unity Catalog service credential, you do not need to specify SASL/SSL options like kafka.sasl.mechanism, kafka.sasl.jaas.config, or kafka.security.protocol.
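A reader sketch using a service credential. The option name `databricks.serviceCredential` is the Databricks Runtime 16.1+ service-credential option described above; the broker endpoint and credential name are placeholders:

```python
uc_opts = {
    "kafka.bootstrap.servers": "my-cluster.kafka.example.com:9098",  # placeholder
    "subscribe": "events",
    # Unity Catalog service credential (name is illustrative):
    "databricks.serviceCredential": "my_kafka_credential",
    # Note: no kafka.security.protocol or kafka.sasl.* options are needed.
}
# spark.readStream.format("kafka").options(**uc_opts).load()
```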
Common SASL/SSL options include:
| Option | Value | Description |
|---|---|---|
| `kafka.security.protocol` | String | Protocol used to communicate with brokers (for example, `SASL_SSL`, `SASL_PLAINTEXT`, or `SSL`). |
| `kafka.sasl.mechanism` | String | SASL mechanism (for example, `PLAIN`, `OAUTHBEARER`, or `SCRAM-SHA-512`). |
| `kafka.sasl.jaas.config` | String | JAAS login configuration string. |
| `kafka.sasl.login.callback.handler.class` | String | Fully qualified class name of a login callback handler for SASL authentication. |
| `kafka.sasl.client.callback.handler.class` | String | Fully qualified class name of a client callback handler for SASL authentication. |
| `kafka.ssl.truststore.location` | String | Location of the SSL trust store file. |
| `kafka.ssl.truststore.password` | String | Password for the SSL trust store file. |
| `kafka.ssl.keystore.location` | String | Location of the SSL key store file. |
| `kafka.ssl.keystore.password` | String | Password for the SSL key store file. |
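A typical SASL_SSL with PLAIN combination of these options. The endpoint and username are placeholders, and the password should come from your secret manager rather than being hard-coded; the `kafkashaded` prefix in the JAAS string reflects the shaded Kafka client classes on Databricks:

```python
password = "REPLACE_ME"  # e.g. dbutils.secrets.get("my-scope", "kafka-password")

sasl_opts = {
    "kafka.bootstrap.servers": "broker1:9093",
    "subscribe": "events",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    # On Databricks the Kafka client is shaded, hence the kafkashaded prefix:
    "kafka.sasl.jaas.config": (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
        f'required username="client" password="{password}";'
    ),
}
# spark.readStream.format("kafka").options(**sasl_opts).load()
```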
For complete authentication setup instructions, see Authentication.
Additional resources
- Structured Streaming + Kafka Integration Guide (Apache Spark documentation)
- Apache Kafka configurations