認証
このページでは、Databricks 上の Kafka コネクタ向けの最も一般的な認証方法を紹介します。
サポートされている認証方法の完全なリストについては、Kafka のドキュメントを参照してください。
Apache Kafkaの Google クラウド マネージド サービスに接続する
Google Cloud の Apache Kafka 向けマネージドサービスに認証するには、Unity Catalog サービス資格情報を使用します。Databricks に認証を自動的に構成させることも、SASL および OAUTHBEARER オプションを手動で構成することもできます。
Unity Catalog認証情報を使用して接続します
Databricks Runtime 18.0 以降では、Databricks は Google Cloud マネージドサービス for Apache Kafka へのアクセスを認証するための Unity Catalog サービス資格情報をサポートしています。
Google クラウド マネージドサービス for Apache Kafka クラスターは VPC で実行されます。Databricks のコンピュートは、マネージド Kafka クラスターに到達可能な VPC で実行する必要があります。接続を有効にするには、VPC ピアリングまたは Private Service Connect を構成します。Google Cloudドキュメントの「VPC ネットワーク ピアリング」( )または「Private Service Connect」( )を参照してください。サーバレス コンピュート は、Databricks が管理するコントロールプレーンで実行されるため、サポートされていません。
認証には、Unity Catalogサービス資格情報用に作成されたGoogle Cloudサービスアカウントに、管理されたKafkaクラスターを含むGoogle Cloudプロジェクトで次のロールを付与します:
roles/managedkafka.client
手順については、Google クラウドのドキュメントの「アクセスの付与、変更、取り消し」を参照してください。
次の例では、サービス資格情報を使用して Kafka をソースとして構成します。
- Python
- Scala
- SQL
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"databricks.serviceCredential": "<service-credential-name>",
# Optional: set this only if Databricks can't infer the scope for your Kafka service.
# "databricks.serviceCredential.scope": "https://www.googleapis.com/auth/cloud-platform",
}
df = spark.read.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
"subscribe" -> "<topic>",
"databricks.serviceCredential" -> "<service-credential-name>",
// Optional: set this only if Databricks can't infer the scope for your Kafka service.
// "databricks.serviceCredential.scope" -> "https://www.googleapis.com/auth/cloud-platform",
)
val df = spark.read.format("kafka").options(kafkaOptions).load()
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-hostname>:9092',
subscribe => '<topic>',
serviceCredential => '<service-credential-name>'
);
Unity Catalog サービス資格情報を使用して Kafka に接続する場合は、次のオプションを使用しないでください。
kafka.sasl.mechanismkafka.sasl.jaas.configkafka.security.protocolkafka.sasl.client.callback.handler.classkafka.sasl.login.callback.handler.classkafka.sasl.oauthbearer.token.endpoint.url
SASLオプションで接続する
SASL/OAUTHBEARER オプションを指定することで、databricks.serviceCredential ソースオプションを使用せずに Google クラウド マネージドサービス for Apache Kafka に認証できます。Kafka SASLオプションを明示的に設定する必要がある場合にのみ、このアプローチを使用してください。例えば、特定のコールバックハンドラーを使用するには。
SASL に接続するには、Unity Catalog サービスの資格情報を使用する必要があります。
- サービス認証情報の Google クラウド サービス アカウントには、マネージドKafkaクラスターを含む Google クラウド プロジェクト内に
roles/managedkafka.client含まれている必要があります。 - Databricks が提供するログインコールバックハンドラー
org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandlerを使用してください。Databricks Runtime に含まれていないcom.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandlerを使用しないでください。
Googleサービスアカウントでの認証は、Unity Catalogなしではサポートされていません。
以下の例では、SASL を使用して Kafka をソースとして設定します。
- Python
- Scala
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"startingOffsets": "earliest",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.jaas.config":
"kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
"kafka.sasl.login.callback.handler.class":
"org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandler",
# Required by the login callback handler:
"kafka.databricks.serviceCredential": "<service-credential-name>",
# Optional:
# "kafka.databricks.serviceCredential.scope": "https://www.googleapis.com/auth/cloud-platform",
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
"subscribe" -> "<topic>",
"startingOffsets" -> "earliest",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.jaas.config" ->
"kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
"kafka.sasl.login.callback.handler.class" ->
"org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandler",
// Required by the login callback handler:
"kafka.databricks.serviceCredential" -> "<service-credential-name>",
// Optional:
// "kafka.databricks.serviceCredential.scope" -> "https://www.googleapis.com/auth/cloud-platform",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SASL オプションを指定する際には、databricks.serviceCredential は設定しないでください。databricks.serviceCredentialを設定すると、DatabricksはKafka認証を自動的に構成し、kafka.sasl.*オプションの指定を許可しません。
認証にはSASL/PLAINを使用します。
SASL/PLAIN(ユーザー名とパスワード)認証を使用してKafkaに接続するには、以下のオプションを設定してください。網掛けされたPlainLoginModuleクラス名を使用してください。
- Python
- Scala
- SQL
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "PLAIN",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "PLAIN",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'PLAIN',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";'
);
Databricks では、パスワードをコードに直接含めるのではなく、シークレットとして格納することをお勧めしています。詳細については、シークレット管理を参照してください。
SASL/SCRAMを使用して認証を行う
SASL/SCRAM(SCRAM-SHA-256またはSCRAM-SHA-512)を使用してKafkaに接続するには、以下のオプションを設定します。網掛けされたScramLoginModuleクラス名を使用してください。
- Python
- Scala
- SQL
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "SCRAM-SHA-512",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "SCRAM-SHA-512",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'SCRAM-SHA-512',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";'
);
Kafka クラスターが SCRAM-SHA-256 を使用するように構成されている場合は、 SCRAM-SHA-512 SCRAM-SHA-256に置き換えてください。
Databricks では、パスワードをコードに直接含めるのではなく、シークレットとして格納することをお勧めしています。詳細については、シークレット管理を参照してください。
SSLを使用してDatabricksをKafkaに接続する
Kafka への SSL/TLS 接続を有効にするには、kafka.security.protocol を SSL に設定し、kafka. を接頭辞として付加したトラストストアおよびキーストアの構成オプションを指定します。サーバー認証のみ(一方向 TLS)が必要なSSL接続で、トラストストアを使用する必要があります。Kafka ブローカーがクライアントも認証する相互 TLS (mTLS) の場合、トラストストアとキーストアの両方を使用する必要があります。
以下のSSL/TLSオプションが利用可能です。SSLプロパティの完全なリストについては、 Apache KafkaのSSL設定に関するドキュメント、およびConfluentドキュメントの「SSLによる暗号化と認証」を参照してください。
オプション | 説明 |
|---|---|
| TLS暗号化を有効にするには、 |
| 信頼できる認証局証明書を含むトラストストアファイルへのパス。 |
| 信頼ストアファイルのパスワード。 |
| トラストストアのファイル形式(デフォルト: |
| クライアント証明書と秘密鍵を含むキーストアファイルへのパス(mTLSに必要)。 |
| キーストアファイルのパスワード。 |
| キーストア内の秘密鍵のパスワード。 |
| ホスト名検証アルゴリズム。デフォルト値は |
SSLを使用する場合、Databricksは以下のことを推奨します。
- 証明書はUnity Catalogボリュームに保存してください。 ボリュームから読み取り権限を持つユーザーは、あなたのKafka証明書を使用できます。詳細については、 Unity Catalogボリュームとは?」を参照してください。
- 証明書のパスワードは、シークレットスコープ内のシークレットとして保存してください。詳細については、 「シークレットスコープの管理」を参照してください。
次の例では、オブジェクトストレージの場所と Databricks シークレットを使用して SSL 接続を有効にします。
- Python
- Scala
- SQL
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SSL',
`kafka.ssl.truststore.location` => '<truststore-location>',
`kafka.ssl.keystore.location` => '<keystore-location>',
`kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
`kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);
DatabricksのシェーディングされたKafkaクラス名を使用する
Databricksは、独自のシェーディング版Kafkaクライアントライブラリをバンドルしています。認証設定オプションで参照するすべての Kafka クライアントクラス名は、標準のオープンソースクラス名ではなく、網掛けされたクラス名プレフィックスを使用する必要があります。これは、 kafka.sasl.jaas.config 、 kafka.sasl.login.callback.handler.class 、 kafka.sasl.client.callback.handler.classなどのオプションで参照されるすべてのクラスに適用されます。
非シェードのクラス名を使用すると、コードでRESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCEDエラーが発生します。詳しくは、FAQをご参照ください。
潜在的なエラーの処理
-
認証時に例外
missing gcp_optionsが発生しましたこの例外は、コールバック ハンドラーがブートストラップ URL からスコープを推測できない場合にスローされます。
databricks.serviceCredential.scope手動で設定します。ほとんどのユースケースでは、https://www.googleapis.com/auth/cloud-platformに設定します。 -
解決可能なブートストラップ URL が見つかりません
これは、コンピュート クラスターがブートストラップ ホスト名を解決できないことを意味します。 VPC構成をチェックして、コンピュート クラスターが管理対象Kafkaクラスターに到達できることを確認します。 必要に応じて、 VPCピアリングまたはプライベート サービス接続を構成します。
-
権限の問題
- サービス アカウントに、 Kafkaクラスターが属するプロジェクトに対する
roles/managedkafka.clientIAMバインディングがあることを確認してください。 - Kafka クラスターにトピックとサービス アカウントに対して適切な ACL が定義されていることを確認します。Google クラウドのドキュメントのKafka ACL によるアクセス制御」を参照してください。
- サービス アカウントに、 Kafkaクラスターが属するプロジェクトに対する
-
レコードが返されませんでした
認証は成功したがデータが返されない場合:
- 正しいトピック名をサブスクライブしていることを確認します。
- デフォルトの
startingOffsetsはlatestで、新しいデータのみを読み取ります。既存のデータを読み取るには、startingOffsetsをearliestに設定します。