Lakeflow Spark宣言型パイプラインでリアルタイムモードを使用します
パブリックプレビュー
Lakeflow Spark宣言型パイプラインのリアルタイム モードは、Databricks Runtime 18.1.2でパブリック プレビュー段階です。プレビューチャンネルで。
リアルタイムモードは、エンドツーエンドの遅延が5ミリ秒という超低遅延データ処理を可能にします。ストリーミングデータへの即座の応答が必要な、不正検出やリアルタイムのパーソナライゼーションといった運用ワークロードには、リアルタイムモードが最適です。
リアルタイムモードは、パイプラインの外部でも構造化ストリーミングで直接利用できます。構造化ストリーミングのリアルタイムモードを参照してください。
リアルタイムモードでの低レイテンシの実現方法
リアルタイムモードは、標準の連続処理とは3つの主要な点で異なります:
- 長時間実行されるバッチ :システムは、ソースで利用可能になり次第、長時間実行されるバッチ(デフォルトは5分です)内でデータを処理します。
- 同時ステージスケジューリング :すべてのクエリステージは同時にスケジュールされます。コンピュートリソースは、すべてのステージに同時に対応するために、十分な利用可能なタスクスロットがなければなりません。コンピュートのサイジングを参照してください。
- ストリーミング シャッフル :下流ステージを開始する前に上流ステージの完了を待つのではなく、データは生成され次第すぐにステージ間で渡されます。
チェックポイント間隔(pipelines.trigger.interval を介して構成)は、状態とソースオフセットが耐久ストレージに保持される頻度を制御します。間隔を長くすると、チェックポイントのオーバーヘッドは削減されますが、障害発生後の復旧時間は増加し、メトリクスの報告は遅延します。間隔が短いほど耐久性は向上しますが、オーバーヘッドが増加します。
リアルタイムモードと連続パイプライン
リアルタイムモードは、連続トリガーの特殊なタイプです。連続モードは引き続き必要です。リアルタイムモードは、その上でフローレベルのレイテンシ最適化を追加します。リアルタイムモードを使用するには、最初にパイプラインを連続モードで実行する必要があります。リアルタイムモードは、標準的な継続処理が提供する以上のサブ秒のレイテンシーを実現するために、フローレベルで追加の最適化を適用します。
リアルタイムモードを有効にするには、3つの設定ステップが必要です。
- パイプラインを連続モードに設定。
- パイプラインレベルでリアルタイムモードを有効にします。
- リアルタイム更新フローを定義する
要件
要件 | Value |
|---|---|
Databricks Runtime | 18.1.2SDPプレビューチャンネルで |
クラスタータイプ | クラシックコンピュートまたはサーバレス |
リアルタイム モードを構成する
ステップ 1:パイプラインを連続モードに設定する
パイプライン設定で、 パイプラインモード を 連続 に設定するか、パイプラインJSONで設定してください:
{
"continuous": true
}
ステップ 2: パイプラインレベルでリアルタイムモードを有効にする
パイプライン設定で、 詳細設定 > Spark構成 にあるSpark構成に、次のキーを追加します。
spark.databricks.streaming.realTimeMode.enabled = true
パイプラインJSONでも、これを設定できます:
{
"continuous": true,
"spark_conf": {
"spark.databricks.streaming.realTimeMode.enabled": "true"
}
}
ステップ 3: リアルタイム更新フローを定義する
リアルタイムモードには、更新フローが必要です。dp.create_sink() を使用して出力ターゲットを定義し、pipelines.trigger が "RealTime" に設定され、target がシンクを指すように @dp.update_flow デコレーターを使用します。
from pyspark import pipelines as dp
# Define the output sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "<bootstrap-servers>",
"topic": "<output-topic>",
}
)
# Define the real-time update flow targeting the sink
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes", # optional; defaults to 5 minutes
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-servers>")
.option("subscribe", "<input-topic>")
.load()
)
フローレベル設定パラメーター
パラメーター | 必須 | デフォルト | 説明 |
|---|---|---|---|
| はい | — |
|
| No |
| チェックポイント間隔。状態とオフセットがコミットされる頻度を制御します。短い値は回復性を向上させます;長い値はオーバーヘッドを削減します。 |
コードの例
Kafka から Kafka へ
Kafkaトピックからデータを読み込み、Kafka出力ターゲットに書き込む:
from pyspark import pipelines as dp
dp.create_sink("kafka_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="kafka_rtm_flow",
target="kafka_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def kafka_rtm_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
)
ブロードキャスト結合で強化
Kafkaストリームを静的ルックアップテーブルにJOINします。ブロードキャスト (ストリーム静的) 結合のみがサポートされています。ストリーム間結合はリアルタイムモードではサポートされていません。
from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr
dp.create_sink("enriched_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": enriched_output_topic,
})
@dp.update_flow(
name="enriched_events_flow",
target="enriched_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def enriched_events():
lookup = spark.read.table("catalog.schema.lookup_table")
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.withColumn("event_key", expr("CAST(value AS STRING)"))
.join(broadcast(lookup), expr("event_key = lookup_key"))
.select("event_key", "lookup_value", "timestamp")
)
集計
ステートフルな groupBy を使用して、キー別にイベントをカウントします。ステートフル操作の入力パーティション数に合わせてspark.sql.shuffle.partitionsを設定します。
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
"spark.sql.shuffle.partitions": "8",
}
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
.groupBy(col("event_type"))
.count()
)
サポートされているソースとシンク
コネクター | ソースとして | シンクとして | 注 |
|---|---|---|---|
Apache Kafka | ✓ | ✓ | — |
AWS MSK | ✓ | ✓ | Kafka と互換性のあるインターフェイスを利用します。 |
Azure Event Hubs (Kafka コネクタ) | ✓ | ✓ | Kafka と互換性のあるインターフェイスを利用します。 |
Amazon Kinesis | ✓ | サポートされていない | 拡張ファンアウト(EFO)モード専用です。 |
Delta | サポートされていない | サポートされていない | — |
コンピュートのサイジング
コンピュートに十分なタスク スロットがある場合は、コンピュート リソースごとに 1 つのリアルタイム パイプラインを実行できます。利用可能なタスクスロットは、すべてのクエリステージのすべてのタスクをカバーする必要があります。
パイプラインタイプ | 構成 | 必要なタスク スロット |
|---|---|---|
シングルステージ ステートレス(Kafka ソースとシンク) |
| 8 |
2段階ステートフル(Kafka ソース + シャッフル) |
| 28(8 + 20) |
3ステージ (Kafka ソース + 2つのシャッフル) |
| 48(8 + 20 + 20) |
maxPartitions を設定しない場合、Kafka トピックのパーティション数を使用します。
オペレーターサポート
カテゴリー | オペレーター | サポートされています |
|---|---|---|
ステートレス | 選択、投影 | ✓ |
UDF | Scala UDF | ✓ (制限付き) |
UDF | Python UDF | ✓ (制限付き) |
集計 | sum、count、max、min、avg | ✓ |
ウィンドウイング | 転がる、滑る | ✓ |
ウィンドウイング | セッション | サポートされていない |
重複排除 |
| ✓ (無制限の状態) |
重複排除 |
| サポートされていない |
テーブルのJOIN | ブロードキャストテーブル結合 | ✓ |
テーブルのJOIN | ストリーム-ストリームJOIN | サポートされていない |
カスタム |
| ✓ (動作の相違点あり) |
カスタム |
| ✓ (制限付き) |
カスタム |
| サポートされていない |
カスタム |
| サポートされていない |
カスタム |
| サポートされていない |
カスタム |
| サポートされていない |
transformWithState リアルタイムモードで
transformWithState リアルタイムモードに対応しており、マイクロバッチ処理と以下の点が異なります。
handleInputRowsバッチごとにキーごとに1回ではなく、行ごとに1回呼び出されます。inputRowsイテレーターは、呼び出しごとに単一の値を返します。- イベント時刻タイマーは対応していません。処理時間タイマーは、データが到着していない場合、長時間実行されるバッチが終了すると起動します。
transformWithStateInPandasサポートされていません。
リアルタイムモードのPandas UDF
Pandas UDF でのレイテンシーを最小限に抑えるには、spark.sql.execution.arrow.maxRecordsPerBatch を 1 に設定します。これは、スループットを犠牲にしてレイテンシが最適化されます。スループットも重要な場合は、この値を100以上に設定してください。
リアルタイムモードのパフォーマンスモニタリング
リアルタイムモードでは、StreamingQueryProgressのlatenciesフィールドでレイテンシメトリクスが表示されます。これらのメトリクスには、StreamingQueryListener またはストリーミング クエリの lastProgress プロパティを検査することでアクセスできます。
メトリクス | 説明 |
|---|---|
| フローによってレコードが読み取られてから、フローによって完全に処理されるまでの時間 |
| レコードがメッセージバスに正常に書き込まれてから(例:Kafkaでのログ追加時刻)、フローによって最初に読み取られるまでの時間 |
| レコードがソースで生成されてから、フローによって完全に処理されるまでのエンドツーエンドの合計レイテンシーです。 |
各メトリクスは、p50、p90、p95、p99のパーセンタイル値としてレポートされます。
制限事項:
1つのパイプラインにつき、1つのリアルタイムフローが推奨されます。複数のフローは許可されていますが、フロー間でのタスクスロットの競合により、レイテンシーが増加します。
オペレーターおよびソースの制限事項の完全なリストについては、リアルタイムモードの制限事項を参照してください。