Auto Loaderの監視と観察
Auto Loaderパイプラインは、バックログの増加、スキーマのドリフト、破損したデータ、ストリームの停止などの問題が下流の利用者に影響を与える前に検出するために、アクティブなモニタリングが必要です。このページでは、主要なメトリクスをモニタリングし、ファイルレベルの状態をクエリし、オブザーバビリティダッシュボードを構築し、一般的な問題をトラブルシューティングする方法について説明します。
本番運用の構成の詳細については、本番運用のワークロード用にAuto Loaderを構成するを参照してください。
前提条件
このページの一部のモニタリングワークフローは、cloud_files_state() を使用してファイルごとの取り込み状態(バックログクエリ、レイテンシ計算、スキーマドリフト検出など)を監視します。cloud_files_state() は、Auto Loader チェックポイントのファイルレベルの取り込み状態を返すテーブル値関数です。そのすべてのフィールドがデフォルトで利用できるわけではありません。可用性は、Databricks Runtime のバージョンと構成によって異なります:
- Databricks Runtime 18.2 以降 :
discovery_time、processed_time、およびcommit_timeは自動的に利用可能です。Databricks Runtime 16.4–18.1 では、これらのフィールドは、cloudFiles.cleanSourceが有効な場合にのみ利用可能です。 - **Databricks Runtime 16.4 以降で
cloudFiles.cleanSourceが有効な場合**:archive_time、archive_mode、およびmove_locationが利用可能です。
cloudFiles.cleanSourceを有効にすると、若干のパフォーマンスオーバーヘッドが発生します。本番運用環境で有効にする前に、本番運用前環境でワークロードに対してベンチマークを実施してください。
さらに:
- 取り込まれたデータに
_metadata列で注釈を付けます。file_pathとfile_modification_timeを最低限キャプチャします。ファイルメタデータ列を参照してください。 _rescued_data列と_corrupt_record列を有効にします。
主要なAuto Loaderメトリクス
以下の表は、Auto Loaderパイプラインで監視すべき最も重要なメトリクスをまとめたものです。これらのメトリクスはStreamingQueryListenerのプログレスイベントから利用でき、Auto Loader固有の値は各ソースのmetricsマップの下に公開されています。
メトリクス | 何がわかるか |
|---|---|
| 処理待ちのバックログ内のファイルの数 |
| ファイルのバックログサイズ(バイト) |
| クラウドキュー深度(ファイル通知モードのみ) |
| バッチあたりに処理された行数 |
| データの到着率 |
| 処理スループット |
| 各バッチでの所要時間 |
注意すべき点
次のパターンは、パイプラインに注意が必要であることを示しています。
numFilesOutstandingが増加中 :バックログが蓄積しています。お客様のパイプラインは入力データに遅れをとっています。processedRowsPerSecond** <inputRowsPerSecond**: パイプラインは、データの受信よりも処理が遅れています。- 大容量
durationMs.latestOffset:ファイルの検出が遅くなります。ファイルイベントへの切り替えを検討してください。 - Large
durationMs.addBatch:データ処理が遅いです。コンピュートのスケーリングまたは変換の最適化を検討してください。
完全なメトリクスリファレンスについては、Auto Loaderソースメトリクスを参照してください。
ファイルレベルの状態をクエリ cloud_files_state
cloud_files_state()テーブル値関数は、Auto Loaderによって検出された各ファイルに関する詳細な情報を提供します。次のフィールドが利用可能です。Databricks Runtime 16.4 以降または 18.2 以降を必要とするものとしてマークされているフィールドは、前提条件で説明されている条件でのみ入力されます。
フィールド | Type | 説明 |
|---|---|---|
|
| ファイルのパス |
|
| ファイルのサイズ(バイト単位) |
|
| ファイルが作成されたとき |
|
| Auto Loaderがファイルを検出したとき (Databricks Runtime 16.4以降) |
|
| Auto Loaderがファイルを処理したとき (Databricks Runtime 16.4 以降) |
|
| ファイルがチェックポイントにコミットされたとき (Databricks Runtime 16.4 以降) |
|
| ファイルがアーカイブされたとき ( |
|
|
|
|
|
|
|
| 現在のファイルの取り込み状態 |
ファイルの取り込み状態を調査する
次のクエリーは一般的な診断シナリオを対象としています。
すべての未処理ファイル(現在のバックログ)を検索:
SELECT * FROM cloud_files_state('path/to/checkpoint')
WHERE ingestion_state != 'COMMITTED';
平均取り込みレイテンシーを計算(ファイル作成からコミットまでの時間):
SELECT avg(unix_timestamp(commit_time) - unix_timestamp(create_time)) AS avg_latency_seconds
FROM cloud_files_state('path/to/checkpoint')
WHERE commit_time IS NOT NULL AND create_time IS NOT NULL;
破損したファイルまたはスキップされたファイルを検索:
SELECT path, ingestion_state, size, create_time
FROM cloud_files_state('path/to/checkpoint')
WHERE ingestion_state LIKE 'SKIPPED%';
アーカイブの進捗状況を追跡(cloudFiles.cleanSourceが必要です):
SELECT archive_mode, count(*) AS file_count
FROM cloud_files_state('path/to/checkpoint')
GROUP BY archive_mode;
検出からコミットまでのレイテンシーが高いファイルを特定して、ボトルネックを特定します:
SELECT
path,
size,
unix_timestamp(commit_time) - unix_timestamp(discovery_time) AS processing_latency_seconds,
unix_timestamp(commit_time) - unix_timestamp(create_time) AS end_to_end_latency_seconds
FROM cloud_files_state('path/to/checkpoint')
WHERE commit_time IS NOT NULL
ORDER BY end_to_end_latency_seconds DESC
LIMIT 20;
完全なSQLリファレンスについては、cloud_files_stateテーブル値関数を参照してください。
Lakeflow Spark宣言型パイプラインでAuto Loaderを監視する
Databricksは、本番運用Auto LoaderパイプラインにLakeflow Spark宣言型パイプラインを使用することをお勧めします。組み込みのモニタリング機能を活用するには:
-
Lakeflow Spark宣言型パイプラインのイベントログをDeltaテーブルに格納することで、オブザーバビリティデータのためにクエリできます。これをパイプラインの詳細設定またはAPIで構成します。詳細については、「パイプラインイベントログ」を参照してください。
-
可観測性を考慮してパイプラインを構築してください。Lakeflow Spark宣言型パイプラインにおける適切に構造化されたAuto Loaderパイプラインには、
{table}_sourceビュー(Auto Loaderソース定義)、_rescued_dataおよび_corrupt_record列を含む生データ取り込みのための{table}_bronzeストリーミングテーブル、解析できないデータを含む行を隔離するcorrupt_records_sink、そしてダウンストリームでの利用のための{table}クリーンビューが含まれます。 -
ブロンズストリーミングテーブルに期待値を設定し、スキーマドリフトとデータ破損を監視します。
_rescued_data IS NULLは予期しないスキーマの変更を検出し、_corrupt_record IS NULLは解析できないデータを検出します。データが到着すると、Lakeflow Spark宣言型パイプラインはこれらの期待値を評価し、可観測性トレイルを生成します。期待値を構成して、警告したり、行を削除したり、パイプラインを失敗させたりできます。
パイプライン用にevent_log_rawビューを作成したら、Auto Loader固有のメトリクスについて、次のクエリーを使用してください。
フローごとの取り込みスループットを監視する:
SELECT
origin.flow_name,
origin.update_id,
timestamp,
TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT) AS rows_written
FROM event_log_raw
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC;
フローごとのデータバックログを監視します:
SELECT
origin.flow_name,
timestamp,
DOUBLE(details:flow_progress.metrics.backlog_bytes) AS backlog_bytes
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.metrics.backlog_bytes IS NOT NULL
ORDER BY timestamp DESC;
スキーマのドリフトと破損したデータを検出するために、期待される違反を要約します:
SELECT
origin.flow_name,
explode(from_json(
details:flow_progress.data_quality.expectations,
'array<struct<name:string, dataset:string, passed_records:bigint, failed_records:bigint>>'
)) AS expectation
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.data_quality.expectations IS NOT NULL;
Lakeflow Spark宣言型パイプラインの一般的なモニタリングガイダンスについては、「パイプライン監視」および「パイプラインイベントログ」を参照してください。
構造化ストリーミングでAuto Loaderを監視
LakeFlow Spark宣言型パイプラインの外部でAuto Loaderを実行する場合は、次の構造化ストリーミングのモニタリングアプローチを使用します。
StreamingQueryListenerを実装して、source.metricsから読み取ることで、各バッチから Auto Loader 固有のメトリクスをキャプチャします。
from pyspark.sql.streaming import StreamingQueryListener
class AutoLoaderMonitor(StreamingQueryListener):
def onQueryStarted(self, event):
pass
def onQueryProgress(self, event):
for source in event.progress.sources:
if "CloudFilesSource" in source.description:
metrics = source.metrics
files_outstanding = metrics.get("numFilesOutstanding", "0")
bytes_outstanding = metrics.get("numBytesOutstanding", "0")
rows_per_sec = source.processedRowsPerSecond
# Push metrics to your monitoring system (for example, write to a Delta table)
def onQueryIdle(self, event):
pass
def onQueryTerminated(self, event):
pass
spark.streams.addListener(AutoLoaderMonitor())
リスナーの処理ロジックはクエリ処理を遅らせる可能性があります。リスナーコールバックでの計算を制限し、そこで同期的な外部書き込みを回避してください。代わりに、軽量なテレメトリを非同期で出力するか、永続化のためにメトリクスを別のジョブに渡してください。
-
ソースの進行状況から
numInputRows、inputRowsPerSecond、processedRowsPerSecondを使用して、バッチごとのスループット (1秒あたりのファイル数および1秒あたりの行数) をコンピュートします。 -
取り込みレイテンシーを計算するには、
cloud_files_state()のcreate_timeとcommit_timeを比較してエンドツーエンドのレイテンシーを測定します。処理レイテンシーの場合、durationMsの内訳(たとえば、latestOffset、addBatch、およびその他の報告されたバッチフェーズ)を使用して、ボトルネックとなっているステージを特定します。 -
df.observe()を使用して、ストリーミングDataFrame上でインラインデータ品質メトリクスを直接定義します。メトリクスは、observedMetricsの下にあるStreamingQueryListenerの進行状況イベントで表示されます。
from pyspark.sql.functions import count, lit, col
observed_df = df.observe(
"auto_loader_quality",
count(lit(1)).alias("total_rows"),
count(col("_rescued_data")).alias("rescued_rows"),
count(col("_corrupt_record")).alias("corrupt_rows")
)
.queryName()を使用して各ストリームに一意の名前を割り当てると、Spark UIのストリーミングタブやモニタリングダッシュボードでAuto Loaderストリームを区別しやすくなります。
構造化ストリーミングのモニタリングに関する完全なリファレンスについては、「Databricksにおける構造化ストリーミングクエリのモニタリング」を参照してください。
可観測性ダッシュボードを構築します。
複数のソースのデータを組み合わせて、Auto Loader パイプラインの包括的なオブザーバビリティ ダッシュボードを構築します。このテーブルには、オブザーバビリティ ダッシュボードの構成に使用できる推奨ソースがいくつか表示されます。
データソース | 可観測性データ |
|---|---|
| ファイルレベルの取り込み状態:検出、処理、コミット、およびファイルごとのアーカイブタイムスタンプ |
LakeFlow Spark宣言型パイプライン イベントログ | パイプライン実行履歴、バッチごとのフローメトリクス、データ品質エクスペクテーションの結果 |
パイプライン出力テーブル | 取り込み済みテーブルごとに書き込まれた行数とデータ量 |
その後、オブザーバビリティデータを、ダッシュボードやアラートの基盤となる専用のテーブルに集約することができます。
event_type = 'update_progress'イベントから導き出されたパイプライン実行ステータス (成功または失敗) を、時系列で要約します。cloud_files_state()とevent_type = 'flow_progress'のイベントから派生した、ファイル取り込みメトリクス(バックログサイズ、スループット、バッチあたりのレイテンシー)を集計します。- イベントログ内の
num_output_rowsから導出される、テーブルごとの行数とデータ量を使用してテーブル統計を作成します。 data_qualityが入力されたevent_type = 'flow_progress'イベントから派生した、詳細なエラーログと更新ごとの予期される違反からデバッグ情報を収集します。
これらの集計テーブルは、AI/BI dashboard と SQL アラートの基盤として使用できます。推奨されるダッシュボード パネルには、パイプラインの実行ステータス タイムライン、取り込みバックログの傾向、スループットの傾向、取り込みレイテンシーの分布、データ品質メトリクス、スキーマ進化イベント、およびファイル アーカイブのステータスが含まれます。
スキーマ進化イベントの監視
スキーマ変更が発生時に検出するには、次のアプローチを使用します。
_rescued_dataの期待値違反カウントにおける非NULL値は、スキーマドリフトを示します。イベントログでno rescued dataの期待値におけるfailed_records > 0をクエリしてください。- 設定された
cloudFiles.schemaLocation内の_schemasディレクトリ (またはスキーマの場所が個別に設定されていない場合のみチェックポイント内) の変更は、スキーマ進化が発生したことを示しています。別のモニタリングジョブからこのディレクトリをポーリングできます。 - 同じストリーム名に対して、
onQueryStartedが続くonQueryTerminatedイベントを、それだけではスキーマ進化の十分な証拠と見なさないでください。ストリームは、多くの理由 (クラスターの再起動、コードのデプロイ、一時的なストレージエラーなど) で再起動します。スキーマ進化が発生したと結論付ける前に、再起動を独立したシグナル —_schemasディレクトリの変更または_rescued_dataエクスペクテーション違反 — と関連付けてください。 _metadata.file_pathを使用して、どのファイルがスキーマ変更を導入したかを特定します。これをpathフィールドでcloud_files_state()と結合し、スキーマ変更を特定のファイルとバッチに関連付けます。
このサンプル クエリを使用して、期待違反を通じて最近のスキーマ ドリフトを検出します:
SELECT
timestamp,
origin.flow_name,
exp.name AS expectation_name,
exp.failed_records
FROM (
SELECT
timestamp,
origin,
explode(from_json(
details:flow_progress.data_quality.expectations,
'array<struct<name:string, dataset:string, passed_records:bigint, failed_records:bigint>>'
)) AS exp
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.data_quality.expectations IS NOT NULL
)
WHERE exp.name = '<rescued-data expectation name>'
AND exp.failed_records > 0
ORDER BY timestamp DESC;
一般的な問題のアラートを設定する
Databricks SQLアラートまたはパイプライン通知を使用して、ダウンストリームのコンシューマーに影響を与える前に問題を検出します。
次の SQL は、増加するバックログを検出し、Databricks SQL アラートの基盤として使用できます。定期的に(たとえば、5分ごとに)実行するようにスケジュール設定し、結果が空でない場合にアラートします。
-- Alert when backlog exceeds threshold or trends upward across recent batches
WITH recent_backlog AS (
SELECT
origin.flow_name,
timestamp,
DOUBLE(details:flow_progress.metrics.backlog_bytes) AS backlog_bytes,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.metrics.backlog_bytes IS NOT NULL
)
SELECT flow_name, backlog_bytes, timestamp
FROM recent_backlog
WHERE rn = 1
AND backlog_bytes > 1073741824 -- alert when backlog exceeds 1 GB
次の表に、推奨されるアラート条件をまとめます。
検出する内容 | 検出方法 | アラートのタイミング |
|---|---|---|
バックログが増加しています |
| 複数のバッチにわたる持続的な増加 |
停止中のストリーム | 進捗イベントはありません | N分間イベントなし(予想されるトリガー間隔に基づく) |
高い取り込みレイテンシー |
| SLAしきい値を超えています。 |
データ品質の低下 | 期待失敗率 | 期待に反する行の割合が増加しています。 |
スキーマ進化イベント |
| 期待違反カウントにおける空値以外の値 |
ファイルディスカバリーの遅延 |
| ベースラインより大幅に高い |
一般的な問題のトラブルシューティング
次の表は、一般的なAuto Loaderパイプラインの問題、その考えられる原因、およびそれらを解決するための推奨されるアクションを示しています。
問題 | 考えられる原因 | 推奨アクション |
|---|---|---|
バックログが処理よりも速く増加しています。 | コンピュートのサイズ不足、データスキュー、またはスロットルされたレート制限 | コンピュートをスケーリングし、Spark UI でスキューを確認し、バッチサイズを制御するための |
ファイルが検出されていません | ファイルイベントの設定ミス、権限の問題、またはストリームが7日以内に実行されていません。 | 外部ロケーションのアクセス許可を確認し、Unity Catalog UIでファイルイベントの設定を確認し、RocksDBの状態の期限切れを回避するためにストリームが少なくとも7日ごとに実行されるようにしてください。 |
ストリームの起動に時間がかかりすぎます | 大規模なチェックポイント状態のダウンロード(RocksDB) | Databricks Runtime 15.3 以降にアップグレードすると、非同期状態読み込みをご利用いただけるようになり、起動時間を約90%短縮できます。 |
重複ファイルの処理 | 積極的な | 保守的な |
スキーマ進化によるパイプラインの再起動 | 頻繁な、または互換性のないスキーマ変更 |
|
シンクに蓄積されている破損データ | ソースデータの品質に関する問題 |
|
| Databricks Runtime 18.2未満で実行(なし) | Databricks Runtime 18.2以降にアップグレードするか、Databricks Runtime 16.4~18.1で |
さらにトラブルシューティングを行う場合は、「Auto Loader FAQ」を参照してください。