Authentication

This page shows the most common authentication methods for the Kafka connector on Databricks.

The full list of supported authentication methods can be found in the Kafka documentation.

Connect to Amazon MSK with IAM

You can connect to Amazon Managed Streaming for Apache Kafka (Amazon MSK) from Databricks using IAM-based authentication. For MSK configuration instructions, see Amazon MSK configuration.

If you use IAM to connect to MSK, you must use one of the connection methods below. Alternatively, you can configure connections to MSK using the options provided by the Apache Spark Kafka connector.

Connect with Unity Catalog service credentials

In Databricks Runtime 16.1 and above, Databricks supports Unity Catalog service credentials for authenticating access to Amazon Managed Streaming for Apache Kafka (MSK). Databricks recommends this authentication method if you use shared clusters or serverless compute.

To use a Unity Catalog service credential for authentication, do the following:

  • Create a new Unity Catalog service credential. See Create service credentials.
    • Confirm that the IAM role attached to your service credential has the correct permissions to connect to your MSK cluster.
  • Set the source option databricks.serviceCredential to the name of your Unity Catalog service credential.
Python
kafka_options = {
    # MSK listeners for IAM authentication typically use port 9098.
    "kafka.bootstrap.servers": "<bootstrap-hostname>:9098",
    "subscribe": "<topic>",
    "databricks.serviceCredential": "<service-credential-name>",
}

df = spark.readStream.format("kafka").options(**kafka_options).load()
Note

When you use a Unity Catalog service credential to connect to Kafka, don't use the following options:

  • kafka.sasl.mechanism
  • kafka.sasl.jaas.config
  • kafka.security.protocol
  • kafka.sasl.client.callback.handler.class
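
A small guard can catch these conflicting options before a stream starts. The following is a minimal sketch: the helper name check_service_credential_options is hypothetical and not part of any Databricks API, and it only inspects the option keys listed in the note above.

```python
# Options that must not be combined with databricks.serviceCredential,
# copied from the note above.
CONFLICTING_OPTIONS = (
    "kafka.sasl.mechanism",
    "kafka.sasl.jaas.config",
    "kafka.security.protocol",
    "kafka.sasl.client.callback.handler.class",
)


def check_service_credential_options(options):
    """Hypothetical helper: reject manual SASL options when a service credential is set."""
    if "databricks.serviceCredential" not in options:
        # Nothing to check when no service credential is used.
        return options
    conflicts = sorted(key for key in options if key in CONFLICTING_OPTIONS)
    if conflicts:
        raise ValueError(
            "Remove these options when using databricks.serviceCredential: "
            + ", ".join(conflicts)
        )
    return options
```

Calling check_service_credential_options(kafka_options) before passing the dictionary to .options(**kafka_options) surfaces the conflict as a plain Python error instead of a failure at stream start.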

Connect with instance profiles

You can use an instance profile to authenticate to Amazon MSK clusters that have IAM authentication enabled. See Instance profiles.

To connect to MSK using an instance profile, configure the following options:

Python
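kafka_options = {
    "kafka.bootstrap.servers": "<bootstrap-hostname>:9098",
    "subscribe": "<topic>",
    # Authenticate with IAM over TLS using the attached instance profile.
    "kafka.sasl.mechanism": "AWS_MSK_IAM",
    "kafka.sasl.jaas.config":
        "shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.client.callback.handler.class":
        "shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler",
}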

Connect with IAM users and IAM roles

Optionally, you can configure your connection to MSK with an IAM user or IAM role instead of an instance profile. You must provide your AWS access key and secret key in the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. See Use a secret in a Spark configuration property or environment variable.

To configure your connection using an IAM role, you must modify the value for kafka.sasl.jaas.config to include the role ARN, as in the following example:

Python
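kafka_options = {
    "kafka.bootstrap.servers": "<bootstrap-hostname>:9098",
    "subscribe": "<topic>",
    "kafka.sasl.mechanism": "AWS_MSK_IAM",
    # The JAAS entry names the role to assume and must end with a semicolon.
    "kafka.sasl.jaas.config":
        "shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn='arn:aws:iam::123456789012:role/msk_client_role';",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.client.callback.handler.class":
        "shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler",
}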
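
The instance profile and IAM role configurations differ only in the JAAS entry, so both can be produced by one function. The following is a minimal sketch: msk_iam_options is a hypothetical helper name, the class names are the shaded ones shown above, and the JAAS entry is terminated with a semicolon, which Kafka's JAAS parser expects.

```python
def msk_iam_options(bootstrap_servers, topic, role_arn=None):
    """Hypothetical helper: build Kafka source options for MSK IAM authentication.

    With role_arn=None the default credentials (for example, an instance
    profile) are used; otherwise the JAAS entry names the role to assume.
    """
    jaas = "shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required"
    if role_arn:
        jaas += f" awsRoleArn='{role_arn}'"
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "AWS_MSK_IAM",
        "kafka.sasl.jaas.config": jaas + ";",
        "kafka.sasl.client.callback.handler.class":
            "shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    }


# Usage on Databricks (placeholders are illustrative):
# options = msk_iam_options("<bootstrap-hostname>:9098", "<topic>",
#                           role_arn="arn:aws:iam::123456789012:role/msk_client_role")
# df = spark.readStream.format("kafka").options(**options).load()
```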

Use SASL/PLAIN to authenticate

To connect to Kafka using SASL/PLAIN (username and password) authentication, configure the following options. Use the shaded PlainLoginModule class name:

Python
kafka_options = {
    "kafka.bootstrap.servers": "<bootstrap-server>:9093",
    "subscribe": "<topic>",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.sasl.jaas.config":
        'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";',
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Databricks recommends that you store your password as a secret rather than including it directly in your code. For more information, see Secret management.
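
One way to follow this recommendation is to build the JAAS string at run time from a secret. The following is a minimal sketch: plain_jaas_config is a hypothetical helper, the scope and key names in the usage comment are placeholders, and the backslash escaping of quotes is an assumption about how the JAAS parser reads quoted values.

```python
def plain_jaas_config(username, password):
    """Hypothetical helper: build a SASL/PLAIN JAAS entry with the shaded login module."""

    def escape(value):
        # Assumption: backslashes and double quotes inside a quoted JAAS
        # value are escaped with a backslash.
        return value.replace("\\", "\\\\").replace('"', '\\"')

    return (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{escape(username)}" password="{escape(password)}";'
    )


# Usage on Databricks (scope and key names are placeholders):
# password = dbutils.secrets.get(scope="<scope-name>", key="<password-key-name>")
# kafka_options["kafka.sasl.jaas.config"] = plain_jaas_config("<username>", password)
```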

Use SASL/SCRAM to authenticate

To connect to Kafka using SASL/SCRAM (SCRAM-SHA-256 or SCRAM-SHA-512), configure the following options. Use the shaded ScramLoginModule class name:

Python
kafka_options = {
    "kafka.bootstrap.servers": "<bootstrap-server>:9093",
    "subscribe": "<topic>",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "SCRAM-SHA-512",
    "kafka.sasl.jaas.config":
        'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
}

df = spark.readStream.format("kafka").options(**kafka_options).load()
Note

Replace SCRAM-SHA-512 with SCRAM-SHA-256 if your Kafka cluster is configured to use SCRAM-SHA-256.

Databricks recommends that you store your password as a secret rather than including it directly in your code. For more information, see Secret management.
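
Because the two SCRAM variants differ only in the mechanism name, a thin wrapper can reject any other value early. The following is a minimal sketch: scram_options is a hypothetical helper, and it assumes the username and password contain no double quotes or backslashes.

```python
SCRAM_MECHANISMS = ("SCRAM-SHA-256", "SCRAM-SHA-512")


def scram_options(bootstrap_servers, topic, username, password,
                  mechanism="SCRAM-SHA-512"):
    """Hypothetical helper: build Kafka source options for SASL/SCRAM over TLS."""
    if mechanism not in SCRAM_MECHANISMS:
        raise ValueError(f"mechanism must be one of {SCRAM_MECHANISMS}, got {mechanism!r}")
    jaas = (
        "kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required "
        f'username="{username}" password="{password}";'
    )
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": mechanism,
        "kafka.sasl.jaas.config": jaas,
    }


# Usage on Databricks (scope and key names are placeholders):
# password = dbutils.secrets.get(scope="<scope-name>", key="<password-key-name>")
# options = scram_options("<bootstrap-server>:9093", "<topic>", "<username>", password,
#                         mechanism="SCRAM-SHA-256")
```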

Use SSL to connect Databricks to Kafka

To enable SSL/TLS connections to Kafka, set kafka.security.protocol to SSL and provide the trust store and key store configuration options, each prefixed with kafka. (for example, kafka.ssl.truststore.location). Connections that require only server authentication (one-way TLS) need a trust store. Mutual TLS (mTLS), where the Kafka broker also authenticates the client, needs both a trust store and a key store.

The following SSL/TLS options are available. For the full list of SSL properties, see the Apache Kafka SSL configuration documentation and Encryption and Authentication with SSL in the Confluent documentation.

| Option | Description |
| --- | --- |
| kafka.security.protocol | Set to SSL to enable TLS encryption. |
| kafka.ssl.truststore.location | Path to the trust store file containing trusted CA certificates. |
| kafka.ssl.truststore.password | Password for the trust store file. |
| kafka.ssl.truststore.type | Trust store file format (default: JKS). |
| kafka.ssl.keystore.location | Path to the key store file containing the client certificate and private key (required for mTLS). |
| kafka.ssl.keystore.password | Password for the key store file. |
| kafka.ssl.key.password | Password for the private key in the key store. |
| kafka.ssl.endpoint.identification.algorithm | Hostname verification algorithm. Defaults to https. Set to an empty string to disable. |

If you use SSL, Databricks recommends that you:

  • Store your certificates in a Unity Catalog volume. Users who have access to read from the volume can use your Kafka certificates. For more information, see What are Unity Catalog volumes?.
  • Store your certificate passwords as secrets in a secret scope. For more information, see Manage secret scopes.

The following example uses object storage locations and Databricks secrets to enable an SSL connection:

Python
df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
    .option("subscribe", "<topic>")
    .option("kafka.security.protocol", "SSL")
    .option("kafka.ssl.truststore.location", "<truststore-location>")
    .option("kafka.ssl.keystore.location", "<keystore-location>")
    .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope="<certificate-scope-name>", key="<keystore-password-key-name>"))
    .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope="<certificate-scope-name>", key="<truststore-password-key-name>"))
    .load()
)
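
The difference between one-way TLS and mTLS comes down to whether key store options are present. The following is a minimal sketch: ssl_options is a hypothetical helper that uses only option names from the table above and adds the key store options when a key store location is given.

```python
def ssl_options(truststore_location, truststore_password,
                keystore_location=None, keystore_password=None, key_password=None):
    """Hypothetical helper: build SSL options for one-way TLS or mTLS."""
    options = {
        "kafka.security.protocol": "SSL",
        "kafka.ssl.truststore.location": truststore_location,
        "kafka.ssl.truststore.password": truststore_password,
    }
    if keystore_location is not None:
        # mTLS: the client presents its own certificate from the key store.
        if keystore_password is None:
            raise ValueError("keystore_password is required when keystore_location is set")
        options["kafka.ssl.keystore.location"] = keystore_location
        options["kafka.ssl.keystore.password"] = keystore_password
        if key_password is not None:
            options["kafka.ssl.key.password"] = key_password
    return options


# Usage on Databricks (placeholders are illustrative):
# options = ssl_options("<truststore-location>", truststore_password)
# df = (spark.readStream.format("kafka")
#       .option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
#       .option("subscribe", "<topic>")
#       .options(**options)
#       .load())
```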

Use Databricks shaded Kafka class names

Databricks bundles proprietary, shaded versions of the Kafka client libraries. All Kafka client class names that you reference in authentication configuration options must use the shaded class name prefix instead of the standard open-source class name. This applies to any class referenced in options like kafka.sasl.jaas.config, kafka.sasl.login.callback.handler.class, and kafka.sasl.client.callback.handler.class.

If you use unshaded class names, your code raises a RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED error. See the FAQ for more details.
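
When porting a configuration written for open-source Kafka, the class names can be rewritten mechanically. The following is a minimal sketch: shade_class_names is a hypothetical helper, and the two prefix mappings are inferred from the examples on this page (kafkashaded. for org.apache.kafka classes and shadedmskiam. for software.amazon.msk classes), not taken from a published list.

```python
import re

# Assumption: prefixes inferred from the shaded class names used on this page.
SHADED_PREFIXES = {
    "org.apache.kafka.": "kafkashaded.",
    "software.amazon.msk.": "shadedmskiam.",
}


def shade_class_names(value):
    """Hypothetical helper: prefix unshaded class names in an option value."""
    for package, prefix in SHADED_PREFIXES.items():
        # Skip packages that are already preceded by a prefix such as
        # "kafkashaded." by requiring that no identifier character or dot
        # comes directly before the package name.
        pattern = r"(?<![\w.])" + re.escape(package)
        value = re.sub(pattern, prefix + package, value)
    return value
```

Applying the function to a value that is already shaded leaves it unchanged, so it can be run over every class-name option without first checking which form each one uses.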

Handling potential errors

  • IAM authentication failures

    If you see SaslException, Failed to construct kafka consumer, or authentication errors, verify:

    • The IAM role ARN in your kafka.sasl.jaas.config is correct and properly formatted.
    • The IAM role has the necessary permissions to access your MSK cluster (for example, kafka-cluster:Connect, kafka-cluster:ReadData).
    • For instance profiles, the instance profile is attached to the cluster and has MSK permissions.
    • For cross-account access, the trust relationship allows the Databricks account to assume the role.
  • Network connectivity issues

    If you see TimeoutException or connection failures:

    • Verify the MSK cluster's security group allows inbound traffic from the Databricks compute security group on the Kafka ports (typically 9092 for PLAINTEXT, 9094 for TLS, 9096 for SASL/SCRAM, or 9098 for IAM).
    • Ensure VPC peering or PrivateLink is correctly configured between the Databricks VPC and the MSK VPC.
    • Confirm the kafka.bootstrap.servers hostname and port are correct.
  • No records returned

    If authentication succeeds but no data is returned:

    • Verify you're subscribing to the correct topic name.
    • For streaming queries, the default startingOffsets is latest, which only reads data that arrives after the query starts. Set startingOffsets to earliest to read existing data.
    • Check that your IAM role has kafka-cluster:ReadData permission for the topic.
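
For the network connectivity checks above, a plain TCP connection attempt from the Databricks compute can separate networking problems from authentication problems. The following is a minimal sketch using only the Python standard library: parse_bootstrap_servers and check_brokers are hypothetical helper names, and a successful connection shows only that the port is reachable, not that authentication will succeed.

```python
import socket


def parse_bootstrap_servers(bootstrap_servers):
    """Split a kafka.bootstrap.servers value into (host, port) pairs."""
    endpoints = []
    for entry in bootstrap_servers.split(","):
        host, sep, port = entry.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"Expected host:port, got {entry!r}")
        endpoints.append((host, int(port)))
    return endpoints


def check_brokers(bootstrap_servers, timeout=5.0):
    """Try a TCP connection to each broker and report whether it succeeded."""
    results = {}
    for host, port in parse_bootstrap_servers(bootstrap_servers):
        try:
            with socket.create_connection((host, port), timeout=timeout):
                results[f"{host}:{port}"] = True
        except OSError:
            # Covers timeouts, refused connections, and DNS failures.
            results[f"{host}:{port}"] = False
    return results
```

Run check_brokers("<bootstrap-hostname>:<port>") in a notebook attached to the same compute as the stream. If an entry is False, review the security group and VPC settings before investigating IAM permissions.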