Authentication
The Databricks Kafka connector supports multiple authentication methods for connecting to Kafka. This article covers the most common methods on Databricks. For the full list of supported authentication methods, see the Kafka documentation.
Google Cloud Managed Service for Apache Kafka
Connect with Unity Catalog service credentials
Databricks supports Unity Catalog service credentials for authenticating access to Google Cloud Managed Service for Apache Kafka in Databricks Runtime 18.0 and above. Google Cloud Managed Service for Apache Kafka clusters run in a VPC, so your Databricks compute must run in a VPC that can reach the managed Kafka cluster. Serverless compute isn't supported because it runs in the Databricks-managed control plane. To enable connectivity, configure VPC peering or Private Service Connect. See VPC Network Peering or Private Service Connect in the Google Cloud documentation.
For authentication, grant the Google Cloud service account created for your Unity Catalog service credential the following role in the Google Cloud project that contains your managed Kafka cluster:
roles/managedkafka.client
For instructions, see Granting, changing, and revoking access in the Google Cloud documentation.
The following example configures Kafka as a source using a service credential:
- Python
- Scala
- SQL
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"databricks.serviceCredential": "<service-credential-name>",
# Optional: set this only if Databricks can't infer the scope for your Kafka service.
# "databricks.serviceCredential.scope": "https://www.googleapis.com/auth/cloud-platform",
}
df = spark.read.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
"subscribe" -> "<topic>",
"databricks.serviceCredential" -> "<service-credential-name>",
// Optional: set this only if Databricks can't infer the scope for your Kafka service.
// "databricks.serviceCredential.scope" -> "https://www.googleapis.com/auth/cloud-platform",
)
val df = spark.read.format("kafka").options(kafkaOptions).load()
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-hostname>:9092',
subscribe => '<topic>',
serviceCredential => '<service-credential-name>'
);
Note: When you provide a Unity Catalog service credential to Kafka, do not specify the following options:
- kafka.sasl.mechanism
- kafka.sasl.jaas.config
- kafka.security.protocol
- kafka.sasl.client.callback.handler.class
- kafka.sasl.login.callback.handler.class
- kafka.sasl.oauthbearer.token.endpoint.url
Connect with SASL options
You can authenticate to Google Cloud Managed Service for Apache Kafka without using the databricks.serviceCredential source option by specifying the SASL/OAUTHBEARER options directly. Use this approach only if you need to set Kafka SASL options explicitly (for example, to use a specific callback handler).
This approach requires a Unity Catalog service credential:
- The service credential's Google Cloud service account must have roles/managedkafka.client in the Google Cloud project that contains your managed Kafka cluster.
- Use the Databricks-provided login callback handler org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandler (not com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler, which isn't included in Databricks Runtime).
Authenticating with Google service accounts outside of Unity Catalog isn't supported.
The following example configures Kafka as a source by specifying SASL options directly:
- Python
- Scala
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"startingOffsets": "earliest",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.jaas.config":
"kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
"kafka.sasl.login.callback.handler.class":
"org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandler",
# Required by the login callback handler:
"kafka.databricks.serviceCredential": "<service-credential-name>",
# Optional:
# "kafka.databricks.serviceCredential.scope": "https://www.googleapis.com/auth/cloud-platform",
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
"subscribe" -> "<topic>",
"startingOffsets" -> "earliest",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.jaas.config" ->
"kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
"kafka.sasl.login.callback.handler.class" ->
"org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandler",
// Required by the login callback handler:
"kafka.databricks.serviceCredential" -> "<service-credential-name>",
// Optional:
// "kafka.databricks.serviceCredential.scope" -> "https://www.googleapis.com/auth/cloud-platform",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
Note: Don't set databricks.serviceCredential when you specify these SASL options. If databricks.serviceCredential is set, Databricks configures Kafka authentication automatically and disallows specifying kafka.sasl.* options.
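This mutual-exclusion rule can be expressed as a small pre-flight check on the options dictionary before starting the stream. This is an illustrative sketch only, not a Databricks API; Databricks enforces the rule itself, and the option names below are the ones used in the examples above:

```python
def validate_kafka_options(options: dict) -> None:
    """Raise if a service credential is combined with manual SASL options.

    Sketch: surfaces the conflict locally before the stream starts.
    """
    has_credential = "databricks.serviceCredential" in options
    sasl_keys = [k for k in options if k.startswith("kafka.sasl.")]
    if has_credential and sasl_keys:
        raise ValueError(
            "databricks.serviceCredential configures Kafka authentication "
            f"automatically; remove manual SASL options: {sasl_keys}"
        )

# A conflicting configuration is rejected:
try:
    validate_kafka_options({
        "databricks.serviceCredential": "my-cred",
        "kafka.sasl.mechanism": "OAUTHBEARER",
    })
except ValueError as e:
    print(e)
```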
Use SSL to connect Databricks to Kafka
To enable SSL connections to Kafka, follow the instructions in the Confluent documentation Encryption and Authentication with SSL. You can provide the configurations described there, prefixed with kafka., as options. For example, the trust store location would be specified with the property kafka.ssl.truststore.location.
If you use SSL, Databricks recommends that you:
- Store your certificates in a Unity Catalog volume. Any user with read access to the volume can use your Kafka certificates.
- Store your certificate passwords as secrets in a secret scope.
The following example uses object storage locations and Databricks secrets to enable an SSL connection:
- Python
- Scala
- SQL
df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>, key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>, key=<truststore-password-key-name>))
  .load()
)
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))
.load()
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-server>:9092',
subscribe => '<topic>',
`kafka.security.protocol` => 'SSL',
`kafka.ssl.truststore.location` => '<truststore-location>',
`kafka.ssl.keystore.location` => '<keystore-location>',
`kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
`kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);
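Mistyped or missing certificate paths tend to surface as opaque SSL handshake errors at stream start, so a quick existence check first can save a debugging round trip. This is a sketch; the volume paths below are hypothetical placeholders for your own locations:

```python
import os

def missing_cert_files(*paths: str) -> list:
    """Return the certificate paths that don't exist or aren't readable."""
    return [p for p in paths if not os.path.isfile(p) or not os.access(p, os.R_OK)]

# Hypothetical Unity Catalog volume paths:
missing = missing_cert_files(
    "/Volumes/main/default/certs/kafka.truststore.jks",
    "/Volumes/main/default/certs/kafka.keystore.jks",
)
if missing:
    print(f"Fix these paths before starting the stream: {missing}")
```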
Handling potential errors
- Exception missing gcp_options when authenticating
  This exception is thrown if the callback handler can't infer the scope from the bootstrap URL. Set databricks.serviceCredential.scope manually. For most use cases, set it to https://www.googleapis.com/auth/cloud-platform.
- No resolvable bootstrap URLs found
  This means the compute cluster is unable to resolve the bootstrap hostname. Check your VPC configuration to ensure the compute cluster can reach the managed Kafka cluster. Configure VPC peering or Private Service Connect as needed.
- Permission issues
  - Ensure that the service account has the roles/managedkafka.client IAM role binding on the project that the Kafka cluster belongs to.
  - Ensure that the Kafka cluster has the right ACLs defined for the topic and service accounts. See Access control with Kafka ACLs in the Google Cloud documentation.
- No records returned
  If authentication succeeds but no data is returned:
  - Verify you're subscribing to the correct topic name.
  - The default startingOffsets is latest, which only reads new data. Set startingOffsets to earliest to read existing data.
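For the bootstrap-resolution error, you can reproduce the DNS lookup from a notebook with a stdlib check. This is a diagnostic sketch; replace the host with your own bootstrap hostname:

```python
import socket

def can_resolve(host: str, port: int = 9092) -> bool:
    """Return True if this compute can resolve the bootstrap hostname."""
    try:
        socket.getaddrinfo(host, port)
        return True
    except socket.gaierror:
        return False

# Example: localhost should resolve; the reserved .invalid TLD should not.
print(can_resolve("localhost"))
print(can_resolve("bootstrap.kafka.invalid"))
```

If this returns False for your bootstrap hostname, the problem is network configuration (VPC peering, Private Service Connect, or DNS), not Kafka authentication.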