メインコンテンツまでスキップ

認証

Databricks Kafka コネクタは、Kafka に接続するための複数の認証方法をサポートしています。この記事では、Databricks で最も一般的な認証方法のいくつかについて説明します。サポートされている認証方法の完全なリストは、 Kafka のドキュメントに記載されています。

IAMを使用してAmazon MSKに接続する

IAM ベースの認証を使用して、 DatabricksからAmazon Managed ストリーミング for Kafka (MSK) に接続できます。 MSK の設定手順については、 「Amazon MSK 設定」を参照してください。

注記

次の構成は、IAM を使用して MSK に接続する場合にのみ必要です。Apache Spark Kafka コネクタによって提供されるオプションを使用して、MSK への接続を構成することもできます。

Unity Catalog認証情報を使用して接続します

Databricks Runtime 16.1のリリース以降、 Databricks AWS Managed Streaming for Apache Kafka (MSK) へのアクセスを認証するためのUnity Catalogサービス 認証情報をサポートしています。 Databricks 、特に共有クラスターまたはサーバレス コンピュートでKafkaストリーミングを実行する場合に、このアプローチを推奨します。

認証にUnity Catalog資格情報を使用するには、次のステップを実行します。

  • 新しいUnity Catalog認証情報を作成します。 このプロセスに慣れていない場合は、 「サービス資格情報の作成」の手順を参照してください。

    • サービス資格情報にアタッチされているIAMロールに、MSK クラスターに接続するために必要な権限があることを確認してください。
  • Kafka 構成のソース オプションとして、Unity Catalog サービス資格情報の名前を指定します。オプションdatabricks.serviceCredentialサービス資格情報の名前に設定します。

Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"databricks.serviceCredential": "<service-credential-name>",
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

: Unity Catalogサービス の資格情報を使用してKafkaに接続する場合、次のオプションは不要になります。

  • kafka.sasl.mechanism
  • kafka.sasl.jaas.config
  • kafka.security.protocol
  • kafka.sasl.client.callback.handler.class

インスタンスプロファイルに接続する

インスタンスを使用して、 IAM認証が有効になっているAmazon MSK クラスターに対して認証できます。 インスタンスの構成の詳細については、インスタンスを参照してください。

インスタンスを使用して MSK に接続するには、次のオプションを構成します。

Python
"kafka.sasl.mechanism": "AWS_MSK_IAM",
"kafka.sasl.jaas.config":
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.client.callback.handler.class":
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"

IAMユーザー/ロールとの接続

オプションで、インスタンスの代わりにIAMユーザーまたはIAMロールを使用して MSK への接続を構成できます。 これを行うには、環境変数AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEYを使用して、AWS アクセスキーとシークレットキーの値を指定する必要があります。「Spark 構成プロパティまたは環境変数でシークレットを使用する」を参照してください。

さらに、 IAMロールを使用して接続を構成する場合は、次の例のように、ロールARNを含めるようにkafka.sasl.jaas.configに指定された値を変更する必要があります。

Python
"kafka.sasl.mechanism": "AWS_MSK_IAM",
"kafka.sasl.jaas.config":
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn='arn:aws:iam::123456789012:role/msk_client_role'",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.client.callback.handler.class":
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"

認証にはSASL/PLAINを使用します。

SASL/PLAIN(ユーザー名とパスワード)認証を使用してKafkaに接続するには、以下のオプションを設定してください。網掛けされたPlainLoginModuleクラス名を使用してください。

Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "PLAIN",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";',
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Databricksは、パスワードをコードに直接含めるのではなく、秘密情報として保存することを推奨しています。詳細については、 「シークレット管理」を参照してください。

SASL/SCRAMを使用して認証を行う

SASL/SCRAM(SCRAM-SHA-256またはSCRAM-SHA-512)を使用してKafkaに接続するには、以下のオプションを設定します。網掛けされたScramLoginModuleクラス名を使用してください。

Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "SCRAM-SHA-512",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
}

df = spark.readStream.format("kafka").options(**kafka_options).load()
注記

Kafka クラスターが SCRAM-SHA-256 を使用するように構成されている場合は、 SCRAM-SHA-512 SCRAM-SHA-256に置き換えてください。

Databricksは、パスワードをコードに直接含めるのではなく、秘密情報として保存することを推奨しています。詳細については、 「シークレット管理」を参照してください。

SSLを使用してDatabricksをKafkaに接続する

KafkaへのSSL/TLS接続を有効にするには、 kafka.security.protocol SSLに設定し、 kafka.をプレフィックスとするトラストストアとキーストアの構成オプションを指定します。サーバー認証のみを必要とするSSL接続(一方向TLS)の場合、トラストストアが必要です。Kafkaブローカーがクライアントの認証も行う相互TLS(mTLS)の場合、トラストストアとキーストアの両方が必要です。

以下のSSL/TLSオプションが利用可能です。SSLプロパティの完全なリストについては、 Apache KafkaのSSL設定に関するドキュメント、およびConfluentドキュメントの「SSLによる暗号化と認証」を参照してください。

オプション

説明

kafka.security.protocol

TLS暗号化を有効にするには、 SSLに設定してください。

kafka.ssl.truststore.location

信頼できる認証局証明書を含むトラストストアファイルへのパス。

kafka.ssl.truststore.password

信頼ストアファイルのパスワード。

kafka.ssl.truststore.type

トラストストアのファイル形式(デフォルト: JKS )。

kafka.ssl.keystore.location

クライアント証明書と秘密鍵を含むキーストアファイルへのパス(mTLSに必要)。

kafka.ssl.keystore.password

キーストアファイルのパスワード。

kafka.ssl.key.password

キーストア内の秘密鍵のパスワード。

kafka.ssl.endpoint.identification.algorithm

ホスト名検証アルゴリズム。デフォルト値はhttpsです。無効にするには、空の文字列に設定してください。

SSLを使用する場合、Databricksは以下のことを推奨します。

  • 証明書はUnity Catalogボリュームに保存してください。 ボリュームから読み取り権限を持つユーザーは、あなたのKafka証明書を使用できます。詳細については、 Unity Catalogボリュームとは?」を参照してください。
  • 証明書のパスワードは、シークレットスコープ内のシークレットとして保存してください。詳細については、 「シークレットスコープの管理」を参照してください。

次の例では、オブジェクトストレージの場所と Databricks シークレットを使用して SSL 接続を有効にします。

Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

DatabricksのシェーディングされたKafkaクラス名を使用する

Databricksは、独自のシェーディング版Kafkaクライアントライブラリをバンドルしています。認証設定オプションで参照するすべての Kafka クライアントクラス名は、標準のオープンソースクラス名ではなく、網掛けされたクラス名プレフィックスを使用する必要があります。これは、 kafka.sasl.jaas.configkafka.sasl.login.callback.handler.classkafka.sasl.client.callback.handler.classなどのオプションで参照されるすべてのクラスに適用されます。

網掛けされていないクラス名を使用すると、 RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCEDエラーが発生します。詳細はFAQをご覧ください。

潜在的なエラーの処理

  • IAM認証の失敗

    SaslExceptionFailed to construct kafka consumer 、または認証エラーが表示される場合は、以下を確認してください。

    • kafka.sasl.jaas.configのIAM ARN正しく、適切な形式になっています。
    • IAMロールには、MSK クラスターにアクセスするために必要な権限があります (例: kafka-cluster:Connectkafka-cluster:ReadData )。
    • インスタンス計画の場合は、インスタンス計画がクラスターにアタッチされており、MSK 権限があることを確認してください。
    • クロスアカウント アクセスの場合、信頼関係によって Databricks アカウントがロールを引き受けることができることを確認します。
  • ネットワーク接続の問題

    TimeoutExceptionまたは接続エラーが表示される場合:

    • MSK クラスターのセキュリティ グループが、 Kafkaポート上でDatabricksコンピュート セキュリティ グループからの受信トラフィックを許可していることを確認します (通常は PLAINTEXT の場合は 9092、 SASL/ SSLの場合は 9094、またはIAMの場合は 9098)。
    • Databricks VPC と MSK VPC の間で VPC ピアリングまたは PrivateLink が正しく構成されていることを確認します。
    • kafka.bootstrap.serversホスト名とポートが正しいことを確認してください。
  • レコードが返されませんでした

    認証は成功したがデータが返されない場合:

    • 正しいトピック名をサブスクライブしていることを確認します。
    • デフォルトのstartingOffsetslatestで、新しいデータのみを読み取ります。既存のデータを読み取るには、 startingOffsetsearliestに設定します。
    • IAMロールにトピックに対するkafka-cluster:ReadData権限があることを確認してください。