メインコンテンツまでスキップ

Apache Kafka と Databricks によるストリーム処理

この記事では、Databricks 上で構造化ストリーミングワークロードを実行する際に、Apache Kafka をソースまたはシンクとして使用する方法について説明します。

Kafka の詳細については、Kafka のドキュメントを参照してください。

Kafka からのデータの読み取り

以下に、Kafkaからのストリーミング読み込みの例を挙げています。

Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)

以下の例の通り、Databricksは、Kafka データソースのバッチ読み込みのセマンティクスもサポートしています。

Python
df = (spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)

増分バッチ読み込みの場合、Databricks では Kafka を Trigger.AvailableNowと共に使用することをお勧めします。 増分バッチ処理の構成を参照してください。

Databricks Runtime 13.3 LTS 以降では、Databricks は Kafka データを読み取るための SQL 関数を提供します。SQLを使用したストリーミングは、 Lakeflow Spark宣言型パイプラインまたはDatabricks SQLのストリーミング テーブルでのみサポートされます。 read_kafkaテーブル値関数を参照してください。

Kafka 構造化ストリーミング リーダーを構成する

Databricks は、Kafka 0.10 以降への接続を構成するためのデータ形式として kafka キーワードを提供しています。

Kafka の最も一般的な構成を以下に示します。

サブスクライブするトピックを指定する方法は複数あります。次のパラメーターの 1 つのみを指定する必要があります。

オプション

説明

subscribe

トピックのコンマ区切りリスト

サブスクライブするトピックのリストです。

subscribePattern

Javaの正規表現文字列

トピックをサブスクライブするのに使われるパターンです。

assign

JSON 文字列 {"topicA":[0,1],"topic":[2,4]}

コンシュームする特定の topicPartitions です。

その他の注目すべき構成:

オプション

デフォルト値

説明

kafka.bootstrap.servers

ホストのコンマ区切りリスト.

値がありません

[必須] Kafka の bootstrap.servers 構成です。Kafka からのデータがない場合は、まずブローカーのアドレスリストを確認してください。ブローカーのアドレスリストが正しくない場合、エラーが発生しない可能性があります。これは、ブローカーが最終的に利用可能になるとKafka クライアントが想定しており、ネットワークエラーが発生した場合には永久に再試行を繰り返すためです。

failOnDataLoss

true または false

true

[オプション]データが失われた可能性がある場合にクエリを失敗させるかどうか。クエリは、削除されたトピック、処理前のトピックの切り捨てなど、多くのシナリオにより、Kafka からのデータの読み取りに永続的に失敗する可能性があります。私たちは、データが失われた可能性があるかどうかを控えめに推定しようとします。これにより、誤警報が発生する場合があります。このオプションを false に設定すると、期待どおりに機能しない場合、またはデータが失われてもクエリの処理を続行できます。

minPartitions

整数 > = 0、0 = 無効

0 (無効)

[オプション] Kafka から読み取るパーティションの最小数。minPartitions オプションを使用して、Kafka から読み取るために任意の最小パーティションを使用するように Spark を設定できます。通常、Spark は Kafka の topicPartitions と Kafka から消費される Spark パーティションを 1 対 1 でマッピングしています。minPartitions オプションを Kafka の topicPartitions よりも大きな値に設定すると、Spark は大きなKafka パーティションを分割して小さくします。このオプションは、負荷のピーク時、データスキュー時、およびストリームが遅れているときに設定して、処理速度を向上させることができます。これには各トリガーで Kafka コンシューマーを初期化するというコストがかかり、Kafka への接続時に SSL を使用する場合はパフォーマンスに影響を与える可能性があります。

kafka.group.id

Kafka コンシューマーグループの ID

設定されていません

[オプション] Kafka からの読み取り中に使用するグループ ID。これは注意して使用してください。デフォルトでは、各クエリはデータを読み取るための一意のグループ ID を生成します。これにより、各クエリには他のコンシューマからの干渉を受けない独自のコンシューマグループが確保され、サブスクライブされたトピックのすべてのパーティションを読み取ることができます。一部のシナリオ(Kafka グループベースの承認など)では、特定の承認されたグループ ID を使用してデータを読み取ることが必要な場合があります。必要に応じて、グループ ID を設定することができます。ただし、予期しない動作が発生する可能性があるため、細心の注意を払って実行してください。 - 同じグループ ID で同時に実行されるクエリ (バッチとストリーミングの両方) は、互いに干渉し、各クエリがデータの一部のみを読み取る可能性があります。 - これは、クエリが立て続けに開始/再開された場合にも発生する可能性があります。 このような問題を最小限に抑えるには、Kafka コンシューマー構成 session.timeout.ms を非常に小さく設定します。

startingOffsets

earliest、latest

latest

[オプション] クエリが開始されるときの開始点。最も古いオフセットからの「earliest」、または各 TopicPartition の開始オフセットを指定する JSON 文字列のいずれかです。json では、オフセットとして -2 を使用して最も古いものを参照し、-1 を使用して最新のものを参照できます。注:バッチクエリの場合、latest(暗黙的または json で -1 を使用) は許可されません。ストリーミングクエリの場合、これは新しいクエリが開始された場合にのみ適用され、再開は常にクエリが中断したところから再開されます。クエリ中に新しく検出されたパーティションは、できるだけ早く開始されます。

他のオプションの構成については、Structured Streaming Kafka Integration Guide を参照してください。

Kafka レコードのスキーマ

Kafka レコードのスキーマは次のとおりです。

タイプ

キー

binary

binary

トピック

string

パーティション

int

オフセット

ロング

タイムスタンプ

ロング

timestampType

int

keyvalue は常に、ByteArrayDeserializer を持つバイト配列として逆シリアル化されます。データフレーム 操作(cast("string")など)を使用して、キーと値を明示的に逆シリアル化します。

Kafka へのデータの書き込み

以下は、Kafka へのストリーミング書き込みの例です:

Python
(df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)

Databricks は、次の例に示すように、Kafka データシンクへのバッチ書き込みセマンティクスもサポートしています。

Python
(df
.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)

Kafka 構造化ストリーミング ライターの構成

重要

Databricks Runtime 13.3 LTS 以降には、デフォルトによるべき等書き込みを可能にする新しいバージョンの kafka-clients ライブラリが含まれています。 Kafka シンクがバージョン 2.8.0 以下を使用し、ACL が設定されているが、 IDEMPOTENT_WRITE が有効になっていない場合、書き込みは失敗し、 org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error stateというエラー メッセージが表示されます。

このエラーを解決するには、Kafka バージョン 2.8.0 以降にアップグレードするか、構造化ストリーミングライターの設定時に .option(“kafka.enable.idempotence”, “false”) を設定します。

DataStreamWriter に提供されたスキーマは、Kafka シンクと対話します。次のフィールドを使用することができます。

列名

必須またはオプション

タイプ

key

オプション

STRING または BINARY

value

必須

STRING または BINARY

headers

オプション

ARRAY

topic

オプション(topic がライターオプションとして設定されている場合は無視されます)

STRING

partition

オプション

INT

以下は、Kafka への書き込み時に設定される一般的なオプションです。

オプション

デフォルト値

説明

kafka.boostrap.servers

カンマで区切られたリスト <host:port>

なし

[必須] Kafka の bootstrap.servers 構成です。

topic

STRING

設定されていません

[オプション] 書き込まれるすべての行のトピックを設定します。このオプションは、データに存在するすべてのトピック列よりも優先されます。

includeHeaders

BOOLEAN

false

[オプション] Kafka ヘッダーを行に含めるかどうか。

他のオプションの構成については、Structured Streaming Kafka Integration Guide を参照してください。

Kafka メトリクスの取得

avgOffsetsBehindLatestmaxOffsetsBehindLatest 、および minOffsetsBehindLatest メトリクスを使用して、サブスクライブされたすべてのトピック間で、ストリーミングクエリが利用可能な最新のオフセットよりも遅れているオフセット数の平均、最小、最大を取得できます。「メトリクスをインタラクティブに読み取る」を参照。

注記

Databricks Runtime 9.1 以降で利用可能です。

estimatedTotalBytesBehindLatest の値を調べて、クエリプロセスがサブスクライブされたトピックから消費しなかった推定合計バイト数を取得します。この推定値は、過去 300 秒間に処理されたバッチに基づきます。推定の基準となる時間枠は、オプション bytesEstimateWindowLength を別の値に設定することで変更できます。たとえば、10 分に設定するには、次のようにします。

Python
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

ノートブックでストリームを実行している場合、ストリーミングクエリの進行状況ダッシュボードの 生データ タブに次のメトリクスが表示されます。

JSON
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}

SSL を使用して Databricks を Kafka に接続する

Kafka への SSL 接続を有効にするには、Confluent のドキュメント「 SSL による暗号化と認証」の手順に従います。 ここで説明する構成 (オプションとして、プレフィックス kafka.を付けて提供できます。 たとえば、トラスト ストアの場所をプロパティ kafka.ssl.truststore.locationで指定します。

Databricks では、次のことをお勧めします。

  • 証明書をクラウド・オブジェクト・ストレージに保管します。証明書へのアクセスは、 にアクセスできるクラスタ Kafkaのみに制限できます。 データガバナンス with Databricksを参照してください。
  • 証明書のパスワードをシークレットスコープ に シークレットとして保存します 。

次の例では、オブジェクトストレージの場所と Databricks シークレットを使用して SSL 接続を有効にします。

Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Kafka認証

Unity Catalogの資格情報認証

Databricks Runtime 16.1のリリース以降、 Databricks 、 Apache Kafka (MSK) およびAzure Event Hubs のAWSマネージド ストリーミングへのアクセスを認証するためのUnity Catalogサービス 認証情報をサポートしています。 Apache KafkaのGCPマネージド サービスのサポートがDatabricks Runtime 18.0に追加されました。 Databricks 、共有クラスターでKafkaストリーミングを実行する場合、およびサポートされている場合は、サーバーレス コンピュートを使用する場合に、このアプローチを推奨します。

Unity Catalog サービスの資格情報を認証に使用するには、次の手順を実行します。

  • 新しい Unity Catalog サービスの資格情報を作成します。このプロセスに詳しくない場合は、 サービス資格情報の作成 手順を参照してください。
  • Unity Catalog サービスの資格情報の名前を Kafka 構成のソース オプションとして指定します。オプション [ databricks.serviceCredential ] をサービス資格情報の名前に設定します。

Databricks 、 Databricks Runtime 18.0 以降のApache Kafkaの Google クラウド マネージド サービスへのアクセスを認証するためのUnity Catalogサービス 認証情報をサポートしています。 Google クラウド マネージドサービス for Apache KafkaクラスターはVPCで実行されるため、 DatabricksコンピュートはマネージドKafkaクラスターに到達できるVPCで実行する必要があります。 サーバーレス コンピュートはDatabricksで管理されるコントロール プレーンで実行されるため、サポートされていません。 接続を有効にするには、 VPCピアリングまたはプライベート サービス 接続を構成します。 Google クラウドのドキュメントのVPCネットワーク ピアリング」または「プライベート サービス接続」を参照してください。

認証のために、 Unity Catalogサービス資格情報用に作成された Google クラウド サービス アカウントに、マネージドKafkaクラスターを含む Google クラウド プロジェクトでの次のロールを付与します。

  • roles/managedkafka.client

手順については、Google クラウドのドキュメントの「アクセスの付与、変更、取り消し」を参照してください。

次の例では、サービス資格情報を使用して Kafka をソースとして構成します。

Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"databricks.serviceCredential": "<service-credential-name>",
# Optional: set this only if Databricks can't infer the scope for your Kafka service.
# "databricks.serviceCredential.scope": "https://www.googleapis.com/auth/cloud-platform",
}

df = spark.read.format("kafka").options(**kafka_options).load()

Kafka に Unity Catalog サービス資格情報を提供する場合は、次のオプションを指定しないでください。

  • kafka.sasl.mechanism
  • kafka.sasl.jaas.config
  • kafka.security.protocol
  • kafka.sasl.client.callback.handler.class
  • kafka.sasl.login.callback.handler.class
  • kafka.sasl.oauthbearer.token.endpoint.url

オプションdatabricks.serviceCredentialdatabricks.serviceCredential.scopeは Spark ソース オプションです。オプションkafka.databricks.serviceCredentialkafka.databricks.serviceCredential.scopeは、ログイン コールバック ハンドラーによって使用される Kafka クライアント オプションであるため、先頭にkafka.を付ける必要があります。

SASL/OAUTHBEARER オプションを直接指定することで、 databricks.serviceCredentialソース オプションを使用せずに、 Apache Kafkaの Google クラウド マネージドサービスに対する認証を行うことができます。 この方法は、Kafka SASL オプションを明示的に設定する必要がある場合にのみ使用してください (たとえば、特定のコールバック ハンドラーを使用する場合など)。

このアプローチには、Unity Catalog サービスの資格情報が必要です。

  • サービス認証情報の Google クラウド サービス アカウントには、マネージドKafkaクラスターを含む Google クラウド プロジェクト内にroles/managedkafka.client含まれている必要があります。
  • Databricks 提供のログイン コールバック ハンドラーorg.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandlerを使用します (Databricks Runtime に含まれていないcom.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandlerではありません)。

Unity Catalog 外での Google サービス アカウントによる認証はサポートされていません。

次の例では、SASL オプションを直接指定して、Kafka をソースとして設定します。

Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"startingOffsets": "earliest",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.jaas.config":
"kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
"kafka.sasl.login.callback.handler.class":
"org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandler",
# Required by the login callback handler:
"kafka.databricks.serviceCredential": "<service-credential-name>",
# Optional:
# "kafka.databricks.serviceCredential.scope": "https://www.googleapis.com/auth/cloud-platform",
}

df = spark.read.format("kafka").options(**kafka_options).load()

これらの SASL オプションを指定するときはdatabricks.serviceCredentialを設定しないでください。databricks.serviceCredentialが設定されている場合、Databricks は Kafka 認証を自動的に構成し、 kafka.sasl.*オプションの指定を禁止します。

潜在的なエラーの処理

  • 認証時に例外missing gcp_optionsが発生しました

    この例外は、コールバック ハンドラーがブートストラップ URL からスコープを推測できない場合にスローされます。databricks.serviceCredential.scope手動で設定します。ほとんどのユースケースでは、 https://www.googleapis.com/auth/cloud-platformに設定します。

  • 解決可能なブートストラップ URL が見つかりません

    これは、コンピュート クラスターがブートストラップ ホスト名を解決できないことを意味します。 VPC構成をチェックして、コンピュート クラスターが管理対象Kafkaクラスターに到達できることを確認します。

  • 権限の問題

    • サービス アカウントに、 Kafkaクラスターが属するプロジェクトに対するroles/managedkafka.client IAMバインディングがあることを確認してください。
    • Kafka クラスターにトピックとサービス アカウントに対して適切な ACL が定義されていることを確認します。Google クラウドのドキュメントのKafka ACL によるアクセス制御」を参照してください。