Databricksジョブ、LakeFlow 宣言型パイプライン、 の での可観測性LakeFlow Connect
ストリーミング アプリケーションのパフォーマンス、コスト、および正常性を監視することは、信頼性が高く効率的な ETL パイプラインを構築するために不可欠です。 Databricks は、ボトルネックの診断、パフォーマンスの最適化、リソースの使用量とコストの管理に役立つ、ジョブ、 LakeFlow 宣言型パイプライン、 LakeFlow Connect 全体で豊富な監視機能を提供します。
この記事では、次の領域のベスト プラクティスについて説明します。
- 主要なストリーミングパフォーマンスメトリクス
- イベントログスキーマとクエリ例
- ストリーミング クエリーの モニタリング
- システムテーブルを使用したコストオブザーバビリティ
- ログとメトリクスの外部ツールへのエクスポート
ストリーミングの観測可能性における主要なメトリクス
ストリーミング パイプラインを運用する場合は、次の主要なメトリクスを監視してください。
メトリクス | 目的 |
---|---|
背圧 | ファイル数とオフセット数(サイズ)を監視します。ボトルネックを特定し、システムが遅れることなく受信データを処理できるようにします。 |
スループット | マイクロバッチごとに処理されたメッセージの数を追跡します。パイプラインの効率を評価し、データ取り込みと歩調を合わせていることを確認します。 |
期間 | マイクロバッチの平均期間を測定します。処理速度を示し、バッチ間隔の調整に役立ちます。 |
レイテンシー | 時間の経過と共に処理されるレコード/メッセージの数を示します。エンドツーエンドのパイプライン遅延を理解し、レイテンシを短縮するために最適化するのに役立ちます。 |
クラスターの使用率 | CPU とメモリの使用率 (%) を反映します。効率的なリソースの使用を確保し、処理要求に合わせてクラスターをスケーリングします。 |
ネットワーク | 転送および受信したデータを測定します。ネットワークのボトルネックを特定し、データ転送パフォーマンスを向上させるのに役立ちます。 |
チェックポイント | 処理されたデータとオフセットを識別します。一貫性を確保し、障害時のフォールトトレランスを可能にします。 |
コスト | ストリーミング アプリケーションの時間単位、日単位、月単位のコストが表示されます。予算編成とリソースの最適化を支援します。 |
依存関係 | ストリーミング アプリケーションで作成されたデータセットとレイヤーを表示します。データ変換、追跡、品質保証、デバッグを容易にします。 |
クラスターログとメトリクス
Databricks クラスターログとメトリクスは、クラスターのパフォーマンスと使用率に関する詳細な知見を提供します。 これらのログとメトリクスには、CPU、メモリ、ディスク I/O、ネットワークトラフィック、およびその他のシステムメトリクスに関する情報が含まれます。これらのメトリクスを監視することは、クラスターのパフォーマンスを最適化し、リソースを効率的に管理し、問題のトラブルシューティングを行うために重要です。
Databricks クラスターログとメトリクスは、クラスターのパフォーマンスとリソース使用率に関する詳細な知見を提供します。 これには、CPU とメモリの使用量、ディスク I/O、およびネットワーク トラフィックが含まれます。これらのメトリクスの監視は、次の場合に重要です。
- クラスターのパフォーマンスの最適化。
- リソースを効率的に管理する。
- 運用上の問題のトラブルシューティング。
メトリクスは、 Databricks UIを通じて活用したり、パーソナルモニタリングツールにエクスポートしたりできます。 ノートブックの例: Datadog メトリクスを参照してください。
Spark UI
Spark UI には、完了したタスク、保留中のタスク、失敗したタスクの数など、ジョブとステージの進行状況に関する詳細情報が表示されます。これにより、実行フローを理解し、ボトルネックを特定できます。
ストリーミング アプリケーションの場合、 ストリーミング タブ には、入力レート、処理レート、バッチ期間などのメトリクスが表示されます。 ストリーミング ジョブのパフォーマンスを監視し、データ取り込みや処理の問題を特定するのに役立ちます。
詳細については、「Apache Spark UI を使用したデバッグ」を参照してください。
コンピュート メトリクス
コンピュート メトリクスは、クラスターの利用状況を理解するのに役立ちます。 ジョブが実行されると、ジョブがどのようにスケーリングされ、リソースがどのように影響を受けるかを確認できます。OOM の失敗につながる可能性のあるメモリ負荷や、長い遅延を引き起こす可能性のある CPU 負荷を見つけることができます。ここでは、具体的なメトリクスをご紹介します。
- サーバー負荷分散 :過去 1 分間の各ノードの CPU 使用率。
- CPU 使用率 : CPU がさまざまなモード (ユーザー、システム、アイドル、iowait など) で費やした時間の割合。
- メモリ使用率 : 各モードごとの合計メモリ使用量 (使用済み、空き、バッファ、キャッシュなど)。
- メモリスワップ使用率 : メモリスワップの合計使用量。
- ファイルシステムの空き容量 : 各マウントポイントによるファイルシステムの合計使用量。
- ネットワークスループット :各デバイスがネットワークを介して送受信したバイト数。
- アクティブノードの数 : 特定のコンピュートの各タイムスタンプにおけるアクティブノードの数。
詳細については、パフォーマンスの監視および ハードウェア メトリクス チャートを参照してください。
システムテーブル
コスト監視
Databricks システムテーブルは、ジョブのコストとパフォーマンスを監視するための構造化されたアプローチを提供します。これらのテーブルには、次のものが含まれます。
- ジョブ実行の詳細。
- リソース使用率。
- 関連コスト。
これらの表を使用して、運用の正常性と財務への影響を理解します。
必要条件
コストモニタリングにシステムテーブルを使用するには:
- アカウント管理者は、
system.lakeflow schema
を有効にする必要があります。 - ユーザーは次のいずれかを行う必要があります。
- メタストア管理者とアカウント管理者の両方である、または
- システム スキーマに対する
USE
権限とSELECT
権限を持っている。
クエリの例: 最もコストの高いジョブ (過去 30 日間)
このクエリは、過去 30 日間で最もコストの高いジョブを特定し、コスト分析と最適化を支援します。
WITH list_cost_per_job AS (
SELECT
t1.workspace_id,
t1.usage_metadata.job_id,
COUNT(DISTINCT t1.usage_metadata.job_run_id) AS runs,
SUM(t1.usage_quantity * list_prices.pricing.default) AS list_cost,
FIRST(identity_metadata.run_as, true) AS run_as,
FIRST(t1.custom_tags, true) AS custom_tags,
MAX(t1.usage_end_time) AS last_seen_date
FROM system.billing.usage t1
INNER JOIN system.billing.list_prices list_prices ON
t1.cloud = list_prices.cloud AND
t1.sku_name = list_prices.sku_name AND
t1.usage_start_time >= list_prices.price_start_time AND
(t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
WHERE
t1.billing_origin_product = "JOBS"
AND t1.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
GROUP BY ALL
),
most_recent_jobs AS (
SELECT
*,
ROW_NUMBER() OVER(PARTITION BY workspace_id, job_id ORDER BY change_time DESC) AS rn
FROM
system.lakeflow.jobs QUALIFY rn=1
)
SELECT
t2.name,
t1.job_id,
t1.workspace_id,
t1.runs,
t1.run_as,
SUM(list_cost) AS list_cost,
t1.last_seen_date
FROM list_cost_per_job t1
LEFT JOIN most_recent_jobs t2 USING (workspace_id, job_id)
GROUP BY ALL
ORDER BY list_cost DESC
LakeFlow 宣言型パイプライン
LakeFlow 宣言型パイプライン イベント ログには、次のようなすべてのパイプライン イベントの包括的なレコードがキャプチャされます。
- 監査ログ。
- データ品質チェック。
- パイプラインの進行状況。
- データリネージ.
イベント ログは、すべての LakeFlow Declarative パイプラインで自動的に有効になり、次の方法でアクセスできます。
- パイプライン UI : ログを直接表示します。
- DLT API : プログラムによるアクセス。
- 直接クエリ : イベント ログ テーブルをクエリします。
詳細については、「LakeFlow 宣言型パイプラインのイベント ログ スキーマ」を参照してください。
クエリの例
これらのクエリ例は、バッチ期間、スループット、バックプレッシャー、リソース使用率などの主要なメトリクスを提供することで、パイプラインのパフォーマンスと正常性を監視するのに役立ちます。
平均バッチ期間
このクエリは、パイプラインによって処理されたバッチの平均期間を計算します。
SELECT
(max_t - min_t) / batch_count as avg_batch_duration_seconds,
batch_count,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
count(*) as batch_count,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
平均スループット
このクエリは、パイプラインの平均スループットを 1 秒あたりの処理済み行数で計算します。
SELECT
(max_t - min_t) / total_rows as avg_throughput_rps,
total_rows,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
sum(
details:flow_progress:metrics:num_output_rows
) as total_rows,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
背圧
このクエリは、データ バックログを確認することで、パイプラインのバックプレッシャーを測定します。
SELECT
timestamp,
DOUBLE(
details:flow_progress:metrics:backlog_bytes
) AS backlog_bytes,
DOUBLE(
details:flow_progress:metrics:backlog_files
) AS backlog_files
FROM
event_log
WHERE
event_type = 'flow_progress'
クラスターとスロット使用率
このクエリには、パイプラインで使用されるクラスターまたはスロットの利用に関する知見があります。
SELECT
date_trunc("hour", timestamp) AS hour,
AVG (
DOUBLE (
details:cluster_resources:num_task_slots
)
) AS num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:avg_num_task_slots
)
) AS avg_num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:num_executors
)
) AS num_executors,
AVG (
DOUBLE (
details:cluster_resources:avg_task_slot_utilization
)
) AS avg_utilization,
AVG (
DOUBLE (
details:cluster_resources:avg_num_queued_tasks
)
) AS queue_size
FROM
event_log
WHERE
details : cluster_resources : avg_num_queued_tasks IS NOT NULL
AND origin.update_id = '${latest_update_id}'
GROUP BY
1;
ジョブ
ジョブ内のストリーミング クエリは、ストリーミング Query Listener を通じて監視できます。
リスナーを Spark セッションにアタッチして、Databricks のストリーミング クエリ リスナーを有効にします。このリスナーは、ストリーミングクエリの進行状況とメトリクスを監視します。 これは、メトリクスを外部モニタリングツールにプッシュしたり、さらに分析するためにログに記録したりするために使用できます。
例:メトリクスを外部モニタリングツールにエクスポートする
::: note
これは、Databricks Runtime 11.3 LTS 以降の Python と Scala で使用できます。
:::
StreamingQueryListener
インターフェイスを使用して、ストリーミング メトリクスを外部サービスにエクスポートして、アラートやダッシュボードを作成できます。
リスナーを実装する方法の基本的な例を次に示します。
from pyspark.sql.streaming import StreamingQueryListener
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print("Query started: ", event.id)
def onQueryProgress(self, event):
print("Query made progress: ", event.progress)
def onQueryTerminated(self, event):
print("Query terminated: ", event.id)
spark.streams.addListener(MyListener())
例: Databricks 内でクエリ リスナーを使用する
Kafka から Delta Lake へのストリーミング クエリの StreamingQueryListener イベント ログの例を次に示します。
{
"id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"timestamp": "2024-05-15T21:57:50.782Z",
"batchId": 0,
"batchDuration": 3601,
"numInputRows": 20,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 5.55401277422938,
"durationMs": {
"addBatch": 1544,
"commitBatch": 686,
"commitOffsets": 27,
"getBatch": 12,
"latestOffset": 577,
"queryPlanning": 105,
"triggerExecution": 3600,
"walCommit": 34
},
"stateOperators": [
{
"operatorName": "symmetricHashJoin",
"numRowsTotal": 20,
"numRowsUpdated": 20,
"allUpdatesTimeMs": 473,
"numRowsRemoved": 0,
"allRemovalsTimeMs": 0,
"commitTimeMs": 277,
"memoryUsedBytes": 13120,
"numRowsDroppedByWatermark": 0,
"numShufflePartitions": 5,
"numStateStoreInstances": 20,
"customMetrics": {
"loadedMapCacheHitCount": 0,
"loadedMapCacheMissCount": 0,
"stateOnCurrentVersionSizeBytes": 5280
}
}
],
"sources": [
{
"description": "KafkaV2[Subscribe[topic-1]]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"avgOffsetsBehindLatest": "0.0",
"estimatedTotalBytesBehindLatest": "0.0",
"maxOffsetsBehindLatest": "0",
"minOffsetsBehindLatest": "0"
}
},
{
"description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"numBytesOutstanding": "0",
"numFilesOutstanding": "0"
}
}
]
}
その他の例については、 例を参照してください。
クエリー進捗のメトリクス
クエリー進捗メトリクスは、ストリーミング クエリのパフォーマンスと正常性を監視するために不可欠です。 これらのメトリクスには、入力行の数、処理速度、およびクエリの実行に関連するさまざまな期間が含まれます。これらのメトリクスは、Spark セッションに StreamingQueryListener
をアタッチすることで確認できます。リスナーは、各ストリーミングエポックの終了時に、これらのメトリクスを含むイベントを発行します。
たとえば、リスナーの onQueryProgress
メソッドで StreamingQueryProgress.observedMetrics
マップを使用してメトリクスにアクセスできます。これにより、ストリーミングクエリのパフォーマンスをリアルタイムで追跡および分析できます。
class MyListener(StreamingQueryListener):
def onQueryProgress(self, event):
print("Query made progress: ", event.progress.observedMetrics)