構造化ストリーミングのリアルタイムモード
プレビュー
この機能は パブリック プレビュー段階です。
リアルタイム モードは、エンドツーエンドのレイテンシが 5 ミリ秒という超低レイテンシのデータ処理を可能にする、構造化ストリーミングのトリガー タイプです。不正行為の検出、リアルタイムのパーソナライゼーション、即時の意思決定システムなど、ストリーミング データへの即時応答が必要な運用ワークロードには、リアルタイム モードを使用します。
リアルタイム モードは、Databricks Runtime 16.4 LTS 以降で利用できます。詳細なセットアップ手順については、 「リアルタイム モードの開始」を参照してください。コード例については、 「リアルタイム モードの例」を参照してください。
リアルタイムモードとは何ですか?
運用ワークロードと分析ワークロード
ストリーミング ワークロードは、分析ワークロードと運用ワークロードに大きく分けることができます。
- 分析ワークロードでは、通常はメダリオンアーキテクチャに従ってデータ取り込みと変換を使用します (たとえば、ブロンズ、シルバー、ゴールドのテーブルにデータを取り込むなど)。
- 運用ワークロードは、リアルタイムデータを消費し、ビジネスロジックを適用し、ダウンストリームのアクションや決定をトリガーします。
運用ワークロードの例を次に示します。
- 不正行為のスコアがしきい値を超えた場合に、異常な場所、大規模な取引サイズ、急速な支出パターンなどの要因に基づいて、クレジット カード取引をリアルタイムでブロックまたはフラグを立てます。
- クリックストリームデータでユーザーがジーンズを5分間閲覧していた場合にプロモーションメッセージを配信し、その後15分以内に購入した場合は25%の割引を提供します。
一般的に、運用ワークロードは、エンドツーエンドのレイテンシが 1 秒未満であることが求められるのが特徴です。これはApache Spark構造化ストリーミングの中断モードで実現できます。
リアルタイムモードで低遅延を実現する方法
リアルタイムモードは、次の方法で実行アーキテクチャを改善します。
- 長時間実行バッチを実行します (デフォルトは 5 分)。この場合、システムはソースでデータが利用可能になるとすぐにデータを処理します。
- クエリのすべてのステージを同時にスケジュールします。これには、使用可能なタスク スロットの数が、バッチ内のすべてのステージのタスクの数以上である必要があります。
- ストリーミング シャッフルを使用して、データが生成されるとすぐにステージ間でデータを渡します。
バッチの処理が終了し、次のバッチが開始される前に、構造化ストリーミング チェックポイントが進行し、メトリクスが公開されます。 バッチ期間はチェックポイントの頻度に影響します。
- より長いバッチ : チェックポイントの頻度が低くなります。つまり、失敗時のリプレイが長くなり、メトリクスの可用性が遅れます。
- バッチが短い : チェックポイントの頻度が高くなり、レイテンシに影響する可能性があります。
Databricks では、適切なトリガー間隔を見つけるために、ターゲット ワークロードに対してリアルタイム モードをベンチマークすることをお勧めします。
リアルタイムモードを使用する場合
ユースケースで必要な場合は、リアルタイム モードを選択します。
- 1 秒未満の遅延 : リアルタイムでトランザクションをブロックする必要がある不正検出システムなど、数ミリ秒以内にデータに応答する必要があるアプリケーション。
- 運用上の意思決定 : リアルタイムのオファー、アラート、通知など、受信データに基づいて即時のアクションをトリガーするシステム。
- 継続的な処理 : 定期的なバッチ処理ではなく、データが到着するとすぐに処理される必要があるワークロード。
次の場合は、マイクロバッチ モード (デフォルトの構造化ストリーミング トリガー) を使用します。
- 分析処理 : ETLパイプライン、データ変換、およびレイテンシ要件が秒または分単位で測定されるメダリオン アーキテクチャの実装。
- コストの最適化 : 中断モードでは専用のコンピュート リソースが必要なため、1 秒未満のレイテンシが必要ないワークロード。
- チェックポイントの頻度は重要です 。より頻繁なチェックポイントにより回復が速くなるアプリケーションです。
要件と構成
起動モードには、コンピュートのセットアップとクエリの構成に特定の要件があります。 このセクションでは、沸騰モードを使用するために必要な前提条件と設定ステップについて説明します。
前提条件
リアルタイム モードを使用するには、次の要件を満たす必要があります。
- Databricks Runtime 16.4 LTS 以上 : リアルタイム モードは、DBR 16.4 LTS 以降のバージョンでのみ使用できます。
- 専用コンピュート : 専用 (以前はシングル ユーザー) コンピュートを使用する必要があります。 Standard (旧共有)、 LakeFlow Spark宣言型パイプライン、およびサーバレス クラスターはサポートされていません。
- オートスケールなし : オートスケールを無効にする必要があります。
- Photon なし : リアルタイム モードでは Photon アクセラレーションはサポートされません。
- Spark 構成 :
spark.databricks.streaming.realTimeMode.enabledtrueに設定する必要があります。
コンピュート構成
次の設定を使用してコンピュートを構成します。
- Spark 構成で
spark.databricks.streaming.realTimeMode.enabledをtrueに設定します。 - オートスケールを無効にします。
- Photon加速を無効にします。
- コンピュートが専用クラスター (標準、 LakeFlow Spark宣言型パイプライン、またはサーバーレスではない) として構成されていることを確認します。
貯蓄モード用のコンピュートの作成と構成の詳細な手順については、 「貯蓄モードの開始」を参照してください。
クエリ構成
クエリをリアルタイム モードで実行するには、リアルタイム トリガーを有効にする必要があります。リアルタイム トリガーは更新モードでのみサポートされます。
- Python
- Scala
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
コンピュートのサイジング
コンピュートに十分なタスク スロットがある場合は、コンピュート リソースごとに 1 つの保留ジョブを実行できます。
低遅延モードで実行するには、使用可能なタスク スロットの合計数が、すべてのクエリ ステージのタスク数以上である必要があります。
スロット計算の例
パイプラインタイプ | 構成 | 必要なスロット |
|---|---|---|
シングルステージステートレス(Kafka ソース + シンク) |
| 8スロット |
2 段階ステートフル (Kafka ソース + シャッフル) |
| 28スロット(8 + 20) |
3 段階 ( Kafkaソース + シャッフル + 再分割) |
| 48スロット(8 + 20 + 20) |
maxPartitionsを設定しない場合は、Kafka トピック内のパーティション数を使用します。
主な考慮事項
コンピュートを設定するときは、次の点を考慮してください。
-
マイクロバッチモードとは異なり、リアルタイムタスクはデータを待っている間、アイドル状態のままになる可能性があるため、リソースの浪費を避けるためには、適切なサイジングが不可欠です。
-
以下の調整を行って、目標の使用率レベル (たとえば 50%) を目指します。
maxPartitions( Kafka対象)spark.sql.shuffle.partitions(シャッフルステージ用)
-
Databricks では、オーバーヘッドを削減するために各タスクが複数の Kafka パーティションを処理するように
maxPartitionsを設定することを推奨しています。 -
ワーカーごとにタスクスロットを調整して、単純な 1 ステージジョブのワークロードに一致させます。
-
シャッフルの多いジョブの場合は、バックログを回避するシャッフル パーティションの最小数を見つけて、そこから調整する経験をしてください。 十分なスロットがない場合、コンピュートはジョブをスケジュールしません。
Databricks Runtime 16.4 LTS 以降では、すべてのリアルタイム パイプラインでチェックポイント v2 が使用され、リアルタイム モードとマイクロバッチ モードをシームレスに切り替えることができます。
最適化技術
テクニック | デフォルトで有効 |
|---|---|
非同期進行状況追跡: オフセット ログとコミット ログへの書き込みを非同期スレッドに移動し、2 つのマイクロ バッチ間のバッチ間時間を短縮します。 これにより、ステートレス ストリーミング クエリのレイテンシを削減できます。 | No |
非同期状態チェックポイント: 状態チェックポイントを待たずに、前のマイクロバッチの計算が完了するとすぐに次のマイクロバッチの処理を開始することで、ステートフル ストリーミング クエリの待機時間を短縮します。 | No |
モニタリングと可観測性
クエリ パフォーマンスの測定は、リアルタイムのワークロードにとって不可欠です。起動モードでは、従来のバッチ期間メトリクスは実際のレイテンシーを反映しないため、代替アプローチが必要になります。
エンドツーエンドのレイテンシはワークロードに固有であり、ビジネス ロジックでのみ正確に測定できる場合があります。たとえば、ソースタイムスタンプが Kafka で出力される場合、Kafka の出力タイムスタンプとソースタイムスタンプの差としてレイテンシを計算できます。
以下で説明する組み込みメトリクスとAPIsを使用して、エンドツーエンドの遅延を推定することもできます。
メトリクスと StreamingQueryProgress を組み込む
次のメトリクスは StreamingQueryProgress イベントに含まれており、ドライバー ログに自動的に記録されます。 また、 StreamingQueryListenerの onQueryProgress() コールバック関数からアクセスすることもできます。QueryProgressEvent.json() または toString() には、追加のリアルタイム モード メトリクスが含まれています。
- 処理待ち時間 (processingLatencyMs) 。リアルタイム モード クエリがレコードを読み取ってから、クエリがそれを次のステージまたはダウンストリームに書き込むまでの経過時間。シングルステージクエリの場合、これは E2E レイテンシと同じ期間を測定します。システムはタスクごとにこのメトリクスを報告します。
- ソース キューイングの遅延 (sourceQueuingLatencyMs) 。システムがメッセージ バスにレコードを書き込むとき (たとえば、Kafka のログ追加時間) と、リアルタイム モード クエリが最初にレコードを読み取るときとの間の経過時間。システムはタスクごとにこのメトリクスを報告します。
- E2E レイテンシ (e2eLatencyMs) 。システムがレコードをメッセージ バスに書き込んでから、リアルタイム モード クエリがレコードをダウンストリームに書き込むまでの時間。システムは、すべてのタスクによって処理されたすべてのレコードにわたって、このメトリクスをバッチごとに集計します。
例えば:
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
},
Observe API を使用したカスタムレイテンシ測定
Observe API を使用すると、別のジョブを起動せずにレイテンシを測定できます。ソース データの到着時間を近似するソース タイムスタンプがある場合は、Observe API を使用して各バッチのレイテンシを見積もることができます。シンクに到達する前にタイムスタンプを渡します。
- Python
- Scala
from datetime import datetime
from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType
@udf(returnType=TimestampType())
def current_timestamp():
return datetime.now()
# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
"latency",
unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
"observedLatency",
avg(col("latency")).alias("avg"),
max(col("latency")).alias("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.
import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}
val currentTimestampUDF = udf(() => System.currentTimeMillis())
// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
"latency",
col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.
この例では、エントリを出力する前に現在のタイムスタンプが記録され、このタイムスタンプとレコードのソースタイムスタンプの差を計算することでレイテンシーが推定されます。結果は進行状況レポートに含まれ、リスナーが利用できるようになります。出力例を次に示します。
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}
機能のサポートと制限
このセクションでは、互換性のある環境、言語、ソース、シンク、演算子、特定の機能に関する特別な考慮事項など、リアルタイム モードでサポートされている機能と現在の制限について説明します。
サポートされている環境、言語、モード
クラスタータイプ | サポート |
|---|---|
専用(旧:シングルユーザー) | Yes |
標準(旧:共有) | No |
Lakeflow Spark宣言型パイプライン Classic | No |
Lakeflow Spark宣言型パイプライン サーバレス | No |
サーバーレス | No |
サポートされている言語:
言語 | サポート |
|---|---|
Scala | Yes |
Java | Yes |
Python | Yes |
サポートされている実行モード:
実行 モード | サポート |
|---|---|
更新モード | Yes |
追加モード | No |
コンプリートモード | No |
サポートされているソースとシンク
ソース:
ソース | サポート |
|---|---|
Apache Kafka | Yes |
AWS MSK | Yes |
Event Hubs (Kafka コネクタを使用) | Yes |
Kinesis | はい(EFOモードのみ) |
Google Pub/Sub | No |
Apache Pulsar | No |
シンク:
シンク | サポート |
|---|---|
Apache Kafka | Yes |
Event Hubs (Kafka コネクタを使用) | Yes |
Kinesis | No |
Google Pub/Sub | No |
Apache Pulsar | No |
任意のシンク (forEachWriter を使用) | Yes |
サポートされている演算子
演算子 | サポート |
|---|---|
ステートレス操作 | |
選択 | Yes |
投影 | Yes |
UDF | |
Scala UDF | はい (一部制限があります) |
Python UDF | はい (一部制限があります) |
集計 | |
合計 | Yes |
count | Yes |
max | Yes |
min | Yes |
avg | Yes |
Yes | |
ウィンドウ | |
タンブリング | Yes |
スライディング | Yes |
セッション | No |
重複排除 | |
重複をドロップする | はい (状態は無制限です) |
ウォーターマーク内の重複を削除 | No |
ストリーム - テーブル結合 | |
放送テーブル(小さいもの) | Yes |
ストリーム - ストリーム結合 | No |
(フラット)MapGroupsWithState (英語) | No |
transformWithState(トランスフォーム・ウィズ・ステート) | はい(一部異なります) |
union | はい (一部制限があります) |
For each | Yes |
forEachBatch | No |
マップパーティション | いいえ(制限事項を参照) |
特別な考慮事項
一部の演算子と機能には、リアルタイム モードで使用する場合の特定の考慮事項や違いがあります。
リアルタイムモードでのtransformWithState
カスタムステートフルアプリケーションを構築するために、Databricks は Apache Spark 構造化ストリーミングのAPIである transformWithStateをサポートしています。APIとコード スニペットの詳細については、「カスタム ステートフル アプリケーションの構築」を参照してください。
ただし、API がリアルタイム モードで動作する方法と、マイクロバッチ アーキテクチャを活用する従来のストリーミング クエリとの間には、いくつかの違いがあります。
-
リアルタイム モードでは、各行に対して
handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)メソッドが呼び出されます。inputRowsイテレータは単一の値を返します。マイクロバッチ モードでは、キーごとにこれを 1 回呼び出し、inputRowsイテレータはマイクロバッチ内のキーのすべての値を返します。- コードを書くときは、この違いに注意する必要があります。
-
イベント時間タイマーは、リアルタイム モードではサポートされていません。
-
リアルタイム モードでは、データの到着に応じてタイマーの起動が遅延されます。
- タイマーが 10:00:00 にスケジュールされているが、データが到着しない場合、タイマーはすぐには起動しません。
- データが 10:00:10 に到着した場合、タイマーは 10 秒遅延して起動します。
- データが到着せず、長時間実行バッチが終了する場合、バッチが終了する前にタイマーが起動します。
リアルタイムモードでの Python UDF
Databricks では、ほとんどの Python ユーザー定義関数 (UDF) がリアルタイム モードでサポートされています。
UDFタイプ | サポート |
|---|---|
ステートレス UDF | |
Python スカラー UDF (リンク) | Yes |
矢印スカラーUDF | Yes |
Pandas スカラー UDF (リンク) | Yes |
矢印関数 ( | Yes |
Pandas関数(リンク) | Yes |
ステートフル グルーピング UDF (UDAF) | |
transformWithState ( | Yes |
パンダに状態を適用 | No |
非ステートフル グルーピング UDF (UDAF) | |
適用する | No |
矢印に適用 | No |
パンダに適用 | No |
テーブル関数 | |
UDTF(リンク) | No |
カリフォルニア大学UDF | No |
Python UDF をリアルタイム モードで使用する場合、考慮すべき点がいくつかあります。
-
遅延を最小限に抑えるには、Arrow バッチ サイズ (
spark.sql.execution.arrow.maxRecordsPerBatch) を 1 に設定します。- トレードオフ: この構成では、スループットを犠牲にして待機時間が最適化されます。ほとんどのワークロードでは、この設定をお勧めします。
- バッチサイズは、入力量に対応するためにより高いスループットが必要な場合にのみ増やし、レイテンシーの増加の可能性を受け入れます。
-
Pandas の UDF と関数は、Arrow のバッチサイズが 1 の場合、うまく機能しません。
- Pandas UDF または関数を使用する場合は、矢印の バッチ サイズを大きい値 (たとえば、100 以上) に設定します。
- これは、レイテンシーが長くなることを意味します。Databricks では、可能であれば Arrow UDF または関数を使用することをお勧めします。
-
Pandasのパフォーマンスの問題により、transformWithState は
Rowインターフェイスでのみサポートされます。
制限
ソースの制限
Kinesis の場合、リアルタイム モードはポーリング モードをサポートしていません。さらに、頻繁な再パーティションはレイテンシに悪影響を及ぼす可能性があります。
連合の制限
Union 演算子にはいくつかの制限があります。
-
リアルタイム モードでは自己結合はサポートされません。
- Kafka : 同じソース データ フレーム オブジェクトとそこから派生したデータ フレームを使用することはできません。回避策: 同じソースから読み取る異なるDataFramesを使用します。
- Kinesis : 同じ構成で同じ Kinesis ソースから派生したデータ フレームを結合することはできません。回避策: 異なるDataFramesを使用するだけでなく、各DataFrameに異なる 'consumerName' オプションを割り当てることもできます。
-
リアルタイム モードでは、Union の前に定義されたステートフル演算子 (
aggregate、deduplicate、transformWithState) はサポートされません。 -
リアルタイム モードでは、バッチ ソースとの結合はサポートされません。
MapPartitionsの制限
mapPartitions Scalaや同様のPython APIs ( mapInPandas 、 mapInArrow ) では、入力パーティション全体の反復子を受け取り、入力と出力間の任意のマッピングを使用して出力全体の反復子を生成します。 これらのAPIs出力全体をブロックし、レイテンシを増加させることでストリーミングModeでパフォーマンスの問題を引き起こす可能性があります。 これらのAPIsのセマンティクスは、ウォーターマークの伝播を十分にサポートしていません。
同様の機能を実現するには、代わりにスカラー UDF を変換複合データ型と組み合わせて使用するか、 filter使用してください。
次のステップ
リアルタイム モードの概要とその構成方法がわかったので、次のリソースを参照して、リアルタイム ストリーミング アプリケーションの実装を開始してください。
- オンライン モードを開始する - 段階的な手順に従ってコンピュートを設定し、最初のストリーミング クエリを実行します。
- リアルタイム モードのコード例 - Kafka ソースとシンク、ステートフル クエリ、集計、カスタム シンクなどの実際の例を調べます。
- 構造化ストリーミングの概念 - Databricks 上の構造化ストリーミングの基本的な概念を学習します。