
Options

This page describes configuration options for reading from and writing to Apache Kafka using Structured Streaming on Databricks.

The Databricks Kafka connector is built on top of the Apache Spark Kafka connector and supports all standard Kafka configuration options. Any option prefixed with kafka. is passed through directly to the underlying Kafka client. For example, .option("kafka.max.poll.records", "500") sets the Kafka consumer's max.poll.records property. See the Kafka configuration documentation for the full list of available Kafka properties.
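As a sketch, a streaming read that passes a Kafka client property through the connector might look like the following (the broker addresses and topic name are placeholders; this assumes an active Spark session on Databricks):

```python
# Streaming read from Kafka. Options prefixed with "kafka." are handed
# directly to the underlying Kafka consumer; broker list and topic name
# below are placeholders.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "events")
    # Passed through as the Kafka consumer property max.poll.records:
    .option("kafka.max.poll.records", "500")
    .load()
)
```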

For additional Structured Streaming source and sink options not listed on this page, see the Structured Streaming + Kafka Integration Guide.

Required options

The following option is required for both reading and writing:

| Option | Value | Description |
| --- | --- | --- |
| `kafka.bootstrap.servers` | A comma-separated list of `host:port` entries | The Kafka `bootstrap.servers` configuration. If no data arrives from Kafka, check the broker address list first. An incorrect broker list might not produce any errors, because the Kafka client assumes the brokers will eventually become available and, on network errors, retries forever. |

When reading from Kafka, you must also specify one of the following options to identify which topics to consume:

| Option | Value | Description |
| --- | --- | --- |
| `subscribe` | A comma-separated list of topics | The topic list to subscribe to. |
| `subscribePattern` | Java regex string | The pattern used to match topics to subscribe to. |
| `assign` | JSON string, for example `{"topicA":[0,1],"topicB":[2,4]}` | Specific topic partitions to consume. |
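The assign value maps each topic name to a list of partition numbers. For example, that JSON string can be built programmatically rather than written by hand (topic names here are placeholders):

```python
import json

# Map each topic to the list of partitions to consume; the resulting
# string is passed as the "assign" option on a Kafka reader, e.g.
# .option("assign", assign)
assign = json.dumps({"topicA": [0, 1], "topicB": [2, 4]})
```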

When writing to Kafka, you can optionally set the topic option to specify a destination topic for all rows. If not set, the DataFrame must include a topic column.

Common reader options

The following options are commonly used when reading from Kafka:

| Option | Value | Default | Description |
| --- | --- | --- | --- |
| `minPartitions` | INT | none | Minimum number of Spark partitions to read from Kafka. By default, Spark creates one Spark partition per Kafka topic-partition. Setting this higher splits large Kafka partitions into smaller Spark partitions for increased parallelism, which is useful for handling data skew or peak loads. Note: enabling this reinitializes Kafka consumers at each trigger, which can affect performance when using SSL. |
| `maxRecordsPerPartition` | LONG | none | Maximum number of records per Spark partition. When set, Spark splits Kafka partitions so that each Spark partition holds at most this many records. Can be combined with `minPartitions`; when both are set, Spark uses whichever results in more partitions. |
| `failOnDataLoss` | BOOLEAN | `true` | Whether to fail the query when data may have been lost, for example because a topic was deleted or truncated before processing. The estimate of whether data was lost is conservative and can cause false alarms. Set this option to `false` if it does not work as expected, or if you want the query to continue processing despite data loss. |
| `maxOffsetsPerTrigger` | LONG | none | [Streaming only] Rate limit on the maximum number of offsets processed per trigger interval. The total number of offsets is proportionally split across topic partitions. For more advanced flow control, you can also use `minOffsetsPerTrigger` (minimum offsets before triggering) and `maxTriggerDelay` (maximum wait time, default `15m`); see the Spark Kafka integration guide for details. |
| `startingOffsets` | `earliest`, `latest`, or JSON string | `latest` | Where to start reading when a query begins. Use `earliest` to read from the earliest available offsets, `latest` to read only data that arrives after the stream starts, or a JSON string to specify a starting offset for each topic partition (for example, `{"topicA":{"0":23,"1":-2},"topicB":{"0":-2}}`). In the JSON, `-2` refers to earliest and `-1` to latest. For streaming queries, this applies only when a new query starts; a resumed query always picks up where it left off. Newly discovered partitions start at earliest. Note: for batch queries, `latest` (whether implicit or as `-1` in JSON) is not allowed. To start from a specific timestamp instead, use `startingTimestamp` or `startingOffsetsByTimestamp`. |
| `endingOffsets` | `latest` or JSON string | `latest` | [Batch only] Where a batch query ends. Use `latest` to read up to the most recent offsets, or a JSON string to specify an ending offset for each topic partition (for example, `{"topicA":{"0":50,"1":-1},"topicB":{"0":-1}}`). In the JSON, `-1` refers to latest; `-2` (earliest) is not allowed. To end at a specific timestamp instead, use `endingTimestamp` or `endingOffsetsByTimestamp`. |
| `groupIdPrefix` | STRING | `spark-kafka-source` (streaming) or `spark-kafka-relation` (batch) | Prefix for the auto-generated consumer group ID. The connector automatically generates a unique `group.id` for each query; this option customizes the prefix of that generated ID. Ignored if `kafka.group.id` is set. |
| `kafka.group.id` | STRING | none | Group ID to use while reading from Kafka. Use this with caution. By default, each query generates a unique group ID, so each query has its own consumer group that does not face interference from any other consumer and can therefore read all partitions of its subscribed topics. In some scenarios (for example, Kafka group-based authorization), you may need to read with a specific authorized group ID, but do so with extreme caution: concurrently running queries (both batch and streaming) with the same group ID are likely to interfere with each other, causing each query to read only part of the data. Interference can also occur when queries are started or restarted in quick succession. To minimize such issues, set the Kafka consumer configuration `session.timeout.ms` to a very small value. |
| `includeHeaders` | BOOLEAN | `false` | Whether to include Kafka message headers in the output. |
| `bytesEstimateWindowLength` | STRING | `300s` | [Streaming only] Time window used to estimate remaining bytes via the `estimatedTotalBytesBehindLatest` metric. Accepts duration strings such as `10m` (10 minutes) or `600s` (600 seconds). See Retrieve Kafka metrics. |
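For example, the per-partition startingOffsets JSON described above can be built with `json.dumps` (topic names are placeholders; `-2` means earliest and `-1` means latest):

```python
import json

# Start topicA partition 0 at offset 23, topicA partition 1 at
# earliest (-2), and topicB partition 0 at earliest. Pass the string
# as .option("startingOffsets", starting_offsets) on a Kafka reader.
starting_offsets = json.dumps(
    {"topicA": {"0": 23, "1": -2}, "topicB": {"0": -2}}
)
```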

Common writer options

The following options are commonly used when writing to Kafka:

| Option | Value | Default | Description |
| --- | --- | --- | --- |
| `topic` | STRING | none | Sets the topic for all rows. This overrides any `topic` column in the data. |
| `includeHeaders` | BOOLEAN | `false` | Whether to include Kafka headers in the row. |
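As a sketch, a batch write that routes all rows to a single topic via the topic option might look like this (the DataFrame, broker list, and topic name are placeholders; the Kafka sink expects a value column, and optionally key and topic columns, cast to string or binary):

```python
# Write df to Kafka; "events" overrides any "topic" column in df.
# Broker list and names below are placeholders.
(
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("topic", "events")
    .save()
)
```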

important

Databricks Runtime 13.3 LTS and above includes a newer version of the kafka-clients library that enables idempotent writes by default. If your Kafka brokers run version 2.8.0 or below with ACLs configured but without IDEMPOTENT_WRITE enabled, writes fail. Resolve this by upgrading to Kafka 2.8.0 or above, or by setting .option("kafka.enable.idempotence", "false").
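If upgrading the brokers is not an option, the workaround can be applied directly on the writer, as in this sketch (broker list and topic name are placeholders):

```python
# Disable idempotent producing for brokers at Kafka 2.8.0 or below
# that have ACLs configured but lack IDEMPOTENT_WRITE. Placeholders
# throughout.
(
    df.write
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("topic", "events")
    .option("kafka.enable.idempotence", "false")
    .save()
)
```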

Authentication options

Databricks supports multiple authentication methods for Kafka, including Unity Catalog service credentials, SASL/SSL, and cloud-specific options for AWS MSK, Azure Event Hubs, and Google Cloud Managed Kafka.

Databricks recommends using Unity Catalog service credentials for authentication to cloud-managed Kafka services:

| Option | Value | Description |
| --- | --- | --- |
| `databricks.serviceCredential` | STRING | The name of a Unity Catalog service credential for authenticating to cloud-managed Kafka services (AWS MSK, Azure Event Hubs, or Google Cloud Managed Kafka). Available in Databricks Runtime 16.1 and above. |
| `databricks.serviceCredential.scope` | STRING | The OAuth scope for the service credential. Only set this if Databricks cannot automatically infer the scope for your Kafka service. |

When you use a Unity Catalog service credential, you do not need to specify SASL/SSL options like kafka.sasl.mechanism, kafka.sasl.jaas.config, or kafka.security.protocol.
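A sketch of a streaming read authenticated with a service credential (the credential name, broker list, and topic are placeholders):

```python
# With a Unity Catalog service credential, no SASL/SSL options are
# needed; "my_kafka_credential" and the other names are placeholders.
df = (
    spark.readStream
    .format("kafka")
    .option("databricks.serviceCredential", "my_kafka_credential")
    .option("kafka.bootstrap.servers", "broker1:9092")
    .option("subscribe", "events")
    .load()
)
```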

Common SASL/SSL options include:

| Option | Value | Description |
| --- | --- | --- |
| `kafka.security.protocol` | STRING | Protocol used to communicate with brokers (for example, SASL_SSL, SSL, PLAINTEXT). |
| `kafka.sasl.mechanism` | STRING | SASL mechanism (for example, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER, AWS_MSK_IAM). |
| `kafka.sasl.jaas.config` | STRING | JAAS login configuration string. |
| `kafka.sasl.login.callback.handler.class` | STRING | Fully qualified class name of a login callback handler for SASL authentication. |
| `kafka.sasl.client.callback.handler.class` | STRING | Fully qualified class name of a client callback handler for SASL authentication. |
| `kafka.ssl.truststore.location` | STRING | Location of the SSL trust store file. |
| `kafka.ssl.truststore.password` | STRING | Password for the SSL trust store file. |
| `kafka.ssl.keystore.location` | STRING | Location of the SSL key store file. |
| `kafka.ssl.keystore.password` | STRING | Password for the SSL key store file. |
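For example, a SASL_SSL read using SCRAM-SHA-512 might look like the following sketch. The broker list, topic, and credentials are placeholders; store real secrets in a secret scope rather than in code, and note that on Databricks the Kafka classes are shaded, so the JAAS login module typically needs the `kafkashaded.` prefix:

```python
# SASL_SSL with SCRAM-SHA-512; all names and credentials below are
# placeholders. Real passwords should come from a secret scope.
jaas_config = (
    "kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule "
    'required username="USER" password="PASSWORD";'
)

df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9093")
    .option("subscribe", "events")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
    .option("kafka.sasl.jaas.config", jaas_config)
    .load()
)
```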

For complete authentication setup instructions, see Authentication.

Additional resources