Authentication
The Databricks Kafka connector supports multiple authentication methods for connecting to Kafka. This article covers some of the most common authentication methods on Databricks. The full list of supported authentication methods can be found in the Kafka documentation.
Connect to Amazon MSK with IAM
You can connect to Amazon Managed Streaming for Apache Kafka (Amazon MSK) from Databricks using IAM-based authentication. For MSK configuration instructions, see Amazon MSK configuration.
The following configurations are only required if you are using IAM to connect to MSK. You can also configure connections to MSK using options provided by the Apache Spark Kafka connector.
Connect with Unity Catalog service credentials
In Databricks Runtime 16.1 and above, Databricks supports Unity Catalog service credentials for authenticating access to Amazon Managed Streaming for Apache Kafka (MSK). Databricks recommends this approach, particularly when running Kafka streaming workloads on shared clusters or serverless compute.
To use a Unity Catalog service credential for authentication, perform the following steps:
- Create a new Unity Catalog service credential. If you are not familiar with this process, see Create service credentials for instructions.
- Ensure that the IAM role attached to your service credential has the necessary permissions to connect to your MSK cluster.
- Provide the name of your Unity Catalog service credential as a source option in your Kafka configuration: set the option databricks.serviceCredential to the name of your service credential.
- Python
- Scala
- SQL
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"databricks.serviceCredential": "<service-credential-name>",
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
"subscribe" -> "<topic>",
"databricks.serviceCredential" -> "<service-credential-name>",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-hostname>:9092',
subscribe => '<topic>',
serviceCredential => '<service-credential-name>'
);
Note: When you use a Unity Catalog service credential to connect to Kafka, the following options are no longer needed:
- kafka.sasl.mechanism
- kafka.sasl.jaas.config
- kafka.security.protocol
- kafka.sasl.client.callback.handler.class
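If you are migrating an existing configuration, the following sketch shows one way to drop the now-redundant options before adding the service credential. The helper function is hypothetical, not part of any Databricks API; the option names are the ones listed above.

```python
# Hypothetical helper: remove SASL/security options that become redundant
# once "databricks.serviceCredential" is set. Not a Databricks API.
REDUNDANT_WITH_SERVICE_CREDENTIAL = {
    "kafka.sasl.mechanism",
    "kafka.sasl.jaas.config",
    "kafka.security.protocol",
    "kafka.sasl.client.callback.handler.class",
}

def to_service_credential_options(options: dict, credential_name: str) -> dict:
    """Return a copy of `options` that authenticates with a UC service credential."""
    cleaned = {k: v for k, v in options.items()
               if k not in REDUNDANT_WITH_SERVICE_CREDENTIAL}
    cleaned["databricks.serviceCredential"] = credential_name
    return cleaned
```

The returned dictionary can be passed to spark.readStream.format("kafka").options(**...) as in the examples above.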
Connect with instance profiles
You can use an instance profile to authenticate to Amazon MSK clusters that have IAM authentication enabled. For more information on configuring instance profiles, see Instance profiles.
To connect to MSK using an instance profile, configure the following options:
- Python
- Scala
"kafka.sasl.mechanism": "AWS_MSK_IAM",
"kafka.sasl.jaas.config":
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.client.callback.handler.class":
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
"kafka.sasl.mechanism" -> "AWS_MSK_IAM",
"kafka.sasl.jaas.config" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.client.callback.handler.class" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
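Putting the pieces together, a complete option set might look like the following sketch. The bootstrap hostname and topic are placeholders, and port 9098 is an assumption based on the usual MSK IAM listener port; the authentication option values come from the snippet above.

```python
# Sketch: full option set for reading from MSK with an instance profile.
# Replace <bootstrap-hostname> and <topic> with your values; 9098 is the
# port MSK typically uses for IAM authentication.
kafka_options = {
    "kafka.bootstrap.servers": "<bootstrap-hostname>:9098",
    "subscribe": "<topic>",
    "kafka.sasl.mechanism": "AWS_MSK_IAM",
    "kafka.sasl.jaas.config":
        "shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.client.callback.handler.class":
        "shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler",
}
```

Pass these options to spark.readStream.format("kafka").options(**kafka_options).load() as in the earlier examples.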
Connect with IAM users/roles
You can optionally configure your connection to MSK with an IAM user or IAM role instead of an instance profile. To do this, provide your AWS access key and secret key in the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. See Use a secret in a Spark configuration property or environment variable.
In addition, if you choose to configure your connection using an IAM role, you must modify the value provided to kafka.sasl.jaas.config to include the role ARN, as in the following example.
- Python
- Scala
"kafka.sasl.mechanism": "AWS_MSK_IAM",
"kafka.sasl.jaas.config":
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn='arn:aws:iam::123456789012:role/msk_client_role';",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.client.callback.handler.class":
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
"kafka.sasl.mechanism" -> "AWS_MSK_IAM",
"kafka.sasl.jaas.config" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn='arn:aws:iam::123456789012:role/msk_client_role';",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.client.callback.handler.class" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
Use SASL/PLAIN to authenticate
To connect to Kafka using SASL/PLAIN (username and password) authentication, configure the following options. Use the shaded PlainLoginModule class name:
- Python
- Scala
- SQL
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "PLAIN",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "PLAIN",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'PLAIN',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";'
);
Databricks recommends storing your password as a secret rather than including it directly in your code. For more information, see Secret management.
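If you assemble the JAAS string in code, quoting mistakes and a missing trailing semicolon are common sources of login errors. The helper below is a hypothetical convenience (not a Databricks or Kafka API) that formats the PLAIN JAAS entry with the required quoting:

```python
# Hypothetical helper: format a SASL/PLAIN JAAS config entry.
# Note the required trailing semicolon and double-quoted values.
PLAIN_LOGIN_MODULE = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule"

def plain_jaas_config(username: str, password: str) -> str:
    return f'{PLAIN_LOGIN_MODULE} required username="{username}" password="{password}";'
```

In a notebook you would typically pass values retrieved with dbutils.secrets.get rather than literal strings.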
Use SASL/SCRAM to authenticate
To connect to Kafka using SASL/SCRAM (SCRAM-SHA-256 or SCRAM-SHA-512), configure the following options. Use the shaded ScramLoginModule class name:
- Python
- Scala
- SQL
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "SCRAM-SHA-512",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "SCRAM-SHA-512",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'SCRAM-SHA-512',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";'
);
Replace SCRAM-SHA-512 with SCRAM-SHA-256 if your Kafka cluster is configured to use SCRAM-SHA-256.
Databricks recommends storing your password as a secret rather than including it directly in your code. For more information, see Secret management.
Use SSL to connect Databricks to Kafka
To enable SSL/TLS connections to Kafka, set kafka.security.protocol to SSL and provide the trust store and key store configuration options prefixed with kafka.. For SSL connections that require only server authentication (one-way TLS), you need a trust store. For mutual TLS (mTLS) where the Kafka broker also authenticates the client, you need both a trust store and a key store.
The following SSL/TLS options are available. For the full list of SSL properties, see the Apache Kafka SSL configuration documentation and Encryption and Authentication with SSL in the Confluent documentation.
| Option | Description |
|---|---|
| kafka.security.protocol | Set to SSL to enable SSL/TLS. |
| kafka.ssl.truststore.location | Path to the trust store file containing trusted CA certificates. |
| kafka.ssl.truststore.password | Password for the trust store file. |
| kafka.ssl.truststore.type | Trust store file format (default: JKS). |
| kafka.ssl.keystore.location | Path to the key store file containing the client certificate and private key (required for mTLS). |
| kafka.ssl.keystore.password | Password for the key store file. |
| kafka.ssl.key.password | Password for the private key in the key store. |
| kafka.ssl.endpoint.identification.algorithm | Hostname verification algorithm. Defaults to https. |
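For one-way TLS (server authentication only), a trust store alone is sufficient. The sketch below shows a minimal option set under that assumption; the volume path and placeholder values are illustrative, not prescribed paths.

```python
# Minimal one-way TLS configuration: trust store only, no key store.
# The Unity Catalog volume path and placeholders are illustrative.
kafka_options = {
    "kafka.bootstrap.servers": "<bootstrap-server>:9093",
    "subscribe": "<topic>",
    "kafka.security.protocol": "SSL",
    "kafka.ssl.truststore.location": "/Volumes/<catalog>/<schema>/<volume>/truststore.jks",
    "kafka.ssl.truststore.password": "<truststore-password>",
}
```

For mTLS, add the kafka.ssl.keystore.* options from the table above, as the full example below does.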
If you use SSL, Databricks recommends that you:
- Store your certificates in a Unity Catalog volume. Users who have access to read from the volume can use your Kafka certificates. For more information, see What are Unity Catalog volumes?.
- Store your certificate passwords as secrets in a secret scope. For more information, see Manage secret scopes.
The following example uses object storage locations and Databricks secrets to enable an SSL connection:
- Python
- Scala
- SQL
df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
  .option("subscribe", "<topic>")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>, key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>, key=<truststore-password-key-name>))
  .load()
)
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
  .option("subscribe", "<topic>")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))
  .load()
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SSL',
`kafka.ssl.truststore.location` => '<truststore-location>',
`kafka.ssl.keystore.location` => '<keystore-location>',
`kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
`kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);
Use Databricks shaded Kafka class names
Databricks bundles proprietary, shaded versions of the Kafka client libraries. All Kafka client class names that you reference in authentication configuration options must use the shaded class name prefix instead of the standard open-source class name. This applies to any class referenced in options like kafka.sasl.jaas.config, kafka.sasl.login.callback.handler.class, and kafka.sasl.client.callback.handler.class.
Using unshaded class names results in a RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED error. See the FAQ for more details.
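As an illustration, the mapping below pairs the open-source class names with the shaded equivalents used in this article's examples. The lookup helper is hypothetical, provided only to make the correspondence concrete:

```python
# Open-source Kafka/MSK class names mapped to the Databricks shaded names
# used in this article's examples.
SHADED_CLASS_NAMES = {
    "org.apache.kafka.common.security.plain.PlainLoginModule":
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule",
    "org.apache.kafka.common.security.scram.ScramLoginModule":
        "kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule",
    "software.amazon.msk.auth.iam.IAMLoginModule":
        "shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule",
    "software.amazon.msk.auth.iam.IAMClientCallbackHandler":
        "shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler",
}

def shade(class_name: str) -> str:
    """Hypothetical helper: translate an unshaded class name to its shaded form."""
    return SHADED_CLASS_NAMES.get(class_name, class_name)
```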
Handling potential errors

IAM authentication failures

If you see SaslException, Failed to construct kafka consumer, or authentication errors, verify:
- The IAM role ARN in your kafka.sasl.jaas.config is correct and properly formatted.
- The IAM role has the necessary permissions to access your MSK cluster (for example, kafka-cluster:Connect and kafka-cluster:ReadData).
- For instance profiles, ensure the instance profile is attached to the cluster and has MSK permissions.
- For cross-account access, verify the trust relationship allows the Databricks account to assume the role.

Network connectivity issues

If you see TimeoutException or connection failures:
- Verify the MSK cluster's security group allows inbound traffic from the Databricks compute security group on the Kafka ports (typically 9092 for PLAINTEXT, 9094 for SASL/SSL, or 9098 for IAM).
- Ensure VPC peering or PrivateLink is correctly configured between the Databricks VPC and the MSK VPC.
- Confirm the kafka.bootstrap.servers hostname and port are correct.

No records returned

If authentication succeeds but no data is returned:
- Verify you're subscribing to the correct topic name.
- The default startingOffsets is latest, which only reads new data. Set startingOffsets to earliest to read existing data.
- Check that your IAM role has kafka-cluster:ReadData permission for the topic.
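For example, to re-read a topic from the beginning while debugging, set startingOffsets explicitly. This sketch assumes the service credential approach and placeholder values from the earlier examples:

```python
# Sketch: read all existing records instead of only new ones.
# Placeholders follow the earlier examples in this article.
kafka_options = {
    "kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
    "subscribe": "<topic>",
    "databricks.serviceCredential": "<service-credential-name>",
    # Default is "latest", which reads only data arriving after the stream starts.
    "startingOffsets": "earliest",
}
```

Pass the options to spark.readStream.format("kafka").options(**kafka_options).load() as before.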