認証
Databricks Kafka コネクタは、Kafka に接続するための複数の認証方法をサポートしています。この記事では、Databricks で最も一般的な認証方法のいくつかについて説明します。サポートされている認証方法の完全なリストは、 Kafka のドキュメントに記載されています。
Apache Kafkaの Google クラウド マネージド サービスに接続する
Apache Kafkaの Google クラウド マネージドサービスに対して認証するには、 Unity Catalogサービスの資格情報を使用します。 Databricksに認証設定を自動的に行わせることも、SASL/OAUTHBEARERオプションを手動で指定することもできます。
Unity Catalog認証情報を使用して接続します
Databricks 、 Databricks Runtime 18.0 以降のApache Kafkaの Google クラウド マネージド サービスへのアクセスを認証するためのUnity Catalogサービス 認証情報をサポートしています。 Google クラウド マネージドサービス for Apache KafkaクラスターはVPCで実行されるため、 DatabricksコンピュートはマネージドKafkaクラスターに到達できるVPCで実行する必要があります。 サーバーレス コンピュートはDatabricksで管理されるコントロール プレーンで実行されるため、サポートされていません。 接続を有効にするには、 VPCピアリングまたはプライベート サービス 接続を構成します。 Google クラウドのドキュメントのVPCネットワーク ピアリング」または「プライベート サービス接続」を参照してください。
認証のために、 Unity Catalogサービス資格情報用に作成された Google クラウド サービス アカウントに、マネージドKafkaクラスターを含む Google クラウド プロジェクトでの次のロールを付与します。
roles/managedkafka.client
手順については、Google クラウドのドキュメントの「アクセスの付与、変更、取り消し」を参照してください。
次の例では、サービス資格情報を使用して Kafka をソースとして構成します。
- Python
- Scala
- SQL
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"databricks.serviceCredential": "<service-credential-name>",
# Optional: set this only if Databricks can't infer the scope for your Kafka service.
# "databricks.serviceCredential.scope": "https://www.googleapis.com/auth/cloud-platform",
}
df = spark.read.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
"subscribe" -> "<topic>",
"databricks.serviceCredential" -> "<service-credential-name>",
// Optional: set this only if Databricks can't infer the scope for your Kafka service.
// "databricks.serviceCredential.scope" -> "https://www.googleapis.com/auth/cloud-platform",
)
val df = spark.read.format("kafka").options(kafkaOptions).load()
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-hostname>:9092',
subscribe => '<topic>',
serviceCredential => '<service-credential-name>'
);
注 : Unity Catalog サービス資格情報を Kafka に提供する際には、次のオプションを指定しないでください。
kafka.sasl.mechanismkafka.sasl.jaas.configkafka.security.protocolkafka.sasl.client.callback.handler.classkafka.sasl.login.callback.handler.classkafka.sasl.oauthbearer.token.endpoint.url
SASLオプションで接続する
SASL/OAUTHBEARER オプションを直接指定することで、 databricks.serviceCredentialソース オプションを使用せずに、 Apache Kafkaの Google クラウド マネージドサービスに対する認証を行うことができます。 この方法は、Kafka SASL オプションを明示的に設定する必要がある場合にのみ使用してください (たとえば、特定のコールバック ハンドラーを使用する場合など)。
このアプローチには、Unity Catalog サービスの資格情報が必要です。
- サービス認証情報の Google クラウド サービス アカウントには、マネージドKafkaクラスターを含む Google クラウド プロジェクト内に
roles/managedkafka.client含まれている必要があります。 - Databricks 提供のログイン コールバック ハンドラー
org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandlerを使用します (Databricks Runtime に含まれていないcom.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandlerではありません)。
Unity Catalog 外での Google サービス アカウントによる認証はサポートされていません。
次の例では、SASL オプションを直接指定して、Kafka をソースとして設定します。
- Python
- Scala
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"startingOffsets": "earliest",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.jaas.config":
"kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
"kafka.sasl.login.callback.handler.class":
"org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandler",
# Required by the login callback handler:
"kafka.databricks.serviceCredential": "<service-credential-name>",
# Optional:
# "kafka.databricks.serviceCredential.scope": "https://www.googleapis.com/auth/cloud-platform",
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
"subscribe" -> "<topic>",
"startingOffsets" -> "earliest",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.jaas.config" ->
"kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
"kafka.sasl.login.callback.handler.class" ->
"org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandler",
// Required by the login callback handler:
"kafka.databricks.serviceCredential" -> "<service-credential-name>",
// Optional:
// "kafka.databricks.serviceCredential.scope" -> "https://www.googleapis.com/auth/cloud-platform",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
注意 : これらの SASL オプションを指定するときはdatabricks.serviceCredentialを設定しないでください。databricks.serviceCredentialが設定されている場合、Databricks は Kafka 認証を自動的に構成し、 kafka.sasl.*オプションの指定を禁止します。
認証にはSASL/PLAINを使用します。
SASL/PLAIN(ユーザー名とパスワード)認証を使用してKafkaに接続するには、以下のオプションを設定してください。網掛けされたPlainLoginModuleクラス名を使用してください。
- Python
- Scala
- SQL
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "PLAIN",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "PLAIN",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'PLAIN',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";'
);
Databricksは、パスワードをコードに直接含めるのではなく、秘密情報として保存することを推奨しています。詳細については、 「シークレット管理」を参照してください。
SASL/SCRAMを使用して認証を行う
SASL/SCRAM(SCRAM-SHA-256またはSCRAM-SHA-512)を使用してKafkaに接続するには、以下のオプションを設定します。網掛けされたScramLoginModuleクラス名を使用してください。
- Python
- Scala
- SQL
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "SCRAM-SHA-512",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "SCRAM-SHA-512",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'SCRAM-SHA-512',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";'
);
Kafka クラスターが SCRAM-SHA-256 を使用するように構成されている場合は、 SCRAM-SHA-512 SCRAM-SHA-256に置き換えてください。
Databricksは、パスワードをコードに直接含めるのではなく、秘密情報として保存することを推奨しています。詳細については、 「シークレット管理」を参照してください。
SSLを使用してDatabricksをKafkaに接続する
KafkaへのSSL/TLS接続を有効にするには、 kafka.security.protocol SSLに設定し、 kafka.をプレフィックスとするトラストストアとキーストアの構成オプションを指定します。サーバー認証のみを必要とするSSL接続(一方向TLS)の場合、トラストストアが必要です。Kafkaブローカーがクライアントの認証も行う相互TLS(mTLS)の場合、トラストストアとキーストアの両方が必要です。
以下のSSL/TLSオプションが利用可能です。SSLプロパティの完全なリストについては、 Apache KafkaのSSL設定に関するドキュメント、およびConfluentドキュメントの「SSLによる暗号化と認証」を参照してください。
オプション | 説明 |
|---|---|
| TLS暗号化を有効にするには、 |
| 信頼できる認証局証明書を含むトラストストアファイルへのパス。 |
| 信頼ストアファイルのパスワード。 |
| トラストストアのファイル形式(デフォルト: |
| クライアント証明書と秘密鍵を含むキーストアファイルへのパス(mTLSに必要)。 |
| キーストアファイルのパスワード。 |
| キーストア内の秘密鍵のパスワード。 |
| ホスト名検証アルゴリズム。デフォルト値は |
SSLを使用する場合、Databricksは以下のことを推奨します。
- 証明書はUnity Catalogボリュームに保存してください。 ボリュームから読み取り権限を持つユーザーは、あなたのKafka証明書を使用できます。詳細については、 Unity Catalogボリュームとは?」を参照してください。
- 証明書のパスワードは、シークレットスコープ内のシークレットとして保存してください。詳細については、 「シークレットスコープの管理」を参照してください。
次の例では、オブジェクトストレージの場所と Databricks シークレットを使用して SSL 接続を有効にします。
- Python
- Scala
- SQL
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SSL',
`kafka.ssl.truststore.location` => '<truststore-location>',
`kafka.ssl.keystore.location` => '<keystore-location>',
`kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
`kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);
DatabricksのシェーディングされたKafkaクラス名を使用する
Databricksは、独自のシェーディング版Kafkaクライアントライブラリをバンドルしています。認証設定オプションで参照するすべての Kafka クライアントクラス名は、標準のオープンソースクラス名ではなく、網掛けされたクラス名プレフィックスを使用する必要があります。これは、 kafka.sasl.jaas.config 、 kafka.sasl.login.callback.handler.class 、 kafka.sasl.client.callback.handler.classなどのオプションで参照されるすべてのクラスに適用されます。
網掛けされていないクラス名を使用すると、 RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCEDエラーが発生します。詳細はFAQをご覧ください。
潜在的なエラーの処理
-
認証時に例外
missing gcp_optionsが発生しましたこの例外は、コールバック ハンドラーがブートストラップ URL からスコープを推測できない場合にスローされます。
databricks.serviceCredential.scope手動で設定します。ほとんどのユースケースでは、https://www.googleapis.com/auth/cloud-platformに設定します。 -
解決可能なブートストラップ URL が見つかりません
これは、コンピュート クラスターがブートストラップ ホスト名を解決できないことを意味します。 VPC構成をチェックして、コンピュート クラスターが管理対象Kafkaクラスターに到達できることを確認します。 必要に応じて、 VPCピアリングまたはプライベート サービス接続を構成します。
-
権限の問題
- サービス アカウントに、 Kafkaクラスターが属するプロジェクトに対する
roles/managedkafka.clientIAMバインディングがあることを確認してください。 - Kafka クラスターにトピックとサービス アカウントに対して適切な ACL が定義されていることを確認します。Google クラウドのドキュメントのKafka ACL によるアクセス制御」を参照してください。
- サービス アカウントに、 Kafkaクラスターが属するプロジェクトに対する
-
レコードが返されませんでした
認証は成功したがデータが返されない場合:
- 正しいトピック名をサブスクライブしていることを確認します。
- デフォルトの
startingOffsetsはlatestで、新しいデータのみを読み取ります。既存のデータを読み取るには、startingOffsetsをearliestに設定します。