Authentication

The Databricks Kafka connector supports multiple authentication methods for connecting to Kafka. This article covers some of the most common authentication methods on Databricks. The full list of supported authentication methods can be found in the Kafka documentation.

Google Cloud Managed Service for Apache Kafka

Connect with Unity Catalog service credentials

Databricks supports Unity Catalog service credentials for authenticating access to Google Cloud Managed Service for Apache Kafka in Databricks Runtime 18.0 and above. Google Cloud Managed Service for Apache Kafka clusters run in a VPC, so your Databricks compute must run in a VPC that can reach the managed Kafka cluster. Serverless compute isn't supported because it runs in the Databricks-managed control plane. To enable connectivity, configure VPC peering or Private Service Connect. See VPC Network Peering or Private Service Connect in the Google Cloud documentation.

For authentication, grant the Google Cloud service account created for your Unity Catalog service credential the following role in the Google Cloud project that contains your managed Kafka cluster:

  • roles/managedkafka.client

For instructions, see Granting, changing, and revoking access in the Google Cloud documentation.
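As a sketch of that grant step, the role binding can be added with the gcloud CLI. The project ID and service account email below are placeholders for your own values:

```shell
# Grant the managed Kafka client role to the service credential's
# Google Cloud service account (placeholder project and account names).
gcloud projects add-iam-policy-binding <kafka-project-id> \
  --member="serviceAccount:<service-account-email>" \
  --role="roles/managedkafka.client"
```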

The following example configures Kafka as a source using a service credential:

Python
kafka_options = {
    "kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
    "subscribe": "<topic>",
    "databricks.serviceCredential": "<service-credential-name>",
    # Optional: set this only if Databricks can't infer the scope for your Kafka service.
    # "databricks.serviceCredential.scope": "https://www.googleapis.com/auth/cloud-platform",
}

df = spark.read.format("kafka").options(**kafka_options).load()

Note: When you provide a Unity Catalog service credential to the Kafka connector, do not specify the following options:

  • kafka.sasl.mechanism
  • kafka.sasl.jaas.config
  • kafka.security.protocol
  • kafka.sasl.client.callback.handler.class
  • kafka.sasl.login.callback.handler.class
  • kafka.sasl.oauthbearer.token.endpoint.url
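As a quick sanity check, an options dictionary can be screened for these conflicts before starting a query. This is a minimal sketch using a hypothetical helper, not part of the connector API:

```python
# Options that conflict with databricks.serviceCredential (from the list above).
DISALLOWED_WITH_SERVICE_CREDENTIAL = {
    "kafka.sasl.mechanism",
    "kafka.sasl.jaas.config",
    "kafka.security.protocol",
    "kafka.sasl.client.callback.handler.class",
    "kafka.sasl.login.callback.handler.class",
    "kafka.sasl.oauthbearer.token.endpoint.url",
}

def check_kafka_options(options: dict) -> dict:
    """Raise if a service credential is combined with manual SASL options."""
    if "databricks.serviceCredential" in options:
        conflicts = DISALLOWED_WITH_SERVICE_CREDENTIAL & options.keys()
        if conflicts:
            raise ValueError(
                f"Remove these options when using a service credential: {sorted(conflicts)}"
            )
    return options
```

Passing the dictionary through a check like this fails fast in your own code, rather than at stream start.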

Connect with SASL options

You can authenticate to Google Cloud Managed Service for Apache Kafka without using the databricks.serviceCredential source option by specifying the SASL/OAUTHBEARER options directly. Use this approach only if you need to set Kafka SASL options explicitly (for example, to use a specific callback handler).

This approach requires a Unity Catalog service credential:

  • The service credential's Google Cloud service account must have roles/managedkafka.client in the Google Cloud project that contains your managed Kafka cluster.
  • Use the Databricks-provided login callback handler org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandler (not com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler, which isn't included in Databricks Runtime).

Authenticating with Google service accounts outside of Unity Catalog isn't supported.

The following example configures Kafka as a source by specifying SASL options directly:

Python
kafka_options = {
    "kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
    "subscribe": "<topic>",
    "startingOffsets": "earliest",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "OAUTHBEARER",
    "kafka.sasl.jaas.config":
        "kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
    "kafka.sasl.login.callback.handler.class":
        "org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandler",
    # Required by the login callback handler:
    "kafka.databricks.serviceCredential": "<service-credential-name>",
    # Optional:
    # "kafka.databricks.serviceCredential.scope": "https://www.googleapis.com/auth/cloud-platform",
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Note: Don't set databricks.serviceCredential when you specify these SASL options. If databricks.serviceCredential is set, Databricks configures Kafka authentication automatically and disallows specifying kafka.sasl.* options.

Use SSL to connect Databricks to Kafka

To enable SSL connections to Kafka, follow the instructions in the Confluent documentation Encryption and Authentication with SSL. You can provide the configurations described there, prefixed with kafka., as options. For example, the trust store location would be specified with the property kafka.ssl.truststore.location.
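The prefixing rule can be applied mechanically. The sketch below assumes a plain dictionary of Confluent-style SSL settings (the paths are illustrative) and prepends `kafka.` to each key so the Spark Kafka source passes them through:

```python
# Confluent-style SSL settings (illustrative values).
ssl_conf = {
    "ssl.truststore.location": "/Volumes/main/default/certs/kafka.truststore.jks",
    "ssl.keystore.location": "/Volumes/main/default/certs/kafka.keystore.jks",
}

# Prefix each key with "kafka." for use as Spark Kafka source options.
kafka_options = {f"kafka.{key}": value for key, value in ssl_conf.items()}
```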

If you use SSL, Databricks recommends that you:

  • Store your certificates in a Unity Catalog volume. Users who have access to read from the volume will be able to use your Kafka certificates.
  • Store your certificate passwords as secrets in a secret scope.

The following example uses object storage locations and Databricks secrets to enable an SSL connection:

Python
df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", ...)
    .option("kafka.security.protocol", "SSL")
    .option("kafka.ssl.truststore.location", <truststore-location>)
    .option("kafka.ssl.keystore.location", <keystore-location>)
    .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>, key=<keystore-password-key-name>))
    .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>, key=<truststore-password-key-name>))
    .load()
)

Handling potential errors

  • Exception missing gcp_options when authenticating

    This exception is thrown if the callback handler can't infer the scope from the bootstrap URL. Set databricks.serviceCredential.scope manually. For most use cases, set it to https://www.googleapis.com/auth/cloud-platform.

  • No resolvable bootstrap URLs found

    This means the compute cluster is unable to resolve the bootstrap hostname. Check your VPC configuration to ensure the compute cluster is able to reach the managed Kafka cluster. Configure VPC peering or Private Service Connect as needed.

  • Permission issues

    • Ensure that the service account has the roles/managedkafka.client IAM role binding on the project that the Kafka cluster belongs to.
    • Ensure that the Kafka cluster has the right ACLs defined for the topic and service accounts. See Access control with Kafka ACLs in the Google Cloud documentation.
  • No records returned

    If authentication succeeds but no data is returned:

    • Verify you're subscribing to the correct topic name.
    • The default startingOffsets is latest, which only reads new data. Set startingOffsets to earliest to read existing data.
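Putting the last two fixes together, a source configuration that pins the scope explicitly and reads existing data might look like the following sketch (the bracketed values are placeholders):

```python
kafka_options = {
    "kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
    "subscribe": "<topic>",
    "databricks.serviceCredential": "<service-credential-name>",
    # Fix for the "missing gcp_options" exception: set the scope explicitly.
    "databricks.serviceCredential.scope": "https://www.googleapis.com/auth/cloud-platform",
    # Fix for "no records returned": read existing data, not just new records.
    "startingOffsets": "earliest",
}
```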