Apache Kafka と Databricks によるストリーム処理
この記事では、Databricks 上で構造化ストリーミングワークロードを実行する際に、Apache Kafka をソースまたはシンクとして使用する方法について説明します。
Kafka の詳細については、Kafka のドキュメントを参照してください。
Kafka からのデータの読み取り
以下に、Kafkaからのストリーミング読み込みの例を挙げています。
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
以下の例の通り、Databricksは、Kafka データソースのバッチ読み込みのセマンティクスもサポートしています。
df = (spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
増分バッチ読み込みの場合、Databricks では Kafka を Trigger.AvailableNowと共に使用することをお勧めします。 増分バッチ処理の構成を参照してください。
Databricks Runtime 13.3 LTS 以降では、Databricks は Kafka データを読み取るための SQL 関数を提供します。SQLを使用したストリーミングは、 Lakeflow Spark宣言型パイプラインまたはDatabricks SQLのストリーミング テーブルでのみサポートされます。 read_kafkaテーブル値関数を参照してください。
Kafka 構造化ストリーミング リーダーを構成する
Databricks は、Kafka 0.10 以降への接続を構成するためのデータ形式として kafka キーワードを提供しています。
Kafka の最も一般的な構成を以下に示します。
サブスクライブするトピックを指定する方法は複数あります。次のパラメーターの 1 つのみを指定する必要があります。
オプション | 値 | 説明 |
|---|---|---|
subscribe | トピックのコンマ区切りリスト | サブスクライブするトピックのリストです。 |
subscribePattern | Javaの正規表現文字列 | トピックをサブスクライブするのに使われるパターンです。 |
assign | JSON 文字列 | コンシュームする特定の topicPartitions です。 |
その他の注目すべき構成:
オプション | 値 | デフォルト値 | 説明 |
|---|---|---|---|
kafka.bootstrap.servers | ホストのコンマ区切りリスト. | 値がありません | [必須] Kafka の |
failOnDataLoss |
|
| [オプション]データが失われた可能性がある場合にクエリを失敗させるかどうか。クエリは、削除されたトピック、処理前のトピックの切り捨てなど、多くのシナリオにより、Kafka からのデータの読み取りに永続的に失敗する可能性があります。私たちは、データが失われた可能性があるかどうかを控えめに推定しようとします。これにより、誤警報が発生する場合があります。このオプションを |
minPartitions | 整数 > = 0、0 = 無効 | 0 (無効) | [オプション] Kafka から読み取るパーティションの最小数。 |
kafka.group.id | Kafka コンシューマーグループの ID | 設定されていません | [オプション] Kafka からの読み取り中に使用するグループ ID。これは注意して使用してください。デフォルトでは、各クエリはデータを読み取るための一意のグループ ID を生成します。これにより、各クエリには他のコンシューマからの干渉を受けない独自のコンシューマグループが確保され、サブスクライブされたトピックのすべてのパーティションを読み取ることができます。一部のシナリオ(Kafka グループベースの承認など)では、特定の承認されたグループ ID を使用してデータを読み取ることが必要な場合があります。必要に応じて、グループ ID を設定することができます。ただし、予期しない動作が発生する可能性があるため、細心の注意を払って実行してください。 - 同じグループ ID で同時に実行されるクエリ (バッチとストリーミングの両方) は、互いに干渉し、各クエリがデータの一部のみを読み取る可能性があります。 - これは、クエリが立て続けに開始/再開された場合にも発生する可能性があります。 このような問題を最小限に抑えるには、Kafka コンシューマー構成 |
startingOffsets | earliest、latest | latest | [オプション] クエリが開始されるときの開始点。最も古いオフセットからの「earliest」、または各 TopicPartition の開始オフセットを指定する JSON 文字列のいずれかです。json では、オフセットとして -2 を使用して最も古いものを参照し、-1 を使用して最新のものを参照できます。注:バッチクエリの場合、latest(暗黙的または json で -1 を使用) は許可されません。ストリーミングクエリの場合、これは新しいクエリが開始された場合にのみ適用され、再開は常にクエリが中断したところから再開されます。クエリ中に新しく検出されたパーティションは、できるだけ早く開始されます。 |
他のオプションの構成については、Structured Streaming Kafka Integration Guide を参照してください。
Kafka レコードのスキーマ
Kafka レコードのスキーマは次のとおりです。
列 | タイプ |
|---|---|
キー | binary |
値 | binary |
トピック | string |
パーティション | int |
オフセット | ロング |
タイムスタンプ | ロング |
timestampType | int |
key と value は常に、ByteArrayDeserializer を持つバイト配列として逆シリアル化されます。データフレーム 操作(cast("string")など)を使用して、キーと値を明示的に逆シリアル化します。
Kafka へのデータの書き込み
以下は、Kafka へのストリーミング書き込みの例です:
(df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
Databricks は、次の例に示すように、Kafka データシンクへのバッチ書き込みセマンティクスもサポートしています。
(df
.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
Kafka 構造化ストリーミング ライターの構成
Databricks Runtime 13.3 LTS 以降には、デフォルトによるべき等書き込みを可能にする新しいバージョンの kafka-clients ライブラリが含まれています。 Kafka シンクがバージョン 2.8.0 以下を使用し、ACL が設定されているが、 IDEMPOTENT_WRITE が有効になっていない場合、書き込みは失敗し、 org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error stateというエラー メッセージが表示されます。
このエラーを解決するには、Kafka バージョン 2.8.0 以降にアップグレードするか、構造化ストリーミングライターの設定時に .option(“kafka.enable.idempotence”, “false”) を設定します。
DataStreamWriter に提供されたスキーマは、Kafka シンクと対話します。次のフィールドを使用することができます。
列名 | 必須またはオプション | タイプ |
|---|---|---|
| オプション |
|
| 必須 |
|
| オプション |
|
| オプション( |
|
| オプション |
|
以下は、Kafka への書き込み時に設定される一般的なオプションです。
オプション | 値 | デフォルト値 | 説明 |
|---|---|---|---|
| カンマで区切られたリスト | なし | [必須] Kafka の |
|
| 設定されていません | [オプション] 書き込まれるすべての行のトピックを設定します。このオプションは、データに存在するすべてのトピック列よりも優先されます。 |
|
|
| [オプション] Kafka ヘッダーを行に含めるかどうか。 |
他のオプションの構成については、Structured Streaming Kafka Integration Guide を参照してください。
Kafka メトリクスの取得
avgOffsetsBehindLatest 、 maxOffsetsBehindLatest 、および minOffsetsBehindLatest メトリクスを使用して、サブスクライブされたすべてのトピック間で、ストリーミングクエリが利用可能な最新のオフセットよりも遅れているオフセット数の平均、最小、最大を取得できます。「メトリクスをインタラクティブに読み取る」を参照。
Databricks Runtime 9.1 以降で利用可能です。
estimatedTotalBytesBehindLatest の値を調べて、クエリプロセスがサブスクライブされたトピックから消費しなかった推定合計バイト数を取得します。この推定値は、過去 300 秒間に処理されたバッチに基づきます。推定の基準となる時間枠は、オプション bytesEstimateWindowLength を別の値に設定することで変更できます。たとえば、10 分に設定するには、次のようにします。
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
ノートブックでストリームを実行している場合、ストリーミングクエリの進行状況ダッシュボードの 生データ タブに次のメトリクスが表示されます。
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}
SSL を使用して Databricks を Kafka に接続する
Kafka への SSL 接続を有効にするには、Confluent のドキュメント「 SSL による暗号化と認証」の手順に従います。 ここで説明する構成 (オプションとして、プレフィックス kafka.を付けて提供できます。 たとえば、トラスト ストアの場所をプロパティ kafka.ssl.truststore.locationで指定します。
Databricks では、次のことをお勧めします。
- 証明書をクラウド・オブジェクト・ストレージに保管します。証明書へのアクセスは、 にアクセスできるクラスタ Kafkaのみに制限できます。 データガバナンス with Databricksを参照してください。
- 証明書のパスワードをシークレットスコープ に シークレットとして保存します 。
次の例では、オブジェクトストレージの場所と Databricks シークレットを使用して SSL 接続を有効にします。
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
Kafka認証
Unity Catalogの資格情報認証
Databricks Runtime 16.1のリリース以降、 Databricks 、 Apache Kafka (MSK) およびAzure Event Hubs のAWSマネージド ストリーミングへのアクセスを認証するためのUnity Catalogサービス 認証情報をサポートしています。 Apache KafkaのGCPマネージド サービスのサポートがDatabricks Runtime 18.0に追加されました。 Databricks 、共有クラスターでKafkaストリーミングを実行する場合、およびサポートされている場合は、サーバーレス コンピュートを使用する場合に、このアプローチを推奨します。
Unity Catalog サービスの資格情報を認証に使用するには、次の手順を実行します。
- 新しい Unity Catalog サービスの資格情報を作成します。このプロセスに詳しくない場合は、 サービス資格情報の作成 手順を参照してください。
- Unity Catalog サービスの資格情報の名前を Kafka 構成のソース オプションとして指定します。オプション [
databricks.serviceCredential] をサービス資格情報の名前に設定します。
Databricks 、 Databricks Runtime 18.0 以降のApache Kafkaの Google クラウド マネージド サービスへのアクセスを認証するためのUnity Catalogサービス 認証情報をサポートしています。 Google クラウド マネージドサービス for Apache KafkaクラスターはVPCで実行されるため、 DatabricksコンピュートはマネージドKafkaクラスターに到達できるVPCで実行する必要があります。 サーバーレス コンピュートはDatabricksで管理されるコントロール プレーンで実行されるため、サポートされていません。 接続を有効にするには、 VPCピアリングまたはプライベート サービス 接続を構成します。 Google クラウドのドキュメントのVPCネットワーク ピアリング」または「プライベート サービス接続」を参照してください。
認証のために、 Unity Catalogサービス資格情報用に作成された Google クラウド サービス アカウントに、マネージドKafkaクラスターを含む Google クラウド プロジェクトでの次のロールを付与します。
roles/managedkafka.client
手順については、Google クラウドのドキュメントの「アクセスの付与、変更、取り消し」を参照してください。
次の例では、サービス資格情報を使用して Kafka をソースとして構成します。
- Python
- Scala
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"databricks.serviceCredential": "<service-credential-name>",
# Optional: set this only if Databricks can't infer the scope for your Kafka service.
# "databricks.serviceCredential.scope": "https://www.googleapis.com/auth/cloud-platform",
}
df = spark.read.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
"subscribe" -> "<topic>",
"databricks.serviceCredential" -> "<service-credential-name>",
// Optional: set this only if Databricks can't infer the scope for your Kafka service.
// "databricks.serviceCredential.scope" -> "https://www.googleapis.com/auth/cloud-platform",
)
val df = spark.read.format("kafka").options(kafkaOptions).load()
Kafka に Unity Catalog サービス資格情報を提供する場合は、次のオプションを指定しないでください。
kafka.sasl.mechanismkafka.sasl.jaas.configkafka.security.protocolkafka.sasl.client.callback.handler.classkafka.sasl.login.callback.handler.classkafka.sasl.oauthbearer.token.endpoint.url
オプションdatabricks.serviceCredentialとdatabricks.serviceCredential.scopeは Spark ソース オプションです。オプションkafka.databricks.serviceCredentialとkafka.databricks.serviceCredential.scopeは、ログイン コールバック ハンドラーによって使用される Kafka クライアント オプションであるため、先頭にkafka.を付ける必要があります。
SASL/OAUTHBEARER オプションを直接指定することで、 databricks.serviceCredentialソース オプションを使用せずに、 Apache Kafkaの Google クラウド マネージドサービスに対する認証を行うことができます。 この方法は、Kafka SASL オプションを明示的に設定する必要がある場合にのみ使用してください (たとえば、特定のコールバック ハンドラーを使用する場合など)。
このアプローチには、Unity Catalog サービスの資格情報が必要です。
- サービス認証情報の Google クラウド サービス アカウントには、マネージドKafkaクラスターを含む Google クラウド プロジェクト内に
roles/managedkafka.client含まれている必要があります。 - Databricks 提供のログイン コールバック ハンドラー
org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandlerを使用します (Databricks Runtime に含まれていないcom.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandlerではありません)。
Unity Catalog 外での Google サービス アカウントによる認証はサポートされていません。
次の例では、SASL オプションを直接指定して、Kafka をソースとして設定します。
- Python
- Scala
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"startingOffsets": "earliest",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.jaas.config":
"kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
"kafka.sasl.login.callback.handler.class":
"org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandler",
# Required by the login callback handler:
"kafka.databricks.serviceCredential": "<service-credential-name>",
# Optional:
# "kafka.databricks.serviceCredential.scope": "https://www.googleapis.com/auth/cloud-platform",
}
df = spark.read.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
"subscribe" -> "<topic>",
"startingOffsets" -> "earliest",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.jaas.config" ->
"kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
"kafka.sasl.login.callback.handler.class" ->
"org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandler",
// Required by the login callback handler:
"kafka.databricks.serviceCredential" -> "<service-credential-name>",
// Optional:
// "kafka.databricks.serviceCredential.scope" -> "https://www.googleapis.com/auth/cloud-platform",
)
val df = spark.read.format("kafka").options(kafkaOptions).load()
これらの SASL オプションを指定するときはdatabricks.serviceCredentialを設定しないでください。databricks.serviceCredentialが設定されている場合、Databricks は Kafka 認証を自動的に構成し、 kafka.sasl.*オプションの指定を禁止します。
潜在的なエラーの処理
-
認証時に例外
missing gcp_optionsが発生しましたこの例外は、コールバック ハンドラーがブートストラップ URL からスコープを推測できない場合にスローされます。
databricks.serviceCredential.scope手動で設定します。ほとんどのユースケースでは、https://www.googleapis.com/auth/cloud-platformに設定します。 -
解決可能なブートストラップ URL が見つかりません
これは、コンピュート クラスターがブートストラップ ホスト名を解決できないことを意味します。 VPC構成をチェックして、コンピュート クラスターが管理対象Kafkaクラスターに到達できることを確認します。
-
権限の問題
- サービス アカウントに、 Kafkaクラスターが属するプロジェクトに対する
roles/managedkafka.clientIAMバインディングがあることを確認してください。 - Kafka クラスターにトピックとサービス アカウントに対して適切な ACL が定義されていることを確認します。Google クラウドのドキュメントのKafka ACL によるアクセス制御」を参照してください。
- サービス アカウントに、 Kafkaクラスターが属するプロジェクトに対する