メインコンテンツまでスキップ

認証

Databricks Kafka コネクタは、Kafka に接続するための複数の認証方法をサポートしています。この記事では、Databricks で最も一般的な認証方法のいくつかについて説明します。サポートされている認証方法の完全なリストは、 Kafka のドキュメントに記載されています。

IAM を使用した Kafka 向け Amazon マネージドストリーミング

IAM ベースの認証を使用して、 DatabricksからAmazon Managed ストリーミング for Kafka (MSK) に接続できます。 MSK の設定手順については、 「Amazon MSK 設定」を参照してください。

注記

次の構成は、IAM を使用して MSK に接続する場合にのみ必要です。Apache Spark Kafka コネクタによって提供されるオプションを使用して、MSK への接続を構成することもできます。

Unity Catalog認証情報を使用して接続します

Databricks Runtime 16.1のリリース以降、 Databricks AWS Managed Streaming for Apache Kafka (MSK) へのアクセスを認証するためのUnity Catalogサービス 認証情報をサポートしています。 Databricks 、特に共有クラスターまたはサーバレス コンピュートでKafkaストリーミングを実行する場合に、このアプローチを推奨します。

認証にUnity Catalog資格情報を使用するには、次のステップを実行します。

  • 新しいUnity Catalog認証情報を作成します。 このプロセスに慣れていない場合は、 「サービス資格情報の作成」の手順を参照してください。

    • サービス資格情報にアタッチされているIAMロールに、MSK クラスターに接続するために必要な権限があることを確認してください。
  • Kafka 構成のソース オプションとして、Unity Catalog サービス資格情報の名前を指定します。オプションdatabricks.serviceCredentialサービス資格情報の名前に設定します。

Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"databricks.serviceCredential": "<service-credential-name>",
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

: Unity Catalogサービス の資格情報を使用してKafkaに接続する場合、次のオプションは不要になります。

  • kafka.sasl.mechanism
  • kafka.sasl.jaas.config
  • kafka.security.protocol
  • kafka.sasl.client.callback.handler.class

インスタンスプロファイルに接続する

インスタンスプロファイルを使用して、IAM 認証が有効になっている Amazon MSK クラスターに対して認証を行うことができます。インスタンスの構成に関する詳細については、インスタンス参照してください。

インスタンスを使用して MSK に接続するには、次のオプションを構成します。

Python
"kafka.sasl.mechanism": "AWS_MSK_IAM",
"kafka.sasl.jaas.config":
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.client.callback.handler.class":
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"

IAMユーザー/ロールとの接続

オプションで、インスタンスの代わりにIAMユーザーまたはIAMロールを使用して MSK への接続を構成できます。 これを行うには、環境変数AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEYを使用して、AWS アクセスキーとシークレットキーの値を指定する必要があります。「Spark 構成プロパティまたは環境変数でシークレットを使用する」を参照してください。

さらに、 IAMロールを使用して接続を構成する場合は、次の例のように、ロールARNを含めるようにkafka.sasl.jaas.configに指定された値を変更する必要があります。

Python
"kafka.sasl.mechanism": "AWS_MSK_IAM",
"kafka.sasl.jaas.config":
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn='arn:aws:iam::123456789012:role/msk_client_role'",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.client.callback.handler.class":
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"

SSLを使用してDatabricksをKafkaに接続する

Kafka への SSL 接続を有効にするには、Confluent ドキュメントの「SSL を使用した暗号化と認証」の手順に従ってください。そこに記述されている構成を、プレフィックスkafka.を付けてオプションとして提供できます。たとえば、信頼ストアの場所はプロパティkafka.ssl.truststore.locationで指定されます。

SSL を使用する場合、Databricks では次のことをお勧めします。

  • 証明書をUnity Catalogボリュームに保存します。 ボリュームからの読み取りアクセス権を持つユーザーは、Kafka 証明書を使用できるようになります。
  • 証明書のパスワードをシークレットスコープ に シークレット として保存します。

次の例では、オブジェクトストレージの場所と Databricks シークレットを使用して SSL 接続を有効にします。

Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

潜在的なエラーの処理

  • IAM認証の失敗

    SaslExceptionFailed to construct kafka consumer 、または認証エラーが表示される場合は、以下を確認してください。

    • kafka.sasl.jaas.configのIAM ARN正しく、適切な形式になっています。
    • IAMロールには、MSK クラスターにアクセスするために必要な権限があります (例: kafka-cluster:Connectkafka-cluster:ReadData )。
    • インスタンス計画の場合は、インスタンス計画がクラスターにアタッチされており、MSK 権限があることを確認してください。
    • クロスアカウント アクセスの場合、信頼関係によって Databricks アカウントがロールを引き受けることができることを確認します。
  • ネットワーク接続の問題

    TimeoutExceptionまたは接続エラーが表示される場合:

    • MSK クラスターのセキュリティ グループが、 Kafkaポート上でDatabricksコンピュート セキュリティ グループからの受信トラフィックを許可していることを確認します (通常は PLAINTEXT の場合は 9092、 SASL/ SSLの場合は 9094、またはIAMの場合は 9098)。
    • Databricks VPC と MSK VPC の間で VPC ピアリングまたは PrivateLink が正しく構成されていることを確認します。
    • kafka.bootstrap.serversホスト名とポートが正しいことを確認してください。
  • レコードが返されませんでした

    認証は成功したがデータが返されない場合:

    • 正しいトピック名をサブスクライブしていることを確認します。
    • デフォルトのstartingOffsetslatestで、新しいデータのみを読み取ります。既存のデータを読み取るには、 startingOffsetsearliestに設定します。
    • IAMロールにトピックに対するkafka-cluster:ReadData権限があることを確認してください。