認証
このページでは、Databricks 上の Kafka コネクタ向けの最も一般的な認証方法を紹介します。
サポートされている認証方法の完全なリストについては、Kafka のドキュメントを参照してください。
IAMを使用してAmazon MSKに接続する
IAM ベースの認証を使用して、 DatabricksからAmazon Managed ストリーミング for Kafka (MSK) に接続できます。 MSK の設定手順については、 「Amazon MSK 設定」を参照してください。
IAMを使用してMSKに接続する場合、以下の接続方法のいずれかを使用する必要があります。別の方法として、Apache Spark Kafkaコネクタが提供するオプションを使用して、MSKへの接続を構成できます。
Unity Catalog認証情報を使用して接続します
Databricks Runtime 16.1 以降では、Databricks は AWS Managed ストリーミング for Apache Kafka (MSK) へのアクセスを認証するための Unity Catalog サービス資格情報をサポートしています。Databricks は、共有クラスターを使用する場合や、サーバレス コンピュートを使用する場合に、この認証方法を推奨します。
認証にUnity Catalog サービス資格情報を使用するには、次の操作を行います。
-
新しい Unity Catalog サービス資格情報を作成します。サービス資格情報の作成を参照してください。
- サービス資格情報にアタッチされているIAMロールが、MSKクラスターに接続するための適切な権限を持っていることを確認してください。
-
ソースオプション
databricks.serviceCredentialを Unity Catalog サービス資格情報の名前として設定します。
- Python
- Scala
- SQL
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"databricks.serviceCredential": "<service-credential-name>",
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
"subscribe" -> "<topic>",
"databricks.serviceCredential" -> "<service-credential-name>",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-hostname>:9092',
subscribe => '<topic>',
serviceCredential => '<service-credential-name>'
);
Unity Catalog サービス資格情報を使用して Kafka に接続する場合は、次のオプションを使用しないでください。
kafka.sasl.mechanismkafka.sasl.jaas.configkafka.security.protocolkafka.sasl.client.callback.handler.class
インスタンスプロファイルに接続する
IAM 認証が有効になっている Amazon MSK クラスターに、インスタンスプロファイルを使用して認証できます。See インスタンスプロファイル.
インスタンスを使用して MSK に接続するには、次のオプションを構成します。
- Python
- Scala
"kafka.sasl.mechanism": "AWS_MSK_IAM",
"kafka.sasl.jaas.config":
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.client.callback.handler.class":
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
"kafka.sasl.mechanism" -> "AWS_MSK_IAM",
"kafka.sasl.jaas.config" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.client.callback.handler.class" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
IAMユーザーおよびIAMロールと連携する
オプションで、インスタンスプロファイルの代わりに IAM ユーザーまたは IAM ロールを使用して、MSK への接続を構成できます。AWS アクセスキーとシークレットキーの値は、環境変数 AWS_ACCESS_KEY_ID と AWS_SECRET_ACCESS_KEYを使用して指定する必要があります。Spark 構成プロパティまたは環境変数でのシークレットの使用を参照してください。
IAMロールを使用して接続を構成するには、以下の例のように、kafka.sasl.jaas.configの値を変更してロールARNを含める必要があります。
- Python
- Scala
"kafka.sasl.mechanism": "AWS_MSK_IAM",
"kafka.sasl.jaas.config":
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn='arn:aws:iam::123456789012:role/msk_client_role'",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.client.callback.handler.class":
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
"kafka.sasl.mechanism" -> "AWS_MSK_IAM",
"kafka.sasl.jaas.config" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn='arn:aws:iam::123456789012:role/msk_client_role'",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.client.callback.handler.class" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
認証にはSASL/PLAINを使用します。
SASL/PLAIN(ユーザー名とパスワード)認証を使用してKafkaに接続するには、以下のオプションを設定してください。網掛けされたPlainLoginModuleクラス名を使用してください。
- Python
- Scala
- SQL
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "PLAIN",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "PLAIN",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'PLAIN',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";'
);
Databricks では、パスワードをコードに直接含めるのではなく、シークレットとして格納することをお勧めしています。詳細については、シークレット管理を参照してください。
SASL/SCRAMを使用して認証を行う
SASL/SCRAM(SCRAM-SHA-256またはSCRAM-SHA-512)を使用してKafkaに接続するには、以下のオプションを設定します。網掛けされたScramLoginModuleクラス名を使用してください。
- Python
- Scala
- SQL
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "SCRAM-SHA-512",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "SCRAM-SHA-512",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'SCRAM-SHA-512',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";'
);
Kafka クラスターが SCRAM-SHA-256 を使用するように構成されている場合は、 SCRAM-SHA-512 SCRAM-SHA-256に置き換えてください。
Databricks では、パスワードをコードに直接含めるのではなく、シークレットとして格納することをお勧めしています。詳細については、シークレット管理を参照してください。
SSLを使用してDatabricksをKafkaに接続する
Kafka への SSL/TLS 接続を有効にするには、kafka.security.protocol を SSL に設定し、kafka. を接頭辞として付加したトラストストアおよびキーストアの構成オプションを指定します。サーバー認証のみ(一方向 TLS)が必要なSSL接続で、トラストストアを使用する必要があります。Kafka ブローカーがクライアントも認証する相互 TLS (mTLS) の場合、トラストストアとキーストアの両方を使用する必要があります。
以下のSSL/TLSオプションが利用可能です。SSLプロパティの完全なリストについては、 Apache KafkaのSSL設定に関するドキュメント、およびConfluentドキュメントの「SSLによる暗号化と認証」を参照してください。
オプション | 説明 |
|---|---|
| TLS暗号化を有効にするには、 |
| 信頼できる認証局証明書を含むトラストストアファイルへのパス。 |
| 信頼ストアファイルのパスワード。 |
| トラストストアのファイル形式(デフォルト: |
| クライアント証明書と秘密鍵を含むキーストアファイルへのパス(mTLSに必要)。 |
| キーストアファイルのパスワード。 |
| キーストア内の秘密鍵のパスワード。 |
| ホスト名検証アルゴリズム。デフォルト値は |
SSLを使用する場合、Databricksは以下のことを推奨します。
- 証明書はUnity Catalogボリュームに保存してください。 ボリュームから読み取り権限を持つユーザーは、あなたのKafka証明書を使用できます。詳細については、 Unity Catalogボリュームとは?」を参照してください。
- 証明書のパスワードは、シークレットスコープ内のシークレットとして保存してください。詳細については、 「シークレットスコープの管理」を参照してください。
次の例では、オブジェクトストレージの場所と Databricks シークレットを使用して SSL 接続を有効にします。
- Python
- Scala
- SQL
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SSL',
`kafka.ssl.truststore.location` => '<truststore-location>',
`kafka.ssl.keystore.location` => '<keystore-location>',
`kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
`kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);
DatabricksのシェーディングされたKafkaクラス名を使用する
Databricksは、独自のシェーディング版Kafkaクライアントライブラリをバンドルしています。認証設定オプションで参照するすべての Kafka クライアントクラス名は、標準のオープンソースクラス名ではなく、網掛けされたクラス名プレフィックスを使用する必要があります。これは、 kafka.sasl.jaas.config 、 kafka.sasl.login.callback.handler.class 、 kafka.sasl.client.callback.handler.classなどのオプションで参照されるすべてのクラスに適用されます。
非シェードのクラス名を使用すると、コードでRESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCEDエラーが発生します。詳しくは、FAQをご参照ください。
潜在的なエラーの処理
-
IAM認証の失敗
SaslException、Failed to construct kafka consumer、または認証エラーが表示される場合は、以下を確認してください。kafka.sasl.jaas.configのIAM ARN正しく、適切な形式になっています。- IAMロールには、MSK クラスターにアクセスするために必要な権限があります (例:
kafka-cluster:Connect、kafka-cluster:ReadData)。 - インスタンス計画の場合は、インスタンス計画がクラスターにアタッチされており、MSK 権限があることを確認してください。
- クロスアカウント アクセスの場合、信頼関係によって Databricks アカウントがロールを引き受けることができることを確認します。
-
ネットワーク接続の問題
TimeoutExceptionまたは接続エラーが表示される場合:- MSK クラスターのセキュリティ グループが、 Kafkaポート上でDatabricksコンピュート セキュリティ グループからの受信トラフィックを許可していることを確認します (通常は PLAINTEXT の場合は 9092、 SASL/ SSLの場合は 9094、またはIAMの場合は 9098)。
- Databricks VPC と MSK VPC の間で VPC ピアリングまたは PrivateLink が正しく構成されていることを確認します。
kafka.bootstrap.serversホスト名とポートが正しいことを確認してください。
-
レコードが返されませんでした
認証は成功したがデータが返されない場合:
- 正しいトピック名をサブスクライブしていることを確認します。
- デフォルトの
startingOffsetsはlatestで、新しいデータのみを読み取ります。既存のデータを読み取るには、startingOffsetsをearliestに設定します。 - IAMロールにトピックに対する
kafka-cluster:ReadData権限があることを確認してください。