Authentication

The Databricks Kafka connector supports multiple authentication methods for connecting to Kafka. This article covers some of the most common authentication methods on Databricks. The full list of supported authentication methods can be found in the Kafka documentation.

Connect to Amazon MSK with IAM

You can connect to Amazon Managed Streaming for Apache Kafka (Amazon MSK) from Databricks using IAM-based authentication. For MSK configuration instructions, see Amazon MSK configuration.

note

The following configurations are only required if you are using IAM to connect to MSK. You can also configure connections to MSK using options provided by the Apache Spark Kafka connector.

Connect with Unity Catalog service credentials

In Databricks Runtime 16.1 and above, Databricks supports Unity Catalog service credentials for authenticating access to Amazon Managed Streaming for Apache Kafka (MSK). Databricks recommends this approach, particularly when running Kafka streaming on shared clusters or serverless compute.

To use a Unity Catalog service credential for authentication, perform the following steps:

  • Create a new Unity Catalog service credential. If you are not familiar with this process, see Create service credentials for instructions.
    • Ensure that the IAM role attached to your service credential has the necessary permissions to connect to your MSK cluster.
  • Provide the name of your Unity Catalog service credential as a source option in your Kafka configuration. Set the option databricks.serviceCredential to the name of your service credential.
Python
kafka_options = {
    "kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
    "subscribe": "<topic>",
    "databricks.serviceCredential": "<service-credential-name>",
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Note: When you use a Unity Catalog service credential to connect to Kafka, the following options are no longer needed:

  • kafka.sasl.mechanism
  • kafka.sasl.jaas.config
  • kafka.security.protocol
  • kafka.sasl.client.callback.handler.class

Connect with instance profiles

You can use an instance profile to authenticate to Amazon MSK clusters that have IAM authentication enabled. For more information on configuring instance profiles, see Instance profiles.

To connect to MSK using an instance profile, configure the following options:

Python
"kafka.sasl.mechanism": "AWS_MSK_IAM",
"kafka.sasl.jaas.config":
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.client.callback.handler.class":
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"

Connect with IAM users/roles

You can optionally configure your connection to MSK with an IAM user or IAM role instead of an instance profile. To do this, you must provide values for your AWS access key and secret key using the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. See Use a secret in a Spark configuration property or environment variable.

In addition, if you choose to configure your connection using an IAM role, you must modify the value provided to kafka.sasl.jaas.config to include the role ARN, as in the following example.

Python
"kafka.sasl.mechanism": "AWS_MSK_IAM",
"kafka.sasl.jaas.config":
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn='arn:aws:iam::123456789012:role/msk_client_role'",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.client.callback.handler.class":
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"

Use SASL/PLAIN to authenticate

To connect to Kafka using SASL/PLAIN (username and password) authentication, configure the following options. Use the shaded PlainLoginModule class name:

Python
kafka_options = {
    "kafka.bootstrap.servers": "<bootstrap-server>:9093",
    "subscribe": "<topic>",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.sasl.jaas.config":
        'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";',
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Databricks recommends storing your password as a secret rather than including it directly in your code. For more information, see Secret management.
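For example, the JAAS entry can be assembled as a string with the password pulled from a secret; the scope and key names below are hypothetical:

```python
# In a Databricks notebook, the password would come from a secret scope:
# password = dbutils.secrets.get(scope="kafka-scope", key="kafka-password")
password = "<password>"  # placeholder standing in for the secret value

jaas_config = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
    f'username="<username>" password="{password}";'
)
```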

Use SASL/SCRAM to authenticate

To connect to Kafka using SASL/SCRAM (SCRAM-SHA-256 or SCRAM-SHA-512), configure the following options. Use the shaded ScramLoginModule class name:

Python
kafka_options = {
    "kafka.bootstrap.servers": "<bootstrap-server>:9093",
    "subscribe": "<topic>",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "SCRAM-SHA-512",
    "kafka.sasl.jaas.config":
        'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

note

Replace SCRAM-SHA-512 with SCRAM-SHA-256 if your Kafka cluster is configured to use SCRAM-SHA-256.

Databricks recommends storing your password as a secret rather than including it directly in your code. For more information, see Secret management.

Use SSL to connect Databricks to Kafka

To enable SSL/TLS connections to Kafka, set kafka.security.protocol to SSL and provide the trust store and key store configuration options prefixed with kafka.. For SSL connections that require only server authentication (one-way TLS), you need a trust store. For mutual TLS (mTLS) where the Kafka broker also authenticates the client, you need both a trust store and a key store.

The following SSL/TLS options are available. For the full list of SSL properties, see the Apache Kafka SSL configuration documentation and Encryption and Authentication with SSL in the Confluent documentation.

  • kafka.security.protocol: Set to SSL to enable TLS encryption.
  • kafka.ssl.truststore.location: Path to the trust store file containing trusted CA certificates.
  • kafka.ssl.truststore.password: Password for the trust store file.
  • kafka.ssl.truststore.type: Trust store file format (default: JKS).
  • kafka.ssl.keystore.location: Path to the key store file containing the client certificate and private key (required for mTLS).
  • kafka.ssl.keystore.password: Password for the key store file.
  • kafka.ssl.key.password: Password for the private key in the key store.
  • kafka.ssl.endpoint.identification.algorithm: Hostname verification algorithm. Defaults to https. Set to an empty string to disable.
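Combined for a mutual TLS connection, these options might look like the following sketch. The Unity Catalog volume paths are hypothetical, and the password placeholders would normally be filled from secrets:

```python
# Hypothetical volume paths; passwords would come from a secret scope.
ssl_options = {
    "kafka.security.protocol": "SSL",
    "kafka.ssl.truststore.location": "/Volumes/main/default/certs/kafka.truststore.jks",
    "kafka.ssl.truststore.password": "<truststore-password>",
    "kafka.ssl.keystore.location": "/Volumes/main/default/certs/kafka.keystore.jks",
    "kafka.ssl.keystore.password": "<keystore-password>",
    "kafka.ssl.key.password": "<key-password>",
}
```

The key store entries are only needed for mTLS; for one-way TLS, the trust store entries are sufficient.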

If you use SSL, Databricks recommends that you:

  • Store your certificates in a Unity Catalog volume. Users who have access to read from the volume can use your Kafka certificates. For more information, see What are Unity Catalog volumes?.
  • Store your certificate passwords as secrets in a secret scope. For more information, see Manage secret scopes.

The following example uses object storage locations and Databricks secrets to enable an SSL connection:

Python
df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
    .option("kafka.security.protocol", "SSL")
    .option("kafka.ssl.truststore.location", <truststore-location>)
    .option("kafka.ssl.keystore.location", <keystore-location>)
    .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>, key=<keystore-password-key-name>))
    .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>, key=<truststore-password-key-name>))
    .load()
)

Use Databricks shaded Kafka class names

Databricks bundles proprietary, shaded versions of the Kafka client libraries. All Kafka client class names that you reference in authentication configuration options must use the shaded class name prefix instead of the standard open-source class name. This applies to any class referenced in options like kafka.sasl.jaas.config, kafka.sasl.login.callback.handler.class, and kafka.sasl.client.callback.handler.class.

Using unshaded class names results in a RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED error. See the FAQ for more details.
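As an illustration, the open-source PlainLoginModule class name maps to its shaded counterpart by adding the kafkashaded prefix:

```python
# Open-source class name (rejected on Databricks):
unshaded = "org.apache.kafka.common.security.plain.PlainLoginModule"

# Shaded equivalent that Databricks accepts in JAAS and callback options:
shaded = "kafkashaded." + unshaded
```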

Handling potential errors

  • IAM authentication failures

    If you see SaslException, Failed to construct kafka consumer, or authentication errors, verify:

    • The IAM role ARN in your kafka.sasl.jaas.config is correct and properly formatted.
    • The IAM role has the necessary permissions to access your MSK cluster (for example, kafka-cluster:Connect, kafka-cluster:ReadData).
    • For instance profiles, ensure the instance profile is attached to the cluster and has MSK permissions.
    • For cross-account access, verify the trust relationship allows the Databricks account to assume the role.
  • Network connectivity issues

    If you see TimeoutException or connection failures:

    • Verify the MSK cluster's security group allows inbound traffic from the Databricks compute security group on the Kafka ports (typically 9092 for PLAINTEXT, 9094 for SASL/SSL, or 9098 for IAM).
    • Ensure VPC peering or PrivateLink is correctly configured between the Databricks VPC and the MSK VPC.
    • Confirm the kafka.bootstrap.servers hostname and port are correct.
  • No records returned

    If authentication succeeds but no data is returned:

    • Verify you're subscribing to the correct topic name.
    • The default startingOffsets is latest, which only reads new data. Set startingOffsets to earliest to read existing data.
    • Check that your IAM role has kafka-cluster:ReadData permission for the topic.
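For example, to re-read existing records rather than only new ones, set startingOffsets to earliest; the bootstrap server and topic below are placeholders:

```python
kafka_options = {
    "kafka.bootstrap.servers": "<bootstrap-server>:9098",
    "subscribe": "<topic>",
    "startingOffsets": "earliest",  # default is "latest", which skips existing data
}

# df = spark.readStream.format("kafka").options(**kafka_options).load()
```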