リアルタイムモードのクエリパフォーマンスを最適化および監視する
このページでは、コンピュートのチューニング、エンドツーエンドの遅延を削減するためのテクニック、および突然モードでクエリのパフォーマンスを測定するためのアプローチについて説明します。
コンピュートチューニング
コンピュートを設定するときは、次の点を考慮してください。
-
マイクロバッチモードとは異なり、リアルタイムタスクはデータ待ちの間アイドル状態になる可能性があるため、リソースの無駄遣いを避けるためには適切なサイジングが不可欠です。
-
次のように調整して、目標クラスター使用率レベル (50% など) を目指します。
maxPartitions(Kafka向け)spark.sql.shuffle.partitions(シャッフルステージ用)
-
Databricksは、オーバーヘッドを削減するために、各タスクが複数のKafkaパーティションを処理するように
maxPartitionsを設定することを推奨しています。 -
単純な 1 段階のジョブのワークロードに合わせて、ワーカーごとのタスク スロットを調整します。
-
シャッフルの多いジョブの場合は、バックログを回避するシャッフル パーティションの最小数を見つけて、そこから調整する経験をしてください。 十分なスロットがない場合、コンピュートはジョブをスケジュールしません。
Databricks Runtime 16.4 LTS以降では、すべてのリアルタイムパイプラインがチェックポイントv2を使用し、リアルタイムモードとマイクロバッチモード間のシームレスな切り替えを可能にしています。
レイテンシー最適化
構造化ストリーミングのリアルタイムモードには、エンドツーエンドの遅延を低減するためのオプションの技術が用意されています。どちらもデフォルトでは有効になっていません。それぞれ個別に有効化する必要があります。
- 非同期進捗状況追跡:オフセットへの書き込みとコミットログを非同期スレッドに移動することで、ステートレスクエリのバッチ間時間を短縮します。
- 非同期状態チェックポイント:状態チェックポイントを待たずに、計算が完了するとすぐに次のマイクロバッチの処理を開始し、ステートフルクエリのレイテンシを削減します。
モニタリングと可観測性
起動モードでは、従来のバッチ期間のメトリクスは実際のエンドツーエンドのレイテンシを反映しません。 以下の方法を用いて、レイテンシを正確に測定し、クエリにおけるボトルネックを特定してください。
エンドツーエンドのレイテンシはワークロードによって異なり、ビジネスロジックを考慮に入れなければ正確に測定できない場合もあります。例えば、ソースタイムスタンプがKafkaに出力される場合、Kafkaの出力タイムスタンプとソースタイムスタンプの差としてレイテンシを計算できます。
組み込みのメトリクス StreamingQueryProgress
StreamingQueryProgressイベントはドライバログに自動的に記録され、 StreamingQueryListenerのonQueryProgress()コールバック関数を通じてアクセスできます。これにより、たとえばメトリクスを外部モニタリング システムに公開する場合など、進行状況イベントにプログラム的に対応できるようになります。 QueryProgressEvent.json()またはtoString()には、以下のリアルタイムモードのメトリクスが含まれます。
- 処理遅延 (
processingLatencyMs)。リアルタイムモードのクエリがレコードを読み取ってから、クエリがそれを次のステージまたは下流に書き込むまでの経過時間。単一ステージのクエリの場合、これはエンドツーエンドのレイテンシと同じ期間を測定します。システムはタスクごとにこのメトリクスを報告します。 - ソースキューイング遅延 (
sourceQueuingLatencyMs)。システムがメッセージバスにレコードを書き込んだ時点(例えば、Kafkaのログ追加時間)から、リアルタイムモードのクエリが最初にそのレコードを読み取るまでの経過時間。システムはタスクごとにこのメトリクスを報告します。 - エンドツーエンドの遅延 (
e2eLatencyMs)。システムがレコードをメッセージバスに書き込んでから、リアルタイムモードのクエリがレコードを下流に書き込むまでの時間。システムは、すべてのタスクによって処理されたすべてのレコードにわたって、このメトリクスをバッチごとに集計します。
例えば:
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
}
}
Observe API を使用したカスタムレイテンシ測定
Observe APIを使用すると、別のジョブを起動することなく、レイテンシをインラインで測定できます。ソースデータの到着時刻を近似するソースタイムスタンプがある場合は、シンクの前のタイムスタンプを記録し、その差を計算することで、バッチごとの遅延を推定できます。結果は進捗報告書に掲載され、リスナーが閲覧できる。
- Python
- Scala
from datetime import datetime
from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType
@udf(returnType=TimestampType())
def current_timestamp():
return datetime.now()
# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
"latency",
unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
"observedLatency",
avg(col("latency")).alias("avg"),
max(col("latency")).alias("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.
import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}
val currentTimestampUDF = udf(() => System.currentTimeMillis())
// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
"latency",
col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.
サンプル出力:
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}