Authentication

The Databricks Kafka connector supports multiple authentication methods for connecting to Kafka. This article covers some of the most common authentication methods on Databricks. The full list of supported authentication methods can be found in the Kafka documentation.

Connect to Google Cloud Managed Service for Apache Kafka

To authenticate to Google Cloud Managed Service for Apache Kafka, use a Unity Catalog service credential. You can either let Databricks configure authentication automatically, or specify the SASL/OAUTHBEARER options manually.

Connect with Unity Catalog service credentials

Databricks supports Unity Catalog service credentials for authenticating access to Google Cloud Managed Service for Apache Kafka in Databricks Runtime 18.0 and above. Google Cloud Managed Service for Apache Kafka clusters run in a VPC, so your Databricks compute must run in a VPC that can reach the managed Kafka cluster. Serverless compute isn't supported because it runs in the Databricks-managed control plane. To enable connectivity, configure VPC peering or Private Service Connect. See VPC Network Peering or Private Service Connect in the Google Cloud documentation.

For authentication, grant the Google Cloud service account created for your Unity Catalog service credential the following role in the Google Cloud project that contains your managed Kafka cluster:

  • roles/managedkafka.client

For instructions, see Granting, changing, and revoking access in the Google Cloud documentation.

The following example configures Kafka as a source using a service credential:

Python
kafka_options = {
    "kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
    "subscribe": "<topic>",
    "databricks.serviceCredential": "<service-credential-name>",
    # Optional: set this only if Databricks can't infer the scope for your Kafka service.
    # "databricks.serviceCredential.scope": "https://www.googleapis.com/auth/cloud-platform",
}

df = spark.read.format("kafka").options(**kafka_options).load()

Note: When you provide a Unity Catalog service credential to Kafka, do not specify the following options:

  • kafka.sasl.mechanism
  • kafka.sasl.jaas.config
  • kafka.security.protocol
  • kafka.sasl.client.callback.handler.class
  • kafka.sasl.login.callback.handler.class
  • kafka.sasl.oauthbearer.token.endpoint.url

Connect with SASL options

You can authenticate to Google Cloud Managed Service for Apache Kafka without using the databricks.serviceCredential source option by specifying the SASL/OAUTHBEARER options directly. Use this approach only if you need to set Kafka SASL options explicitly (for example, to use a specific callback handler).

This approach requires a Unity Catalog service credential:

  • The service credential's Google Cloud service account must have roles/managedkafka.client in the Google Cloud project that contains your managed Kafka cluster.
  • Use the Databricks-provided login callback handler org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandler (not com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler, which isn't included in Databricks Runtime).

Authenticating with Google service accounts outside of Unity Catalog isn't supported.

The following example configures Kafka as a source by specifying SASL options directly:

Python
kafka_options = {
    "kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
    "subscribe": "<topic>",
    "startingOffsets": "earliest",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "OAUTHBEARER",
    "kafka.sasl.jaas.config":
        "kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
    "kafka.sasl.login.callback.handler.class":
        "org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandler",
    # Required by the login callback handler:
    "kafka.databricks.serviceCredential": "<service-credential-name>",
    # Optional:
    # "kafka.databricks.serviceCredential.scope": "https://www.googleapis.com/auth/cloud-platform",
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Note: Don't set databricks.serviceCredential when you specify these SASL options. If databricks.serviceCredential is set, Databricks configures Kafka authentication automatically and disallows specifying kafka.sasl.* options.

Use SASL/PLAIN to authenticate

To connect to Kafka using SASL/PLAIN (username and password) authentication, configure the following options. Use the shaded PlainLoginModule class name:

Python
kafka_options = {
    "kafka.bootstrap.servers": "<bootstrap-server>:9093",
    "subscribe": "<topic>",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.sasl.jaas.config":
        'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";',
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Databricks recommends storing your password as a secret rather than including it directly in your code. For more information, see Secret management.
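As a sketch of that recommendation, you can build the JAAS config string from values read out of a secret scope instead of hardcoding them. The helper function name and the secret scope and key names below are hypothetical; `dbutils.secrets.get` is the standard Databricks secrets API.

```python
def plain_jaas_config(username: str, password: str) -> str:
    """Return a SASL/PLAIN JAAS config line using the shaded class name."""
    return (
        'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule '
        f'required username="{username}" password="{password}";'
    )

# On Databricks, read the credentials from a secret scope (names are placeholders):
# username = dbutils.secrets.get(scope="<scope-name>", key="<username-key>")
# password = dbutils.secrets.get(scope="<scope-name>", key="<password-key>")

kafka_options = {
    "kafka.bootstrap.servers": "<bootstrap-server>:9093",
    "subscribe": "<topic>",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.sasl.jaas.config": plain_jaas_config("<username>", "<password>"),
}
```

The same pattern applies to the SASL/SCRAM example below: only the module class name in the JAAS string changes.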

Use SASL/SCRAM to authenticate

To connect to Kafka using SASL/SCRAM (SCRAM-SHA-256 or SCRAM-SHA-512), configure the following options. Use the shaded ScramLoginModule class name:

Python
kafka_options = {
    "kafka.bootstrap.servers": "<bootstrap-server>:9093",
    "subscribe": "<topic>",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "SCRAM-SHA-512",
    "kafka.sasl.jaas.config":
        'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Note: Replace SCRAM-SHA-512 with SCRAM-SHA-256 if your Kafka cluster is configured to use SCRAM-SHA-256.

Databricks recommends storing your password as a secret rather than including it directly in your code. For more information, see Secret management.

Use SSL to connect Databricks to Kafka

To enable SSL/TLS connections to Kafka, set kafka.security.protocol to SSL and provide the trust store and key store configuration options prefixed with kafka.. For SSL connections that require only server authentication (one-way TLS), you need a trust store. For mutual TLS (mTLS) where the Kafka broker also authenticates the client, you need both a trust store and a key store.

The following SSL/TLS options are available. For the full list of SSL properties, see the Apache Kafka SSL configuration documentation and Encryption and Authentication with SSL in the Confluent documentation.

  • kafka.security.protocol: Set to SSL to enable TLS encryption.
  • kafka.ssl.truststore.location: Path to the trust store file containing trusted CA certificates.
  • kafka.ssl.truststore.password: Password for the trust store file.
  • kafka.ssl.truststore.type: Trust store file format (default: JKS).
  • kafka.ssl.keystore.location: Path to the key store file containing the client certificate and private key (required for mTLS).
  • kafka.ssl.keystore.password: Password for the key store file.
  • kafka.ssl.key.password: Password for the private key in the key store.
  • kafka.ssl.endpoint.identification.algorithm: Hostname verification algorithm. Defaults to https. Set to an empty string to disable.

If you use SSL, Databricks recommends that you:

  • Store your certificates in a Unity Catalog volume. Users who have access to read from the volume can use your Kafka certificates. For more information, see What are Unity Catalog volumes?.
  • Store your certificate passwords as secrets in a secret scope. For more information, see Manage secret scopes.

The following example uses object storage locations and Databricks secrets to enable an SSL connection:

Python
df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
    .option("kafka.security.protocol", "SSL")
    .option("kafka.ssl.truststore.location", "<truststore-location>")
    .option("kafka.ssl.keystore.location", "<keystore-location>")
    .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope="<certificate-scope-name>", key="<keystore-password-key-name>"))
    .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope="<certificate-scope-name>", key="<truststore-password-key-name>"))
    .load()
)

Use Databricks shaded Kafka class names

Databricks bundles proprietary, shaded versions of the Kafka client libraries. All Kafka client class names that you reference in authentication configuration options must use the shaded class name prefix instead of the standard open-source class name. This applies to any class referenced in options like kafka.sasl.jaas.config, kafka.sasl.login.callback.handler.class, and kafka.sasl.client.callback.handler.class.

Using unshaded class names results in a RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED error. See the FAQ for more details.
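The mapping itself is mechanical: the shaded name is the open-source org.apache.kafka class name with a kafkashaded. prefix. The helper below is purely illustrative (the function name is hypothetical, not a Databricks API); note that Spark-provided classes such as the CustomGCPOAuthBearerLoginCallbackHandler are not shaded and are used as-is.

```python
def shaded(class_name: str) -> str:
    """Map an open-source Kafka client class name to its Databricks shaded name.

    Only org.apache.kafka classes are shaded; other class names pass through
    unchanged.
    """
    if class_name.startswith("org.apache.kafka."):
        return "kafkashaded." + class_name
    return class_name

shaded("org.apache.kafka.common.security.plain.PlainLoginModule")
# -> "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule"
```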

Handling potential errors

  • Exception missing gcp_options when authenticating

    This exception is thrown if the callback handler can't infer the scope from the bootstrap URL. Set databricks.serviceCredential.scope manually. For most use cases, set it to https://www.googleapis.com/auth/cloud-platform.

  • No resolvable bootstrap URLs found

    This means the compute cluster is unable to resolve the bootstrap hostname. Check your VPC configuration to ensure the compute cluster is able to reach the managed Kafka cluster. Configure VPC peering or Private Service Connect as needed.

  • Permission issues

    • Ensure that the service account has the roles/managedkafka.client IAM role binding on the project that the Kafka cluster belongs to.
    • Ensure that the Kafka cluster has the right ACLs defined for the topic and service accounts. See Access control with Kafka ACLs in the Google Cloud documentation.
  • No records returned

    If authentication succeeds but no data is returned:

    • Verify you're subscribing to the correct topic name.
    • The default startingOffsets is latest, which only reads new data. Set startingOffsets to earliest to read existing data.
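The steps above can be sketched as a single configuration change. This example reuses the service credential configuration from earlier in this article; the bootstrap hostname, topic, and credential name are placeholders.

```python
# Sketch: replay existing records instead of reading only new ones.
kafka_options = {
    "kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
    "subscribe": "<topic>",
    "databricks.serviceCredential": "<service-credential-name>",
    # Default is "latest" (new records only); "earliest" reads existing data.
    "startingOffsets": "earliest",
}

df = spark.readStream.format("kafka").options(**kafka_options).load()
```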