メインコンテンツまでスキップ

Lakeflow Spark宣言型パイプラインでリアルタイムモードを使用します

備考

パブリックプレビュー

Lakeflow Spark宣言型パイプラインのリアルタイム モードは、Databricks Runtime 18.1.2でパブリック プレビュー段階です。プレビューチャンネルで。

リアルタイムモードは、エンドツーエンドの遅延が5ミリ秒という超低遅延データ処理を可能にします。ストリーミングデータへの即座の応答が必要な、不正検出やリアルタイムのパーソナライゼーションといった運用ワークロードには、リアルタイムモードが最適です。

リアルタイムモードは、パイプラインの外部でも構造化ストリーミングで直接利用できます。構造化ストリーミングのリアルタイムモードを参照してください。

リアルタイムモードでの低レイテンシの実現方法

リアルタイムモードは、標準の連続処理とは3つの主要な点で異なります:

  • 長時間実行されるバッチ :システムは、ソースで利用可能になり次第、長時間実行されるバッチ(デフォルトは5分です)内でデータを処理します。
  • 同時ステージスケジューリング :すべてのクエリステージは同時にスケジュールされます。コンピュートリソースは、すべてのステージに同時に対応するために、十分な利用可能なタスクスロットがなければなりません。コンピュートのサイジングを参照してください。
  • ストリーミング シャッフル :下流ステージを開始する前に上流ステージの完了を待つのではなく、データは生成され次第すぐにステージ間で渡されます。

チェックポイント間隔(pipelines.trigger.interval を介して構成)は、状態とソースオフセットが耐久ストレージに保持される頻度を制御します。間隔を長くすると、チェックポイントのオーバーヘッドは削減されますが、障害発生後の復旧時間は増加し、メトリクスの報告は遅延します。間隔が短いほど耐久性は向上しますが、オーバーヘッドが増加します。

リアルタイムモードと連続パイプライン

リアルタイムモードは、連続トリガーの特殊なタイプです。連続モードは引き続き必要です。リアルタイムモードは、その上でフローレベルのレイテンシ最適化を追加します。リアルタイムモードを使用するには、最初にパイプラインを連続モードで実行する必要があります。リアルタイムモードは、標準的な継続処理が提供する以上のサブ秒のレイテンシーを実現するために、フローレベルで追加の最適化を適用します。

リアルタイムモードを有効にするには、3つの設定ステップが必要です。

  1. パイプラインを連続モードに設定。
  2. パイプラインレベルでリアルタイムモードを有効にします。
  3. リアルタイム更新フローを定義する

要件

要件

Value

Databricks Runtime

18.1.2SDPプレビューチャンネルで

クラスタータイプ

クラシックコンピュートまたはサーバレス

リアルタイム モードを構成する

ステップ 1:パイプラインを連続モードに設定する

パイプライン設定で、 パイプラインモード連続 に設定するか、パイプラインJSONで設定してください:

JSON
{
"continuous": true
}

ステップ 2: パイプラインレベルでリアルタイムモードを有効にする

パイプライン設定で、 詳細設定 > Spark構成 にあるSpark構成に、次のキーを追加します。

ini
spark.databricks.streaming.realTimeMode.enabled = true

パイプラインJSONでも、これを設定できます:

JSON
{
"continuous": true,
"spark_conf": {
"spark.databricks.streaming.realTimeMode.enabled": "true"
}
}

ステップ 3: リアルタイム更新フローを定義する

リアルタイムモードには、更新フローが必要です。dp.create_sink() を使用して出力ターゲットを定義し、pipelines.trigger"RealTime" に設定され、target がシンクを指すように @dp.update_flow デコレーターを使用します。

Python
from pyspark import pipelines as dp

# Define the output sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "<bootstrap-servers>",
"topic": "<output-topic>",
}
)

# Define the real-time update flow targeting the sink
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
&quot;pipelines.trigger&quot;: &quot;RealTime&quot;,
&quot;pipelines.trigger.interval&quot;: &quot;5 minutes&quot;, # optional; defaults to 5 minutes
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-servers>")
.option("subscribe", "<input-topic>")
.load()
)

フローレベル設定パラメーター

パラメーター

必須

デフォルト

説明

pipelines.trigger

はい

"RealTime" に設定すると、このフローのリアルタイムモードが有効になります。

pipelines.trigger.interval

No

"5 minutes"

チェックポイント間隔。状態とオフセットがコミットされる頻度を制御します。短い値は回復性を向上させます;長い値はオーバーヘッドを削減します。

コードの例

Kafka から Kafka へ

Kafkaトピックからデータを読み込み、Kafka出力ターゲットに書き込む:

Python
from pyspark import pipelines as dp

dp.create_sink("kafka_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})

@dp.update_flow(
name="kafka_rtm_flow",
target="kafka_output_sink",
spark_conf={
&quot;pipelines.trigger&quot;: &quot;RealTime&quot;,
&quot;pipelines.trigger.interval&quot;: &quot;5 minutes&quot;,
}
)
def kafka_rtm_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
)

ブロードキャスト結合で強化

Kafkaストリームを静的ルックアップテーブルにJOINします。ブロードキャスト (ストリーム静的) 結合のみがサポートされています。ストリーム間結合はリアルタイムモードではサポートされていません。

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr

dp.create_sink("enriched_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": enriched_output_topic,
})

@dp.update_flow(
name="enriched_events_flow",
target="enriched_output_sink",
spark_conf={
&quot;pipelines.trigger&quot;: &quot;RealTime&quot;,
&quot;pipelines.trigger.interval&quot;: &quot;5 minutes&quot;,
}
)
def enriched_events():
lookup = spark.read.table("catalog.schema.lookup_table")
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.withColumn("event_key", expr("CAST(value AS STRING)"))
.join(broadcast(lookup), expr("event_key = lookup_key"))
.select("event_key", "lookup_value", "timestamp")
)

集計

ステートフルな groupBy を使用して、キー別にイベントをカウントします。ステートフル操作の入力パーティション数に合わせてspark.sql.shuffle.partitionsを設定します。

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})

@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
spark_conf={
&quot;pipelines.trigger&quot;: &quot;RealTime&quot;,
&quot;pipelines.trigger.interval&quot;: &quot;5 minutes&quot;,
&quot;spark.sql.shuffle.partitions&quot;: &quot;8&quot;,
}
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
.groupBy(col("event_type"))
.count()
)

サポートされているソースとシンク

コネクター

ソースとして

シンクとして

Apache Kafka

AWS MSK

Kafka と互換性のあるインターフェイスを利用します。

Azure Event Hubs (Kafka コネクタ)

Kafka と互換性のあるインターフェイスを利用します。

Amazon Kinesis

サポートされていない

拡張ファンアウト(EFO)モード専用です。

Delta

サポートされていない

サポートされていない

コンピュートのサイジング

コンピュートに十分なタスク スロットがある場合は、コンピュート リソースごとに 1 つのリアルタイム パイプラインを実行できます。利用可能なタスクスロットは、すべてのクエリステージのすべてのタスクをカバーする必要があります。

パイプラインタイプ

構成

必要なタスク スロット

シングルステージ ステートレス(Kafka ソースとシンク)

maxPartitions =8

8

2段階ステートフル(Kafka ソース + シャッフル)

maxPartitions 8、シャッフルパーティション数 = 20

28(8 + 20)

3ステージ (Kafka ソース + 2つのシャッフル)

maxPartitions = 8、2つのシャッフルステージ、それぞれ20

48(8 + 20 + 20)

maxPartitions を設定しない場合、Kafka トピックのパーティション数を使用します。

オペレーターサポート

カテゴリー

オペレーター

サポートされています

ステートレス

選択、投影

UDF

Scala UDF

✓ (制限付き)

UDF

Python UDF

✓ (制限付き)

集計

sum、count、max、min、avg

ウィンドウイング

転がる、滑る

ウィンドウイング

セッション

サポートされていない

重複排除

dropDuplicates

✓ (無制限の状態)

重複排除

dropDuplicatesWithinWatermark

サポートされていない

テーブルのJOIN

ブロードキャストテーブル結合

テーブルのJOIN

ストリーム-ストリームJOIN

サポートされていない

カスタム

transformWithState

✓ (動作の相違点あり)

カスタム

union

✓ (制限付き)

カスタム

forEach

サポートされていない

カスタム

flatMapGroupsWithState

サポートされていない

カスタム

mapPartitions

サポートされていない

カスタム

forEachBatch

サポートされていない

transformWithState リアルタイムモードで

transformWithState リアルタイムモードに対応しており、マイクロバッチ処理と以下の点が異なります。

  • handleInputRows バッチごとにキーごとに1回ではなく、行ごとに1回呼び出されます。inputRows イテレーターは、呼び出しごとに単一の値を返します。
  • イベント時刻タイマーは対応していません。処理時間タイマーは、データが到着していない場合、長時間実行されるバッチが終了すると起動します。
  • transformWithStateInPandas サポートされていません。

リアルタイムモードのPandas UDF

Pandas UDF でのレイテンシーを最小限に抑えるには、spark.sql.execution.arrow.maxRecordsPerBatch1 に設定します。これは、スループットを犠牲にしてレイテンシが最適化されます。スループットも重要な場合は、この値を100以上に設定してください。

リアルタイムモードのパフォーマンスモニタリング

リアルタイムモードでは、StreamingQueryProgresslatenciesフィールドでレイテンシメトリクスが表示されます。これらのメトリクスには、StreamingQueryListener またはストリーミング クエリの lastProgress プロパティを検査することでアクセスできます。

メトリクス

説明

processingLatencyMs

フローによってレコードが読み取られてから、フローによって完全に処理されるまでの時間

sourceQueuingLatencyMs

レコードがメッセージバスに正常に書き込まれてから(例:Kafkaでのログ追加時刻)、フローによって最初に読み取られるまでの時間

e2eLatencyMs

レコードがソースで生成されてから、フローによって完全に処理されるまでのエンドツーエンドの合計レイテンシーです。

各メトリクスは、p50、p90、p95、p99のパーセンタイル値としてレポートされます。

制限事項:

1つのパイプラインにつき、1つのリアルタイムフローが推奨されます。複数のフローは許可されていますが、フロー間でのタスクスロットの競合により、レイテンシーが増加します。

オペレーターおよびソースの制限事項の完全なリストについては、リアルタイムモードの制限事項を参照してください。

その他のリソース