メインコンテンツまでスキップ

認証

Databricks Kafka コネクタは、Kafka に接続するための複数の認証方法をサポートしています。この記事では、Databricks で最も一般的な認証方法のいくつかについて説明します。サポートされている認証方法の完全なリストは、 Kafka のドキュメントに記載されています。

Apache Kafkaの Google クラウド マネージド サービスに接続する

Apache Kafkaの Google クラウド マネージドサービスに対して認証するには、 Unity Catalogサービスの資格情報を使用します。 Databricksに認証設定を自動的に行わせることも、SASL/OAUTHBEARERオプションを手動で指定することもできます。

Unity Catalog認証情報を使用して接続します

Databricks 、 Databricks Runtime 18.0 以降のApache Kafkaの Google クラウド マネージド サービスへのアクセスを認証するためのUnity Catalogサービス 認証情報をサポートしています。 Google クラウド マネージドサービス for Apache KafkaクラスターはVPCで実行されるため、 DatabricksコンピュートはマネージドKafkaクラスターに到達できるVPCで実行する必要があります。 サーバーレス コンピュートはDatabricksで管理されるコントロール プレーンで実行されるため、サポートされていません。 接続を有効にするには、 VPCピアリングまたはプライベート サービス 接続を構成します。 Google クラウドのドキュメントのVPCネットワーク ピアリング」または「プライベート サービス接続」を参照してください。

認証のために、 Unity Catalogサービス資格情報用に作成された Google クラウド サービス アカウントに、マネージドKafkaクラスターを含む Google クラウド プロジェクトでの次のロールを付与します。

  • roles/managedkafka.client

手順については、Google クラウドのドキュメントの「アクセスの付与、変更、取り消し」を参照してください。

次の例では、サービス資格情報を使用して Kafka をソースとして構成します。

Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"databricks.serviceCredential": "<service-credential-name>",
# Optional: set this only if Databricks can't infer the scope for your Kafka service.
# "databricks.serviceCredential.scope": "https://www.googleapis.com/auth/cloud-platform",
}

df = spark.read.format("kafka").options(**kafka_options).load()

: Unity Catalog サービス資格情報を Kafka に提供する際には、次のオプションを指定しないでください。

  • kafka.sasl.mechanism
  • kafka.sasl.jaas.config
  • kafka.security.protocol
  • kafka.sasl.client.callback.handler.class
  • kafka.sasl.login.callback.handler.class
  • kafka.sasl.oauthbearer.token.endpoint.url

SASLオプションで接続する

SASL/OAUTHBEARER オプションを直接指定することで、 databricks.serviceCredentialソース オプションを使用せずに、 Apache Kafkaの Google クラウド マネージドサービスに対する認証を行うことができます。 この方法は、Kafka SASL オプションを明示的に設定する必要がある場合にのみ使用してください (たとえば、特定のコールバック ハンドラーを使用する場合など)。

このアプローチには、Unity Catalog サービスの資格情報が必要です。

  • サービス認証情報の Google クラウド サービス アカウントには、マネージドKafkaクラスターを含む Google クラウド プロジェクト内にroles/managedkafka.client含まれている必要があります。
  • Databricks 提供のログイン コールバック ハンドラーorg.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandlerを使用します (Databricks Runtime に含まれていないcom.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandlerではありません)。

Unity Catalog 外での Google サービス アカウントによる認証はサポートされていません。

次の例では、SASL オプションを直接指定して、Kafka をソースとして設定します。

Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"startingOffsets": "earliest",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.jaas.config":
"kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
"kafka.sasl.login.callback.handler.class":
"org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandler",
# Required by the login callback handler:
"kafka.databricks.serviceCredential": "<service-credential-name>",
# Optional:
# "kafka.databricks.serviceCredential.scope": "https://www.googleapis.com/auth/cloud-platform",
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

注意 : これらの SASL オプションを指定するときはdatabricks.serviceCredentialを設定しないでください。databricks.serviceCredentialが設定されている場合、Databricks は Kafka 認証を自動的に構成し、 kafka.sasl.*オプションの指定を禁止します。

認証にはSASL/PLAINを使用します。

SASL/PLAIN(ユーザー名とパスワード)認証を使用してKafkaに接続するには、以下のオプションを設定してください。網掛けされたPlainLoginModuleクラス名を使用してください。

Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "PLAIN",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";',
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Databricksは、パスワードをコードに直接含めるのではなく、秘密情報として保存することを推奨しています。詳細については、 「シークレット管理」を参照してください。

SASL/SCRAMを使用して認証を行う

SASL/SCRAM(SCRAM-SHA-256またはSCRAM-SHA-512)を使用してKafkaに接続するには、以下のオプションを設定します。網掛けされたScramLoginModuleクラス名を使用してください。

Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "SCRAM-SHA-512",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
}

df = spark.readStream.format("kafka").options(**kafka_options).load()
注記

Kafka クラスターが SCRAM-SHA-256 を使用するように構成されている場合は、 SCRAM-SHA-512 SCRAM-SHA-256に置き換えてください。

Databricksは、パスワードをコードに直接含めるのではなく、秘密情報として保存することを推奨しています。詳細については、 「シークレット管理」を参照してください。

SSLを使用してDatabricksをKafkaに接続する

KafkaへのSSL/TLS接続を有効にするには、 kafka.security.protocol SSLに設定し、 kafka.をプレフィックスとするトラストストアとキーストアの構成オプションを指定します。サーバー認証のみを必要とするSSL接続(一方向TLS)の場合、トラストストアが必要です。Kafkaブローカーがクライアントの認証も行う相互TLS(mTLS)の場合、トラストストアとキーストアの両方が必要です。

以下のSSL/TLSオプションが利用可能です。SSLプロパティの完全なリストについては、 Apache KafkaのSSL設定に関するドキュメント、およびConfluentドキュメントの「SSLによる暗号化と認証」を参照してください。

オプション

説明

kafka.security.protocol

TLS暗号化を有効にするには、 SSLに設定してください。

kafka.ssl.truststore.location

信頼できる認証局証明書を含むトラストストアファイルへのパス。

kafka.ssl.truststore.password

信頼ストアファイルのパスワード。

kafka.ssl.truststore.type

トラストストアのファイル形式(デフォルト: JKS )。

kafka.ssl.keystore.location

クライアント証明書と秘密鍵を含むキーストアファイルへのパス(mTLSに必要)。

kafka.ssl.keystore.password

キーストアファイルのパスワード。

kafka.ssl.key.password

キーストア内の秘密鍵のパスワード。

kafka.ssl.endpoint.identification.algorithm

ホスト名検証アルゴリズム。デフォルト値はhttpsです。無効にするには、空の文字列に設定してください。

SSLを使用する場合、Databricksは以下のことを推奨します。

  • 証明書はUnity Catalogボリュームに保存してください。 ボリュームから読み取り権限を持つユーザーは、あなたのKafka証明書を使用できます。詳細については、 Unity Catalogボリュームとは?」を参照してください。
  • 証明書のパスワードは、シークレットスコープ内のシークレットとして保存してください。詳細については、 「シークレットスコープの管理」を参照してください。

次の例では、オブジェクトストレージの場所と Databricks シークレットを使用して SSL 接続を有効にします。

Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

DatabricksのシェーディングされたKafkaクラス名を使用する

Databricksは、独自のシェーディング版Kafkaクライアントライブラリをバンドルしています。認証設定オプションで参照するすべての Kafka クライアントクラス名は、標準のオープンソースクラス名ではなく、網掛けされたクラス名プレフィックスを使用する必要があります。これは、 kafka.sasl.jaas.configkafka.sasl.login.callback.handler.classkafka.sasl.client.callback.handler.classなどのオプションで参照されるすべてのクラスに適用されます。

網掛けされていないクラス名を使用すると、 RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCEDエラーが発生します。詳細はFAQをご覧ください。

潜在的なエラーの処理

  • 認証時に例外missing gcp_optionsが発生しました

    この例外は、コールバック ハンドラーがブートストラップ URL からスコープを推測できない場合にスローされます。databricks.serviceCredential.scope手動で設定します。ほとんどのユースケースでは、 https://www.googleapis.com/auth/cloud-platformに設定します。

  • 解決可能なブートストラップ URL が見つかりません

    これは、コンピュート クラスターがブートストラップ ホスト名を解決できないことを意味します。 VPC構成をチェックして、コンピュート クラスターが管理対象Kafkaクラスターに到達できることを確認します。 必要に応じて、 VPCピアリングまたはプライベート サービス接続を構成します。

  • 権限の問題

    • サービス アカウントに、 Kafkaクラスターが属するプロジェクトに対するroles/managedkafka.client IAMバインディングがあることを確認してください。
    • Kafka クラスターにトピックとサービス アカウントに対して適切な ACL が定義されていることを確認します。Google クラウドのドキュメントのKafka ACL によるアクセス制御」を参照してください。
  • レコードが返されませんでした

    認証は成功したがデータが返されない場合:

    • 正しいトピック名をサブスクライブしていることを確認します。
    • デフォルトのstartingOffsetslatestで、新しいデータのみを読み取ります。既存のデータを読み取るには、 startingOffsetsearliestに設定します。