Authentication
The Databricks Kafka connector supports multiple authentication methods for connecting to Kafka. This article covers some of the most common authentication methods on Databricks. The full list of supported authentication methods can be found in the Kafka documentation.
Amazon Managed Streaming for Kafka with IAM
You can connect to Amazon Managed Streaming for Kafka (MSK) from Databricks using IAM-based authentication. For MSK configuration instructions, see Amazon MSK configuration.
The following configurations are only required if you are using IAM to connect to MSK. You can also configure connections to MSK using options provided by the Apache Spark Kafka connector.
Connect with Unity Catalog service credentials
In Databricks Runtime 16.1 and above, Databricks supports Unity Catalog service credentials for authenticating access to Amazon Managed Streaming for Apache Kafka (MSK). Databricks recommends this approach, particularly when running Kafka streaming on shared clusters or serverless compute.
To use a Unity Catalog service credential for authentication, perform the following steps:
- Create a new Unity Catalog service credential. If you are not familiar with this process, see Create service credentials for instructions.
- Ensure that the IAM role attached to your service credential has the necessary permissions to connect to your MSK cluster.
- Provide the name of your Unity Catalog service credential as a source option in your Kafka configuration. Set the option databricks.serviceCredential to the name of your service credential.
- Python
- Scala
- SQL
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"databricks.serviceCredential": "<service-credential-name>",
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
"subscribe" -> "<topic>",
"databricks.serviceCredential" -> "<service-credential-name>",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-hostname>:9092',
subscribe => '<topic>',
serviceCredential => '<service-credential-name>'
);
Note: When you use a Unity Catalog service credential to connect to Kafka, the following options are no longer needed:
- kafka.sasl.mechanism
- kafka.sasl.jaas.config
- kafka.security.protocol
- kafka.sasl.client.callback.handler.class
Connect with instance profiles
You can use an instance profile to authenticate to Amazon MSK clusters that have IAM authentication enabled. For detailed information on configuring instance profiles, see Instance profiles.
To connect to MSK using an instance profile, configure the following options:
- Python
- Scala
"kafka.sasl.mechanism": "AWS_MSK_IAM",
"kafka.sasl.jaas.config":
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.client.callback.handler.class":
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
"kafka.sasl.mechanism" -> "AWS_MSK_IAM",
"kafka.sasl.jaas.config" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.client.callback.handler.class" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
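For illustration, the options above can be assembled into a complete Python configuration. This is a sketch: the bootstrap hostname and topic are placeholders, and port 9098 is assumed as the typical MSK IAM port.

```python
# Sketch: instance-profile IAM options assembled into one Kafka configuration.
# <bootstrap-hostname> and <topic> are placeholders; 9098 is the usual MSK IAM port.
kafka_options = {
    "kafka.bootstrap.servers": "<bootstrap-hostname>:9098",
    "subscribe": "<topic>",
    "kafka.sasl.mechanism": "AWS_MSK_IAM",
    "kafka.sasl.jaas.config":
        "shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.client.callback.handler.class":
        "shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler",
}

# On a cluster with the instance profile attached:
# df = spark.readStream.format("kafka").options(**kafka_options).load()
```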
Connect with IAM users/roles
You can optionally configure your connection to MSK with an IAM user or IAM role instead of an instance profile. To do this, you must provide values for your AWS access key and secret key using the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. See Use a secret in a Spark configuration property or environment variable.
In addition, if you choose to configure your connection using an IAM role, you must modify the value provided to kafka.sasl.jaas.config to include the role ARN, as in the following example.
- Python
- Scala
"kafka.sasl.mechanism": "AWS_MSK_IAM",
"kafka.sasl.jaas.config":
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn='arn:aws:iam::123456789012:role/msk_client_role';",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.client.callback.handler.class":
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
"kafka.sasl.mechanism" -> "AWS_MSK_IAM",
"kafka.sasl.jaas.config" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn='arn:aws:iam::123456789012:role/msk_client_role';",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.client.callback.handler.class" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
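As a sketch of the requirement above, the JAAS value can be built from the role ARN with a small helper. The helper name is hypothetical, and the ARN is the example value from the text; substitute your own role.

```python
# Sketch: building the kafka.sasl.jaas.config value for an assumed IAM role.
# The helper name is hypothetical; the ARN is the example from the text.
def msk_iam_jaas_config(role_arn: str) -> str:
    # JAAS configuration entries must be terminated with a semicolon.
    return (
        "shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required "
        f"awsRoleArn='{role_arn}';"
    )

jaas = msk_iam_jaas_config("arn:aws:iam::123456789012:role/msk_client_role")
```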
Use SSL to connect Databricks to Kafka
To enable SSL connections to Kafka, follow the instructions in the Confluent documentation Encryption and Authentication with SSL. You can provide the configurations described there, prefixed with kafka., as options. For example, the trust store location would be specified with the property kafka.ssl.truststore.location.
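The naming convention can be sketched with a small helper that adds the kafka. prefix to plain Kafka client properties. The helper is hypothetical, not part of the connector, and the file path is a placeholder.

```python
# Sketch of the naming convention: each Kafka client property is passed to the
# connector with a "kafka." prefix. The helper name is hypothetical.
def with_kafka_prefix(props: dict) -> dict:
    return {f"kafka.{key}": value for key, value in props.items()}

options = with_kafka_prefix({
    "security.protocol": "SSL",
    # Placeholder path; store certificates in a Unity Catalog volume.
    "ssl.truststore.location": "/Volumes/main/default/certs/truststore.jks",
})
```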
If you will be using SSL, Databricks recommends that you:
- Store your certificates in a Unity Catalog volume. Any user with read access to the volume can use your Kafka certificates.
- Store your certificate passwords as secrets in a secret scope.
The following example uses object storage locations and Databricks secrets to enable an SSL connection:
- Python
- Scala
- SQL
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
  .load()
)
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))
  .load()
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-server>:9092',
subscribe => '<topic>',
`kafka.security.protocol` => 'SSL',
`kafka.ssl.truststore.location` => '<truststore-location>',
`kafka.ssl.keystore.location` => '<keystore-location>',
`kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
`kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);
Handling potential errors

- IAM authentication failures

  If you see SaslException, Failed to construct kafka consumer, or other authentication errors, verify the following:

  - The IAM role ARN in your kafka.sasl.jaas.config is correct and properly formatted.
  - The IAM role has the necessary permissions to access your MSK cluster (for example, kafka-cluster:Connect and kafka-cluster:ReadData).
  - For instance profiles, ensure the instance profile is attached to the cluster and has MSK permissions.
  - For cross-account access, verify that the trust relationship allows the Databricks account to assume the role.

- Network connectivity issues

  If you see TimeoutException or connection failures:

  - Verify that the MSK cluster's security group allows inbound traffic from the Databricks compute security group on the Kafka ports (typically 9092 for PLAINTEXT, 9094 for SASL/SSL, or 9098 for IAM).
  - Ensure that VPC peering or PrivateLink is correctly configured between the Databricks VPC and the MSK VPC.
  - Confirm that the kafka.bootstrap.servers hostname and port are correct.

- No records returned

  If authentication succeeds but no data is returned:

  - Verify that you are subscribing to the correct topic name.
  - The default startingOffsets is latest, which reads only new data. Set startingOffsets to earliest to read existing data.
  - Check that your IAM role has kafka-cluster:ReadData permission for the topic.
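When diagnosing connectivity problems like those above, a quick TCP reachability probe run from a notebook can separate network issues from authentication issues. The helper below is a sketch, not part of the connector: a successful connection rules out security-group and routing problems but does not validate authentication.

```python
import socket

# Sketch: a TCP reachability probe for a broker endpoint. Success means the
# host and port are routable and accepting connections; it says nothing
# about SASL or SSL authentication.
def can_reach_broker(host: str, port: int, timeout: float = 5.0) -> bool:
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Example (placeholder hostname): can_reach_broker("<bootstrap-hostname>", 9098)
```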