メインコンテンツまでスキップ

Auto Loaderの監視と観察

Auto Loaderパイプラインは、バックログの増加、スキーマのドリフト、破損したデータ、ストリームの停止などの問題が下流の利用者に影響を与える前に検出するために、アクティブなモニタリングが必要です。このページでは、主要なメトリクスをモニタリングし、ファイルレベルの状態をクエリし、オブザーバビリティダッシュボードを構築し、一般的な問題をトラブルシューティングする方法について説明します。

本番運用の構成の詳細については、本番運用のワークロード用にAuto Loaderを構成するを参照してください。

前提条件

このページの一部のモニタリングワークフローは、cloud_files_state() を使用してファイルごとの取り込み状態(バックログクエリ、レイテンシ計算、スキーマドリフト検出など)を監視します。cloud_files_state() は、Auto Loader チェックポイントのファイルレベルの取り込み状態を返すテーブル値関数です。そのすべてのフィールドがデフォルトで利用できるわけではありません。可用性は、Databricks Runtime のバージョンと構成によって異なります:

  • Databricks Runtime 18.2 以降discovery_timeprocessed_time、およびcommit_timeは自動的に利用可能です。Databricks Runtime 16.4–18.1 では、これらのフィールドは、cloudFiles.cleanSourceが有効な場合にのみ利用可能です。
  • **Databricks Runtime 16.4 以降でcloudFiles.cleanSource が有効な場合**:archive_timearchive_mode 、およびmove_location が利用可能です。

cloudFiles.cleanSourceを有効にすると、若干のパフォーマンスオーバーヘッドが発生します。本番運用環境で有効にする前に、本番運用前環境でワークロードに対してベンチマークを実施してください。

さらに:

  • 取り込まれたデータに_metadata列で注釈を付けます。file_pathfile_modification_time を最低限キャプチャします。ファイルメタデータ列を参照してください。
  • _rescued_data列と_corrupt_record列を有効にします。

主要なAuto Loaderメトリクス

以下の表は、Auto Loaderパイプラインで監視すべき最も重要なメトリクスをまとめたものです。これらのメトリクスはStreamingQueryListenerのプログレスイベントから利用でき、Auto Loader固有の値は各ソースのmetricsマップの下に公開されています。

メトリクス

何がわかるか

numFilesOutstanding

処理待ちのバックログ内のファイルの数

numBytesOutstanding

ファイルのバックログサイズ(バイト)

approximateQueueSize

クラウドキュー深度(ファイル通知モードのみ)

numInputRows

バッチあたりに処理された行数

inputRowsPerSecond

データの到着率

processedRowsPerSecond

処理スループット

durationMs 内訳

各バッチでの所要時間

注意すべき点

次のパターンは、パイプラインに注意が必要であることを示しています。

  • numFilesOutstandingが増加中 :バックログが蓄積しています。お客様のパイプラインは入力データに遅れをとっています。
  • processedRowsPerSecond** <inputRowsPerSecond **: パイプラインは、データの受信よりも処理が遅れています。
  • 大容量durationMs.latestOffset :ファイルの検出が遅くなります。ファイルイベントへの切り替えを検討してください。
  • Large durationMs.addBatch :データ処理が遅いです。コンピュートのスケーリングまたは変換の最適化を検討してください。

完全なメトリクスリファレンスについては、Auto Loaderソースメトリクスを参照してください。

ファイルレベルの状態をクエリ cloud_files_state

cloud_files_state()テーブル値関数は、Auto Loaderによって検出された各ファイルに関する詳細な情報を提供します。次のフィールドが利用可能です。Databricks Runtime 16.4 以降または 18.2 以降を必要とするものとしてマークされているフィールドは、前提条件で説明されている条件でのみ入力されます。

フィールド

Type

説明

path

STRING

ファイルのパス

size

BIGINT

ファイルのサイズ(バイト単位)

create_time

TIMESTAMP

ファイルが作成されたとき

discovery_time

TIMESTAMP

Auto Loaderがファイルを検出したとき (Databricks Runtime 16.4以降)

processed_time

TIMESTAMP

Auto Loaderがファイルを処理したとき (Databricks Runtime 16.4 以降)

commit_time

TIMESTAMP

ファイルがチェックポイントにコミットされたとき (Databricks Runtime 16.4 以降)

archive_time

TIMESTAMP

ファイルがアーカイブされたとき (cloudFiles.cleanSource が必要)

archive_mode

STRING

MOVEDELETE、またはNULLcloudFiles.cleanSourceが必要です)

move_location

STRING

cloudFiles.cleanSourceの場合の送信先パス MOVE

ingestion_state

STRING

現在のファイルの取り込み状態

ファイルの取り込み状態を調査する

次のクエリーは一般的な診断シナリオを対象としています。

すべての未処理ファイル(現在のバックログ)を検索:

SQL
SELECT * FROM cloud_files_state('path/to/checkpoint')
WHERE ingestion_state != 'COMMITTED';

平均取り込みレイテンシーを計算(ファイル作成からコミットまでの時間):

SQL
SELECT avg(unix_timestamp(commit_time) - unix_timestamp(create_time)) AS avg_latency_seconds
FROM cloud_files_state('path/to/checkpoint')
WHERE commit_time IS NOT NULL AND create_time IS NOT NULL;

破損したファイルまたはスキップされたファイルを検索:

SQL
SELECT path, ingestion_state, size, create_time
FROM cloud_files_state('path/to/checkpoint')
WHERE ingestion_state LIKE 'SKIPPED%';

アーカイブの進捗状況を追跡(cloudFiles.cleanSourceが必要です):

SQL
SELECT archive_mode, count(*) AS file_count
FROM cloud_files_state('path/to/checkpoint')
GROUP BY archive_mode;

検出からコミットまでのレイテンシーが高いファイルを特定して、ボトルネックを特定します:

SQL
SELECT
path,
size,
unix_timestamp(commit_time) - unix_timestamp(discovery_time) AS processing_latency_seconds,
unix_timestamp(commit_time) - unix_timestamp(create_time) AS end_to_end_latency_seconds
FROM cloud_files_state('path/to/checkpoint')
WHERE commit_time IS NOT NULL
ORDER BY end_to_end_latency_seconds DESC
LIMIT 20;

完全なSQLリファレンスについては、cloud_files_stateテーブル値関数を参照してください。

Lakeflow Spark宣言型パイプラインでAuto Loaderを監視する

Databricksは、本番運用Auto LoaderパイプラインにLakeflow Spark宣言型パイプラインを使用することをお勧めします。組み込みのモニタリング機能を活用するには:

  • Lakeflow Spark宣言型パイプラインのイベントログをDeltaテーブルに格納することで、オブザーバビリティデータのためにクエリできます。これをパイプラインの詳細設定またはAPIで構成します。詳細については、「パイプラインイベントログ」を参照してください。

  • 可観測性を考慮してパイプラインを構築してください。Lakeflow Spark宣言型パイプラインにおける適切に構造化されたAuto Loaderパイプラインには、{table}_sourceビュー(Auto Loaderソース定義)、_rescued_dataおよび_corrupt_record列を含む生データ取り込みのための{table}_bronzeストリーミングテーブル、解析できないデータを含む行を隔離するcorrupt_records_sink、そしてダウンストリームでの利用のための{table}クリーンビューが含まれます。

  • ブロンズストリーミングテーブルに期待値を設定し、スキーマドリフトとデータ破損を監視します。_rescued_data IS NULLは予期しないスキーマの変更を検出し、_corrupt_record IS NULLは解析できないデータを検出します。データが到着すると、Lakeflow Spark宣言型パイプラインはこれらの期待値を評価し、可観測性トレイルを生成します。期待値を構成して、警告したり、行を削除したり、パイプラインを失敗させたりできます。

パイプライン用にevent_log_rawビューを作成したら、Auto Loader固有のメトリクスについて、次のクエリーを使用してください。

フローごとの取り込みスループットを監視する:

SQL
SELECT
origin.flow_name,
origin.update_id,
timestamp,
TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT) AS rows_written
FROM event_log_raw
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC;

フローごとのデータバックログを監視します:

SQL
SELECT
origin.flow_name,
timestamp,
DOUBLE(details:flow_progress.metrics.backlog_bytes) AS backlog_bytes
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.metrics.backlog_bytes IS NOT NULL
ORDER BY timestamp DESC;

スキーマのドリフトと破損したデータを検出するために、期待される違反を要約します:

SQL
SELECT
origin.flow_name,
explode(from_json(
details:flow_progress.data_quality.expectations,
'array<struct<name:string, dataset:string, passed_records:bigint, failed_records:bigint>>'
)) AS expectation
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.data_quality.expectations IS NOT NULL;

Lakeflow Spark宣言型パイプラインの一般的なモニタリングガイダンスについては、「パイプライン監視」および「パイプラインイベントログ」を参照してください。

構造化ストリーミングでAuto Loaderを監視

LakeFlow Spark宣言型パイプラインの外部でAuto Loaderを実行する場合は、次の構造化ストリーミングのモニタリングアプローチを使用します。

  • StreamingQueryListener を実装して、source.metrics から読み取ることで、各バッチから Auto Loader 固有のメトリクスをキャプチャします。
Python
from pyspark.sql.streaming import StreamingQueryListener

class AutoLoaderMonitor(StreamingQueryListener):
def onQueryStarted(self, event):
pass

def onQueryProgress(self, event):
for source in event.progress.sources:
if "CloudFilesSource" in source.description:
metrics = source.metrics
files_outstanding = metrics.get("numFilesOutstanding", "0")
bytes_outstanding = metrics.get("numBytesOutstanding", "0")
rows_per_sec = source.processedRowsPerSecond
# Push metrics to your monitoring system (for example, write to a Delta table)

def onQueryIdle(self, event):
pass

def onQueryTerminated(self, event):
pass

spark.streams.addListener(AutoLoaderMonitor())
注記

リスナーの処理ロジックはクエリ処理を遅らせる可能性があります。リスナーコールバックでの計算を制限し、そこで同期的な外部書き込みを回避してください。代わりに、軽量なテレメトリを非同期で出力するか、永続化のためにメトリクスを別のジョブに渡してください。

  • ソースの進行状況からnumInputRowsinputRowsPerSecondprocessedRowsPerSecondを使用して、バッチごとのスループット (1秒あたりのファイル数および1秒あたりの行数) をコンピュートします。

  • 取り込みレイテンシーを計算するには、cloud_files_state()create_timecommit_timeを比較してエンドツーエンドのレイテンシーを測定します。処理レイテンシーの場合、durationMsの内訳(たとえば、latestOffsetaddBatch、およびその他の報告されたバッチフェーズ)を使用して、ボトルネックとなっているステージを特定します。

  • df.observe()を使用して、ストリーミングDataFrame上でインラインデータ品質メトリクスを直接定義します。メトリクスは、observedMetricsの下にあるStreamingQueryListenerの進行状況イベントで表示されます。

Python
from pyspark.sql.functions import count, lit, col

observed_df = df.observe(
"auto_loader_quality",
count(lit(1)).alias("total_rows"),
count(col("_rescued_data")).alias("rescued_rows"),
count(col("_corrupt_record")).alias("corrupt_rows")
)
  • .queryName() を使用して各ストリームに一意の名前を割り当てると、Spark UIのストリーミングタブやモニタリングダッシュボードでAuto Loaderストリームを区別しやすくなります。

構造化ストリーミングのモニタリングに関する完全なリファレンスについては、「Databricksにおける構造化ストリーミングクエリのモニタリング」を参照してください。

可観測性ダッシュボードを構築します。

複数のソースのデータを組み合わせて、Auto Loader パイプラインの包括的なオブザーバビリティ ダッシュボードを構築します。このテーブルには、オブザーバビリティ ダッシュボードの構成に使用できる推奨ソースがいくつか表示されます。

データソース

可観測性データ

cloud_files_state()

ファイルレベルの取り込み状態:検出、処理、コミット、およびファイルごとのアーカイブタイムスタンプ

LakeFlow Spark宣言型パイプライン イベントログ

パイプライン実行履歴、バッチごとのフローメトリクス、データ品質エクスペクテーションの結果

パイプライン出力テーブル

取り込み済みテーブルごとに書き込まれた行数とデータ量

その後、オブザーバビリティデータを、ダッシュボードやアラートの基盤となる専用のテーブルに集約することができます。

  • event_type = 'update_progress' イベントから導き出されたパイプライン実行ステータス (成功または失敗) を、時系列で要約します。
  • cloud_files_state()event_type = 'flow_progress'のイベントから派生した、ファイル取り込みメトリクス(バックログサイズ、スループット、バッチあたりのレイテンシー)を集計します。
  • イベントログ内の num_output_rows から導出される、テーブルごとの行数とデータ量を使用してテーブル統計を作成します。
  • data_qualityが入力されたevent_type = 'flow_progress'イベントから派生した、詳細なエラーログと更新ごとの予期される違反からデバッグ情報を収集します。

これらの集計テーブルは、AI/BI dashboard と SQL アラートの基盤として使用できます。推奨されるダッシュボード パネルには、パイプラインの実行ステータス タイムライン、取り込みバックログの傾向、スループットの傾向、取り込みレイテンシーの分布、データ品質メトリクス、スキーマ進化イベント、およびファイル アーカイブのステータスが含まれます。

スキーマ進化イベントの監視

スキーマ変更が発生時に検出するには、次のアプローチを使用します。

  • _rescued_dataの期待値違反カウントにおける非NULL値は、スキーマドリフトを示します。イベントログでno rescued dataの期待値におけるfailed_records > 0をクエリしてください。
  • 設定されたcloudFiles.schemaLocation内の_schemasディレクトリ (またはスキーマの場所が個別に設定されていない場合のみチェックポイント内) の変更は、スキーマ進化が発生したことを示しています。別のモニタリングジョブからこのディレクトリをポーリングできます。
  • 同じストリーム名に対して、onQueryStarted が続く onQueryTerminated イベントを、それだけではスキーマ進化の十分な証拠と見なさないでください。ストリームは、多くの理由 (クラスターの再起動、コードのデプロイ、一時的なストレージエラーなど) で再起動します。スキーマ進化が発生したと結論付ける前に、再起動を独立したシグナル — _schemas ディレクトリの変更または _rescued_data エクスペクテーション違反 — と関連付けてください。
  • _metadata.file_path を使用して、どのファイルがスキーマ変更を導入したかを特定します。これをpathフィールドでcloud_files_state()と結合し、スキーマ変更を特定のファイルとバッチに関連付けます。

このサンプル クエリを使用して、期待違反を通じて最近のスキーマ ドリフトを検出します:

SQL
SELECT
timestamp,
origin.flow_name,
exp.name AS expectation_name,
exp.failed_records
FROM (
SELECT
timestamp,
origin,
explode(from_json(
details:flow_progress.data_quality.expectations,
'array<struct<name:string, dataset:string, passed_records:bigint, failed_records:bigint>>'
)) AS exp
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.data_quality.expectations IS NOT NULL
)
WHERE exp.name = '<rescued-data expectation name>'
AND exp.failed_records > 0
ORDER BY timestamp DESC;

一般的な問題のアラートを設定する

Databricks SQLアラートまたはパイプライン通知を使用して、ダウンストリームのコンシューマーに影響を与える前に問題を検出します。

次の SQL は、増加するバックログを検出し、Databricks SQL アラートの基盤として使用できます。定期的に(たとえば、5分ごとに)実行するようにスケジュール設定し、結果が空でない場合にアラートします。

SQL
-- Alert when backlog exceeds threshold or trends upward across recent batches
WITH recent_backlog AS (
SELECT
origin.flow_name,
timestamp,
DOUBLE(details:flow_progress.metrics.backlog_bytes) AS backlog_bytes,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.metrics.backlog_bytes IS NOT NULL
)
SELECT flow_name, backlog_bytes, timestamp
FROM recent_backlog
WHERE rn = 1
AND backlog_bytes > 1073741824 -- alert when backlog exceeds 1 GB

次の表に、推奨されるアラート条件をまとめます。

検出する内容

検出方法

アラートのタイミング

バックログが増加しています

numFilesOutstanding 上昇傾向

複数のバッチにわたる持続的な増加

停止中のストリーム

進捗イベントはありません

N分間イベントなし(予想されるトリガー間隔に基づく)

高い取り込みレイテンシー

commit_time - create_time

SLAしきい値を超えています。

データ品質の低下

期待失敗率

期待に反する行の割合が増加しています。

スキーマ進化イベント

_rescued_data IS NOT NULL

期待違反カウントにおける空値以外の値

ファイルディスカバリーの遅延

durationMs.latestOffset

ベースラインより大幅に高い

一般的な問題のトラブルシューティング

次の表は、一般的なAuto Loaderパイプラインの問題、その考えられる原因、およびそれらを解決するための推奨されるアクションを示しています。

問題

考えられる原因

推奨アクション

バックログが処理よりも速く増加しています。

コンピュートのサイズ不足、データスキュー、またはスロットルされたレート制限

コンピュートをスケーリングし、Spark UI でスキューを確認し、バッチサイズを制御するための maxFilesPerTrigger 設定をレビューします。

ファイルが検出されていません

ファイルイベントの設定ミス、権限の問題、またはストリームが7日以内に実行されていません。

外部ロケーションのアクセス許可を確認し、Unity Catalog UIでファイルイベントの設定を確認し、RocksDBの状態の期限切れを回避するためにストリームが少なくとも7日ごとに実行されるようにしてください。

ストリームの起動に時間がかかりすぎます

大規模なチェックポイント状態のダウンロード(RocksDB)

Databricks Runtime 15.3 以降にアップグレードすると、非同期状態読み込みをご利用いただけるようになり、起動時間を約90%短縮できます。

重複ファイルの処理

積極的なcloudFiles.maxFileAge設定、またはチェックポイントの破損

保守的な maxFileAge (最低90日以上) を使用し、チェックポイントの整合性を検証し、チェックポイントストレージでのライフサイクルポリシーを避けてください。

スキーマ進化によるパイプラインの再起動

頻繁な、または互換性のないスキーマ変更

schemaEvolutionModeを確認し、タイププロモーションにはaddNewColumnsWithTypeWideningに切り替えるか、非常に動的なスキーマにはバリアントタイプを使用してください。

シンクに蓄積されている破損データ

ソースデータの品質に関する問題

_corrupt_record の検疫シンクでパターンを確認し、ソースデータの生成をレビューして、アップストリームの検証を追加することを検討してください。

discovery_time および commit_time が入力されていません

Databricks Runtime 18.2未満で実行(なし) cleanSource

Databricks Runtime 18.2以降にアップグレードするか、Databricks Runtime 16.4~18.1でcloudFiles.cleanSourceを有効にしてください。

さらにトラブルシューティングを行う場合は、「Auto Loader FAQ」を参照してください。