認証
Databricks Kafka コネクタは、Kafka に接続するための複数の認証方法をサポートしています。この記事では、Databricks で最も一般的な認証方法のいくつかについて説明します。サポートされている認証方法の完全なリストは、 Kafka のドキュメントに記載されています。
Apache Kafka用 Google クラウド マネージド サービス
Unity Catalog認証情報を使用して接続します
Databricks 、 Databricks Runtime 18.0 以降のApache Kafkaの Google クラウド マネージド サービスへのアクセスを認証するためのUnity Catalogサービス 認証情報をサポートしています。 Google クラウド マネージドサービス for Apache KafkaクラスターはVPCで実行されるため、 DatabricksコンピュートはマネージドKafkaクラスターに到達できるVPCで実行する必要があります。 サーバーレス コンピュートはDatabricksで管理されるコントロール プレーンで実行されるため、サポートされていません。 接続を有効にするには、 VPCピアリングまたはプライベート サービス 接続を構成します。 Google クラウドのドキュメントのVPCネットワーク ピアリング」または「プライベート サービス接続」を参照してください。
認証のために、 Unity Catalogサービス資格情報用に作成された Google クラウド サービス アカウントに、マネージドKafkaクラスターを含む Google クラウド プロジェクトでの次のロールを付与します。
roles/managedkafka.client
手順については、Google クラウドのドキュメントの「アクセスの付与、変更、取り消し」を参照してください。
次の例では、サービス資格情報を使用して Kafka をソースとして構成します。
- Python
- Scala
- SQL
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"databricks.serviceCredential": "<service-credential-name>",
# Optional: set this only if Databricks can't infer the scope for your Kafka service.
# "databricks.serviceCredential.scope": "https://www.googleapis.com/auth/cloud-platform",
}
df = spark.read.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
"subscribe" -> "<topic>",
"databricks.serviceCredential" -> "<service-credential-name>",
// Optional: set this only if Databricks can't infer the scope for your Kafka service.
// "databricks.serviceCredential.scope" -> "https://www.googleapis.com/auth/cloud-platform",
)
val df = spark.read.format("kafka").options(kafkaOptions).load()
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-hostname>:9092',
subscribe => '<topic>',
serviceCredential => '<service-credential-name>'
);
注 : Unity Catalog サービス資格情報を Kafka に提供する際には、次のオプションを指定しないでください。
kafka.sasl.mechanismkafka.sasl.jaas.configkafka.security.protocolkafka.sasl.client.callback.handler.classkafka.sasl.login.callback.handler.classkafka.sasl.oauthbearer.token.endpoint.url
SASLオプションで接続する
SASL/OAUTHBEARER オプションを直接指定することで、 databricks.serviceCredentialソース オプションを使用せずに、 Apache Kafkaの Google クラウド マネージドサービスに対する認証を行うことができます。 この方法は、Kafka SASL オプションを明示的に設定する必要がある場合にのみ使用してください (たとえば、特定のコールバック ハンドラーを使用する場合など)。
このアプローチには、Unity Catalog サービスの資格情報が必要です。
- サービス認証情報の Google クラウド サービス アカウントには、マネージドKafkaクラスターを含む Google クラウド プロジェクト内に
roles/managedkafka.client含まれている必要があります。 - Databricks 提供のログイン コールバック ハンドラー
org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandlerを使用します (Databricks Runtime に含まれていないcom.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandlerではありません)。
Unity Catalog 外での Google サービス アカウントによる認証はサポートされていません。
次の例では、SASL オプションを直接指定して、Kafka をソースとして設定します。
- Python
- Scala
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"startingOffsets": "earliest",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.jaas.config":
"kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
"kafka.sasl.login.callback.handler.class":
"org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandler",
# Required by the login callback handler:
"kafka.databricks.serviceCredential": "<service-credential-name>",
# Optional:
# "kafka.databricks.serviceCredential.scope": "https://www.googleapis.com/auth/cloud-platform",
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
"subscribe" -> "<topic>",
"startingOffsets" -> "earliest",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.jaas.config" ->
"kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
"kafka.sasl.login.callback.handler.class" ->
"org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandler",
// Required by the login callback handler:
"kafka.databricks.serviceCredential" -> "<service-credential-name>",
// Optional:
// "kafka.databricks.serviceCredential.scope" -> "https://www.googleapis.com/auth/cloud-platform",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
注意 : これらの SASL オプションを指定するときはdatabricks.serviceCredentialを設定しないでください。databricks.serviceCredentialが設定されている場合、Databricks は Kafka 認証を自動的に構成し、 kafka.sasl.*オプションの指定を禁止します。
SSLを使用してDatabricksをKafkaに接続する
Kafka への SSL 接続を有効にするには、Confluent ドキュメントの「SSL を使用した暗号化と認証」の手順に従ってください。そこに記述されている構成を、プレフィックスkafka.を付けてオプションとして提供できます。たとえば、信頼ストアの場所はプロパティkafka.ssl.truststore.locationで指定されます。
SSL を使用する場合、Databricks では次のことをお勧めします。
- 証明書をUnity Catalogボリュームに保存します。 ボリュームからの読み取りアクセス権を持つユーザーは、Kafka 証明書を使用できるようになります。
- 証明書のパスワードをシークレットスコープ に シークレット として保存します。
次の例では、オブジェクトストレージの場所と Databricks シークレットを使用して SSL 接続を有効にします。
- Python
- Scala
- SQL
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-server>:9092',
subscribe => '<topic>',
`kafka.security.protocol` => 'SSL',
`kafka.ssl.truststore.location` => '<truststore-location>',
`kafka.ssl.keystore.location` => '<keystore-location>',
`kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
`kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);
潜在的なエラーの処理
-
認証時に例外
missing gcp_optionsが発生しましたこの例外は、コールバック ハンドラーがブートストラップ URL からスコープを推測できない場合にスローされます。
databricks.serviceCredential.scope手動で設定します。ほとんどのユースケースでは、https://www.googleapis.com/auth/cloud-platformに設定します。 -
解決可能なブートストラップ URL が見つかりません
これは、コンピュート クラスターがブートストラップ ホスト名を解決できないことを意味します。 VPC構成をチェックして、コンピュート クラスターが管理対象Kafkaクラスターに到達できることを確認します。 必要に応じて、 VPCピアリングまたはプライベート サービス接続を構成します。
-
権限の問題
- サービス アカウントに、 Kafkaクラスターが属するプロジェクトに対する
roles/managedkafka.clientIAMバインディングがあることを確認してください。 - Kafka クラスターにトピックとサービス アカウントに対して適切な ACL が定義されていることを確認します。Google クラウドのドキュメントのKafka ACL によるアクセス制御」を参照してください。
- サービス アカウントに、 Kafkaクラスターが属するプロジェクトに対する
-
レコードが返されませんでした
認証は成功したがデータが返されない場合:
- 正しいトピック名をサブスクライブしていることを確認します。
- デフォルトの
startingOffsetsはlatestで、新しいデータのみを読み取ります。既存のデータを読み取るには、startingOffsetsをearliestに設定します。