Authentication
The Databricks Kafka connector supports multiple authentication methods for connecting to Kafka. This article covers some of the most common authentication methods on Databricks. The full list of supported authentication methods can be found in the Kafka documentation.
Connect to Google Cloud Managed Service for Apache Kafka
To authenticate to Google Cloud Managed Service for Apache Kafka, use a Unity Catalog service credential. You can either let Databricks configure authentication automatically, or specify the SASL/OAUTHBEARER options manually.
Connect with Unity Catalog service credentials
Databricks supports Unity Catalog service credentials for authenticating access to Google Cloud Managed Service for Apache Kafka in Databricks Runtime 18.0 and above. Google Cloud Managed Service for Apache Kafka clusters run in a VPC, so your Databricks compute must run in a VPC that can reach the managed Kafka cluster. Serverless compute isn't supported because it runs in the Databricks-managed control plane. To enable connectivity, configure VPC peering or Private Service Connect. See VPC Network Peering or Private Service Connect in the Google Cloud documentation.
For authentication, grant the Google Cloud service account created for your Unity Catalog service credential the following role in the Google Cloud project that contains your managed Kafka cluster:
roles/managedkafka.client
For instructions, see Granting, changing, and revoking access in the Google Cloud documentation.
The following example configures Kafka as a source using a service credential:
- Python
- Scala
- SQL
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"databricks.serviceCredential": "<service-credential-name>",
# Optional: set this only if Databricks can't infer the scope for your Kafka service.
# "databricks.serviceCredential.scope": "https://www.googleapis.com/auth/cloud-platform",
}
df = spark.read.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
"subscribe" -> "<topic>",
"databricks.serviceCredential" -> "<service-credential-name>",
// Optional: set this only if Databricks can't infer the scope for your Kafka service.
// "databricks.serviceCredential.scope" -> "https://www.googleapis.com/auth/cloud-platform",
)
val df = spark.read.format("kafka").options(kafkaOptions).load()
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-hostname>:9092',
subscribe => '<topic>',
serviceCredential => '<service-credential-name>'
);
Note: When you provide a Unity Catalog service credential to Kafka, do not specify the following options:
- kafka.sasl.mechanism
- kafka.sasl.jaas.config
- kafka.security.protocol
- kafka.sasl.client.callback.handler.class
- kafka.sasl.login.callback.handler.class
- kafka.sasl.oauthbearer.token.endpoint.url
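If you assemble Kafka option dictionaries in Python, a small pre-flight check can surface the conflicting options before Spark raises an error at stream start. The helper below is a hypothetical sketch, not part of any Databricks API; the function and constant names are illustrative:

```python
# Options that must not be set alongside databricks.serviceCredential,
# per the note above.
DISALLOWED_WITH_SERVICE_CREDENTIAL = {
    "kafka.sasl.mechanism",
    "kafka.sasl.jaas.config",
    "kafka.security.protocol",
    "kafka.sasl.client.callback.handler.class",
    "kafka.sasl.login.callback.handler.class",
    "kafka.sasl.oauthbearer.token.endpoint.url",
}

def check_kafka_options(options: dict) -> list:
    """Return any options that conflict with databricks.serviceCredential."""
    if "databricks.serviceCredential" not in options:
        return []
    return sorted(DISALLOWED_WITH_SERVICE_CREDENTIAL & options.keys())
```

Calling the check on your options dictionary before `spark.read.format("kafka").options(**kafka_options)` lets you fail fast with a clear message instead of a broker-side authentication error.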
Connect with SASL options
You can authenticate to Google Cloud Managed Service for Apache Kafka without using the databricks.serviceCredential source option by specifying the SASL/OAUTHBEARER options directly. Use this approach only if you need to set Kafka SASL options explicitly (for example, to use a specific callback handler).
This approach requires a Unity Catalog service credential:
- The service credential's Google Cloud service account must have roles/managedkafka.client in the Google Cloud project that contains your managed Kafka cluster.
- Use the Databricks-provided login callback handler org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandler (not com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler, which isn't included in Databricks Runtime).
Authenticating with Google service accounts outside of Unity Catalog isn't supported.
The following example configures Kafka as a source by specifying SASL options directly:
- Python
- Scala
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"startingOffsets": "earliest",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.jaas.config":
"kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
"kafka.sasl.login.callback.handler.class":
"org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandler",
# Required by the login callback handler:
"kafka.databricks.serviceCredential": "<service-credential-name>",
# Optional:
# "kafka.databricks.serviceCredential.scope": "https://www.googleapis.com/auth/cloud-platform",
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
"subscribe" -> "<topic>",
"startingOffsets" -> "earliest",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.jaas.config" ->
"kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
"kafka.sasl.login.callback.handler.class" ->
"org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandler",
// Required by the login callback handler:
"kafka.databricks.serviceCredential" -> "<service-credential-name>",
// Optional:
// "kafka.databricks.serviceCredential.scope" -> "https://www.googleapis.com/auth/cloud-platform",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
Note: Don't set databricks.serviceCredential when you specify these SASL options. If databricks.serviceCredential is set, Databricks configures Kafka authentication automatically and disallows specifying kafka.sasl.* options.
Use SASL/PLAIN to authenticate
To connect to Kafka using SASL/PLAIN (username and password) authentication, configure the following options. Use the shaded PlainLoginModule class name:
- Python
- Scala
- SQL
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "PLAIN",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "PLAIN",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'PLAIN',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";'
);
Databricks recommends storing your password as a secret rather than including it directly in your code. For more information, see Secret management.
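One way to keep the password out of your code is to build the JAAS config string at runtime from a value fetched from a secret store (on Databricks, for example, `password = dbutils.secrets.get(scope, key)`). The helper below is a hypothetical sketch of that composition step; the function name is illustrative:

```python
# Shaded login module class name required on Databricks (see the section
# on shaded Kafka class names below).
PLAIN_LOGIN_MODULE = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule"

def plain_jaas_config(username: str, password: str) -> str:
    """Compose the kafka.sasl.jaas.config value for SASL/PLAIN."""
    return f'{PLAIN_LOGIN_MODULE} required username="{username}" password="{password}";'
```

You would then pass the result as the `kafka.sasl.jaas.config` option, so the password literal never appears in the notebook source.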
Use SASL/SCRAM to authenticate
To connect to Kafka using SASL/SCRAM (SCRAM-SHA-256 or SCRAM-SHA-512), configure the following options. Use the shaded ScramLoginModule class name:
- Python
- Scala
- SQL
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "SCRAM-SHA-512",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "SCRAM-SHA-512",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'SCRAM-SHA-512',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";'
);
Replace SCRAM-SHA-512 with SCRAM-SHA-256 if your Kafka cluster is configured to use SCRAM-SHA-256.
Databricks recommends storing your password as a secret rather than including it directly in your code. For more information, see Secret management.
Use SSL to connect Databricks to Kafka
To enable SSL/TLS connections to Kafka, set kafka.security.protocol to SSL and provide the trust store and key store configuration options prefixed with kafka.. For SSL connections that require only server authentication (one-way TLS), you need a trust store. For mutual TLS (mTLS) where the Kafka broker also authenticates the client, you need both a trust store and a key store.
The following SSL/TLS options are available. For the full list of SSL properties, see the Apache Kafka SSL configuration documentation and Encryption and Authentication with SSL in the Confluent documentation.
| Option | Description |
|---|---|
| kafka.security.protocol | Set to SSL. |
| kafka.ssl.truststore.location | Path to the trust store file containing trusted CA certificates. |
| kafka.ssl.truststore.password | Password for the trust store file. |
| kafka.ssl.truststore.type | Trust store file format (default: JKS). |
| kafka.ssl.keystore.location | Path to the key store file containing the client certificate and private key (required for mTLS). |
| kafka.ssl.keystore.password | Password for the key store file. |
| kafka.ssl.key.password | Password for the private key in the key store. |
| kafka.ssl.endpoint.identification.algorithm | Hostname verification algorithm. Defaults to https. |
If you use SSL, Databricks recommends that you:
- Store your certificates in a Unity Catalog volume. Users who have access to read from the volume can use your Kafka certificates. For more information, see What are Unity Catalog volumes?.
- Store your certificate passwords as secrets in a secret scope. For more information, see Manage secret scopes.
The following example uses object storage locations and Databricks secrets to enable an SSL connection:
- Python
- Scala
- SQL
df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
  .option("subscribe", "<topic>")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", "<truststore-location>")
  .option("kafka.ssl.keystore.location", "<keystore-location>")
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope="<certificate-scope-name>", key="<keystore-password-key-name>"))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope="<certificate-scope-name>", key="<truststore-password-key-name>"))
  .load()
)
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
  .option("subscribe", "<topic>")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", "<truststore-location>")
  .option("kafka.ssl.keystore.location", "<keystore-location>")
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = "<certificate-scope-name>", key = "<keystore-password-key-name>"))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = "<certificate-scope-name>", key = "<truststore-password-key-name>"))
  .load()
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SSL',
`kafka.ssl.truststore.location` => '<truststore-location>',
`kafka.ssl.keystore.location` => '<keystore-location>',
`kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
`kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);
Use Databricks shaded Kafka class names
Databricks bundles proprietary, shaded versions of the Kafka client libraries. All Kafka client class names that you reference in authentication configuration options must use the shaded class name prefix instead of the standard open-source class name. This applies to any class referenced in options like kafka.sasl.jaas.config, kafka.sasl.login.callback.handler.class, and kafka.sasl.client.callback.handler.class.
Using unshaded class names results in a RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED error. See the FAQ for more details.
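As a sketch of the renaming rule, the hypothetical helper below (not a Databricks API) maps an open-source Kafka client class name to its shaded form. Note that Databricks-provided classes such as the GCP callback handler live under org.apache.spark and are not shaded:

```python
SHADE_PREFIX = "kafkashaded."

def shaded(class_name: str) -> str:
    """Prefix org.apache.kafka class names with the Databricks shade prefix;
    leave all other class names (e.g. org.apache.spark ones) untouched."""
    if class_name.startswith("org.apache.kafka."):
        return SHADE_PREFIX + class_name
    return class_name
```

For example, the open-source PlainLoginModule becomes kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule, matching the JAAS config strings used in the examples above.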
Handling potential errors
- Exception "missing gcp_options" when authenticating

  This exception is thrown if the callback handler can't infer the scope from the bootstrap URL. Set databricks.serviceCredential.scope manually. For most use cases, set it to https://www.googleapis.com/auth/cloud-platform.

- No resolvable bootstrap URLs found

  This means the compute cluster is unable to resolve the bootstrap hostname. Check your VPC configuration to ensure the compute cluster is able to reach the managed Kafka cluster. Configure VPC peering or Private Service Connect as needed.

- Permission issues

  - Ensure that the service account has the roles/managedkafka.client IAM role binding on the project that the Kafka cluster belongs to.
  - Ensure that the Kafka cluster has the right ACLs defined for the topic and service accounts. See Access control with Kafka ACLs in the Google Cloud documentation.

- No records returned

  If authentication succeeds but no data is returned:

  - Verify you're subscribing to the correct topic name.
  - The default startingOffsets is latest, which only reads new data. Set startingOffsets to earliest to read existing data.
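The startingOffsets fix can be applied without editing each options dictionary by hand. The helper below is a hypothetical sketch (the function name is illustrative) that returns a copy of the source options configured to read existing records as well as new ones:

```python
def with_backfill(options: dict) -> dict:
    """Return a copy of the Kafka source options that reads the topic from
    the beginning. Spark's default ("latest") only reads records that
    arrive after the stream starts."""
    merged = dict(options)
    merged["startingOffsets"] = "earliest"
    return merged
```

Because the helper copies the dictionary, the original options are left unchanged, which keeps a "latest"-reading production stream and an "earliest"-reading debugging stream from interfering with each other.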