メインコンテンツまでスキップ

Databricksジョブ、LakeFlow 宣言型パイプライン、 の での可観測性LakeFlow Connect

ストリーミング アプリケーションのパフォーマンス、コスト、および正常性を監視することは、信頼性が高く効率的な ETL パイプラインを構築するために不可欠です。 Databricks は、ボトルネックの診断、パフォーマンスの最適化、リソースの使用量とコストの管理に役立つ、ジョブ、 LakeFlow 宣言型パイプライン、 LakeFlow Connect 全体で豊富な監視機能を提供します。

この記事では、次の領域のベスト プラクティスについて説明します。

  • 主要なストリーミングパフォーマンスメトリクス
  • イベントログスキーマとクエリ例
  • ストリーミング クエリーの モニタリング
  • システムテーブルを使用したコストオブザーバビリティ
  • ログとメトリクスの外部ツールへのエクスポート

ストリーミングの観測可能性における主要なメトリクス

ストリーミング パイプラインを運用する場合は、次の主要なメトリクスを監視してください。

メトリクス

目的

背圧

ファイル数とオフセット数(サイズ)を監視します。ボトルネックを特定し、システムが遅れることなく受信データを処理できるようにします。

スループット

マイクロバッチごとに処理されたメッセージの数を追跡します。パイプラインの効率を評価し、データ取り込みと歩調を合わせていることを確認します。

期間

マイクロバッチの平均期間を測定します。処理速度を示し、バッチ間隔の調整に役立ちます。

レイテンシー

時間の経過と共に処理されるレコード/メッセージの数を示します。エンドツーエンドのパイプライン遅延を理解し、レイテンシを短縮するために最適化するのに役立ちます。

クラスターの使用率

CPU とメモリの使用率 (%) を反映します。効率的なリソースの使用を確保し、処理要求に合わせてクラスターをスケーリングします。

ネットワーク

転送および受信したデータを測定します。ネットワークのボトルネックを特定し、データ転送パフォーマンスを向上させるのに役立ちます。

チェックポイント

処理されたデータとオフセットを識別します。一貫性を確保し、障害時のフォールトトレランスを可能にします。

コスト

ストリーミング アプリケーションの時間単位、日単位、月単位のコストが表示されます。予算編成とリソースの最適化を支援します。

依存関係

ストリーミング アプリケーションで作成されたデータセットとレイヤーを表示します。データ変換、追跡、品質保証、デバッグを容易にします。

クラスターログとメトリクス

Databricks クラスターログとメトリクスは、クラスターのパフォーマンスと使用率に関する詳細な知見を提供します。 これらのログとメトリクスには、CPU、メモリ、ディスク I/O、ネットワークトラフィック、およびその他のシステムメトリクスに関する情報が含まれます。これらのメトリクスを監視することは、クラスターのパフォーマンスを最適化し、リソースを効率的に管理し、問題のトラブルシューティングを行うために重要です。

Databricks クラスターログとメトリクスは、クラスターのパフォーマンスとリソース使用率に関する詳細な知見を提供します。 これには、CPU とメモリの使用量、ディスク I/O、およびネットワーク トラフィックが含まれます。これらのメトリクスの監視は、次の場合に重要です。

  • クラスターのパフォーマンスの最適化。
  • リソースを効率的に管理する。
  • 運用上の問題のトラブルシューティング。

メトリクスは、 Databricks UIを通じて活用したり、パーソナルモニタリングツールにエクスポートしたりできます。 ノートブックの例: Datadog メトリクスを参照してください。

Spark UI

Spark UI には、完了したタスク、保留中のタスク、失敗したタスクの数など、ジョブとステージの進行状況に関する詳細情報が表示されます。これにより、実行フローを理解し、ボトルネックを特定できます。

ストリーミング アプリケーションの場合、 ストリーミング タブ には、入力レート、処理レート、バッチ期間などのメトリクスが表示されます。 ストリーミング ジョブのパフォーマンスを監視し、データ取り込みや処理の問題を特定するのに役立ちます。

詳細については、「Apache Spark UI を使用したデバッグ」を参照してください。

コンピュート メトリクス

コンピュート メトリクスは、クラスターの利用状況を理解するのに役立ちます。 ジョブが実行されると、ジョブがどのようにスケーリングされ、リソースがどのように影響を受けるかを確認できます。OOM の失敗につながる可能性のあるメモリ負荷や、長い遅延を引き起こす可能性のある CPU 負荷を見つけることができます。ここでは、具体的なメトリクスをご紹介します。

  • サーバー負荷分散 :過去 1 分間の各ノードの CPU 使用率。
  • CPU 使用率 : CPU がさまざまなモード (ユーザー、システム、アイドル、iowait など) で費やした時間の割合。
  • メモリ使用率 : 各モードごとの合計メモリ使用量 (使用済み、空き、バッファ、キャッシュなど)。
  • メモリスワップ使用率 : メモリスワップの合計使用量。
  • ファイルシステムの空き容量 : 各マウントポイントによるファイルシステムの合計使用量。
  • ネットワークスループット :各デバイスがネットワークを介して送受信したバイト数。
  • アクティブノードの数 : 特定のコンピュートの各タイムスタンプにおけるアクティブノードの数。

詳細については、パフォーマンスの監視および ハードウェア メトリクス チャートを参照してください。

システムテーブル

コスト監視

Databricks システムテーブルは、ジョブのコストとパフォーマンスを監視するための構造化されたアプローチを提供します。これらのテーブルには、次のものが含まれます。

  • ジョブ実行の詳細。
  • リソース使用率。
  • 関連コスト。

これらの表を使用して、運用の正常性と財務への影響を理解します。

必要条件

コストモニタリングにシステムテーブルを使用するには:

  • アカウント管理者は、 system.lakeflow schemaを有効にする必要があります。
  • ユーザーは次のいずれかを行う必要があります。
    • メタストア管理者とアカウント管理者の両方である、または
    • システム スキーマに対する USE 権限と SELECT 権限を持っている。

クエリの例: 最もコストの高いジョブ (過去 30 日間)

このクエリは、過去 30 日間で最もコストの高いジョブを特定し、コスト分析と最適化を支援します。

SQL
WITH list_cost_per_job AS (
SELECT
t1.workspace_id,
t1.usage_metadata.job_id,
COUNT(DISTINCT t1.usage_metadata.job_run_id) AS runs,
SUM(t1.usage_quantity * list_prices.pricing.default) AS list_cost,
FIRST(identity_metadata.run_as, true) AS run_as,
FIRST(t1.custom_tags, true) AS custom_tags,
MAX(t1.usage_end_time) AS last_seen_date
FROM system.billing.usage t1
INNER JOIN system.billing.list_prices list_prices ON
t1.cloud = list_prices.cloud AND
t1.sku_name = list_prices.sku_name AND
t1.usage_start_time >= list_prices.price_start_time AND
(t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
WHERE
t1.billing_origin_product = "JOBS"
AND t1.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
GROUP BY ALL
),
most_recent_jobs AS (
SELECT
*,
ROW_NUMBER() OVER(PARTITION BY workspace_id, job_id ORDER BY change_time DESC) AS rn
FROM
system.lakeflow.jobs QUALIFY rn=1
)
SELECT
t2.name,
t1.job_id,
t1.workspace_id,
t1.runs,
t1.run_as,
SUM(list_cost) AS list_cost,
t1.last_seen_date
FROM list_cost_per_job t1
LEFT JOIN most_recent_jobs t2 USING (workspace_id, job_id)
GROUP BY ALL
ORDER BY list_cost DESC

LakeFlow 宣言型パイプライン

LakeFlow 宣言型パイプライン イベント ログには、次のようなすべてのパイプライン イベントの包括的なレコードがキャプチャされます。

  • 監査ログ。
  • データ品質チェック。
  • パイプラインの進行状況。
  • データリネージ.

イベント ログは、すべての LakeFlow Declarative パイプラインで自動的に有効になり、次の方法でアクセスできます。

  • パイプライン UI : ログを直接表示します。
  • DLT API : プログラムによるアクセス。
  • 直接クエリ : イベント ログ テーブルをクエリします。

詳細については、「LakeFlow 宣言型パイプラインのイベント ログ スキーマ」を参照してください。

クエリの例

これらのクエリ例は、バッチ期間、スループット、バックプレッシャー、リソース使用率などの主要なメトリクスを提供することで、パイプラインのパフォーマンスと正常性を監視するのに役立ちます。

平均バッチ期間

このクエリは、パイプラインによって処理されたバッチの平均期間を計算します。

SQL
SELECT
(max_t - min_t) / batch_count as avg_batch_duration_seconds,
batch_count,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
count(*) as batch_count,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc

平均スループット

このクエリは、パイプラインの平均スループットを 1 秒あたりの処理済み行数で計算します。

SQL
SELECT
(max_t - min_t) / total_rows as avg_throughput_rps,
total_rows,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
sum(
details:flow_progress:metrics:num_output_rows
) as total_rows,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc

背圧

このクエリは、データ バックログを確認することで、パイプラインのバックプレッシャーを測定します。

SQL
SELECT
timestamp,
DOUBLE(
details:flow_progress:metrics:backlog_bytes
) AS backlog_bytes,
DOUBLE(
details:flow_progress:metrics:backlog_files
) AS backlog_files
FROM
event_log
WHERE
event_type = 'flow_progress'

クラスターとスロット使用率

このクエリには、パイプラインで使用されるクラスターまたはスロットの利用に関する知見があります。

SQL
SELECT
date_trunc("hour", timestamp) AS hour,
AVG (
DOUBLE (
details:cluster_resources:num_task_slots
)
) AS num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:avg_num_task_slots
)
) AS avg_num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:num_executors
)
) AS num_executors,
AVG (
DOUBLE (
details:cluster_resources:avg_task_slot_utilization
)
) AS avg_utilization,
AVG (
DOUBLE (
details:cluster_resources:avg_num_queued_tasks
)
) AS queue_size
FROM
event_log
WHERE
details : cluster_resources : avg_num_queued_tasks IS NOT NULL
AND origin.update_id = '${latest_update_id}'
GROUP BY
1;

ジョブ

ジョブ内のストリーミング クエリは、ストリーミング Query Listener を通じて監視できます。

リスナーを Spark セッションにアタッチして、Databricks のストリーミング クエリ リスナーを有効にします。このリスナーは、ストリーミングクエリの進行状況とメトリクスを監視します。 これは、メトリクスを外部モニタリングツールにプッシュしたり、さらに分析するためにログに記録したりするために使用できます。

例:メトリクスを外部モニタリングツールにエクスポートする

::: note

これは、Databricks Runtime 11.3 LTS 以降の Python と Scala で使用できます。

:::

StreamingQueryListener インターフェイスを使用して、ストリーミング メトリクスを外部サービスにエクスポートして、アラートやダッシュボードを作成できます。

リスナーを実装する方法の基本的な例を次に示します。

Python
from pyspark.sql.streaming import StreamingQueryListener

class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print("Query started: ", event.id)

def onQueryProgress(self, event):
print("Query made progress: ", event.progress)

def onQueryTerminated(self, event):
print("Query terminated: ", event.id)

spark.streams.addListener(MyListener())

例: Databricks 内でクエリ リスナーを使用する

Kafka から Delta Lake へのストリーミング クエリの StreamingQueryListener イベント ログの例を次に示します。

JSON
{
"id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"timestamp": "2024-05-15T21:57:50.782Z",
"batchId": 0,
"batchDuration": 3601,
"numInputRows": 20,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 5.55401277422938,
"durationMs": {
"addBatch": 1544,
"commitBatch": 686,
"commitOffsets": 27,
"getBatch": 12,
"latestOffset": 577,
"queryPlanning": 105,
"triggerExecution": 3600,
"walCommit": 34
},
"stateOperators": [
{
"operatorName": "symmetricHashJoin",
"numRowsTotal": 20,
"numRowsUpdated": 20,
"allUpdatesTimeMs": 473,
"numRowsRemoved": 0,
"allRemovalsTimeMs": 0,
"commitTimeMs": 277,
"memoryUsedBytes": 13120,
"numRowsDroppedByWatermark": 0,
"numShufflePartitions": 5,
"numStateStoreInstances": 20,
"customMetrics": {
"loadedMapCacheHitCount": 0,
"loadedMapCacheMissCount": 0,
"stateOnCurrentVersionSizeBytes": 5280
}
}
],
"sources": [
{
"description": "KafkaV2[Subscribe[topic-1]]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"avgOffsetsBehindLatest": "0.0",
"estimatedTotalBytesBehindLatest": "0.0",
"maxOffsetsBehindLatest": "0",
"minOffsetsBehindLatest": "0"
}
},
{
"description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"numBytesOutstanding": "0",
"numFilesOutstanding": "0"
}
}
]
}

その他の例については、 を参照してください。

クエリー進捗のメトリクス

クエリー進捗メトリクスは、ストリーミング クエリのパフォーマンスと正常性を監視するために不可欠です。 これらのメトリクスには、入力行の数、処理速度、およびクエリの実行に関連するさまざまな期間が含まれます。これらのメトリクスは、Spark セッションに StreamingQueryListener をアタッチすることで確認できます。リスナーは、各ストリーミングエポックの終了時に、これらのメトリクスを含むイベントを発行します。

たとえば、リスナーの onQueryProgress メソッドで StreamingQueryProgress.observedMetrics マップを使用してメトリクスにアクセスできます。これにより、ストリーミングクエリのパフォーマンスをリアルタイムで追跡および分析できます。

Python
class MyListener(StreamingQueryListener):
def onQueryProgress(self, event):
print("Query made progress: ", event.progress.observedMetrics)