Monitor DLT パイプライン
この記事では、DLT パイプラインの組み込み モニタリング機能と可観測性機能の使用について説明します。 これらの機能は、次のようなタスクをサポートします。
- パイプラインの更新の進行状況とステータスを監視します。 「UI で使用できるパイプラインの詳細」を参照してください。
- パイプライン更新の成功や失敗などのパイプライン イベントに関するアラート。 「パイプラインイベントのEメール通知の追加」を参照してください。
- Viewing メトリクス for ストリーミング ソース like Apache Kafka and Auto Loader (Public Preview). 「ストリーミングメトリクスの表示」を参照してください。
- データリネージ、データ品質メトリクス、リソース使用量などのパイプライン更新に関する詳細な情報を抽出します。 DLT イベント ログとはを参照してください。
- 特定のイベントが発生したときに実行するカスタムアクションを定義します。 「イベント フックを使用した DLT パイプラインのカスタム モニタリングの定義」を参照してください。
クエリのパフォーマンスを検査および診断するには、「 DLT パイプラインのクエリ履歴にアクセスする」を参照してください。この機能はパブリック プレビュー段階です。
パイプラインイベントのEメール通知を追加する
1 つ以上の Eメール アドレスを設定して、次の場合に通知を受け取ることができます。
- パイプラインの更新が正常に完了しました。
- パイプラインの更新が失敗し、再試行可能または再試行不可能なエラーが発生します。 このオプションを選択すると、すべてのパイプラインの失敗に関する通知を受け取ります。
- パイプラインの更新が再試行不可 (致命的) エラーで失敗します。 このオプションを選択すると、再試行できないエラーが発生した場合にのみ通知を受け取ります。
- 1 つのデータ フローが失敗します。
パイプライン を作成 または編集するときに Eメール 通知を設定するには:
- [ 通知の追加 ] をクリックします。
- 通知を受け取る 1 つ以上の Eメール アドレスを入力します。
- 各通知タイプのチェックボックスをクリックして、設定したEメールアドレスに送信します。
- [ 通知の追加 ] をクリックします。
UI ではどのようなパイプラインの詳細を使用できますか?
パイプライン グラフは、パイプラインの更新が正常に開始されるとすぐに表示されます。 矢印は、パイプライン内のデータセット間の依存関係を表します。 デフォルトでは、パイプラインの詳細ページにはテーブルの最新の更新が表示されますが、ドロップダウンメニューから古い更新を選択することもできます。
詳細には、パイプライン ID、ソース コード、コンピュート コスト、製品エディション、およびパイプライン用に構成されたチャンネルが含まれます。
データセットの表形式ビューを表示するには、[ リスト ] タブをクリックします。 リスト ビューでは、パイプライン内のすべてのデータセットをテーブルの行として表示でき、パイプラインの DAG が大きすぎて グラフ ビューで視覚化できない場合に便利です。テーブルに表示されるデータセットは、データセット名、タイプ、ステータスなどの複数のフィルターを使用して制御できます。 DAG ビジュアライゼーションに戻すには、[ グラフ ] をクリックします。
実行者ユーザーは パイプラインの所有者であり、パイプラインの更新はこのユーザーのアクセス許可で実行されます。run as
ユーザーを変更するには、[ アクセス許可 ] をクリックし、パイプラインの所有者を変更します。
データセットの詳細を表示するにはどうすればよいですか?
パイプライン グラフまたはデータセット リストでデータセットをクリックすると、データセットの詳細が表示されます。 詳細には、データセット スキーマ、データ品質メトリクス、データセットを定義するソースコードへのリンクが含まれます。
更新履歴の表示
パイプラインの更新の履歴とステータスを表示するには、上部のバーにある更新履歴のドロップダウンメニューをクリックします。
ドロップダウンメニューで更新を選択すると、更新のグラフ、詳細、イベントが表示されます。 最新の更新プログラムに戻すには、[ 最新の更新プログラムを表示 ] をクリックします。
ストリーミングメトリクスの表示
プレビュー
DLT のストリーミング可観測性は パブリック プレビュー段階です。
DLT パイプラインの各ストリーミングフローについて、 Spark 構造化ストリーミングでサポートされているデータソース ( Apache Kafka、 Amazon Kinesis、 Auto Loaderテーブル、 Delta テーブルなど) からストリーミング メトリクスを表示できます。 メトリクスは、DLT UI の右ペインにグラフとして表示され、バックログ秒数、バックログバイト数、バックログレコード、バックログファイルが含まれます。 グラフには分単位で集計された最大値が表示され、グラフにカーソルを合わせるとツールチップに最大値が表示されます。 データは、現在の時刻から過去 48 時間に制限されます。
ストリーミングメトリクスが利用可能なパイプライン内のテーブルでは、UI Graph ビューでパイプライン DAG を表示すると、 アイコンが表示されます。ストリーミング メトリクスを表示するには、
をクリックして、右ペインの [フロー ] タブにストリーミング メトリクス チャートを表示します。 また、ストリーミング メトリクスのあるテーブルのみを表示するフィルターを適用するには、[ List ] をクリックし、[ Has ストリーミング メトリクス ] をクリックします。
各ストリーミング ソースは、特定のメトリクスのみをサポートします。 ストリーミングソースでサポートされていないメトリクスは、UIで表示できません。 次の表は、サポートされているストリーミング ソースで使用できるメトリクスを示しています。
ソース | バックログバイト | バックログ レコード | バックログ秒数 | バックログ ファイル |
---|---|---|---|---|
Kafka | ✓ | ✓ | ||
Kinesis | ✓ | ✓ | ||
Delta | ✓ | ✓ | ||
Auto Loader | ✓ | ✓ | ||
Google Pub/Sub | ✓ | ✓ |
DLT イベント ログとは何ですか?
DLT イベント ログには、監査ログ、データ品質チェック、パイプラインの進行状況、データリネージなど、パイプラインに関連するすべての情報が含まれています。 イベント ログを使用して、データ パイプラインの状態を追跡、理解、および監視できます。
イベント ログ エントリは、DLT ユーザー インターフェイス、DLT API で表示するか、イベント ログを直接クエリして表示できます。このセクションでは、イベント ログを直接クエリする方法に焦点を当てます。
また、イベントがログに記録されたときに実行するカスタムアクション (アラートの送信など) をイベント フックで定義することもできます。
イベント ログ、またはイベント ログが公開されている親カタログまたはスキーマは削除しないでください。 イベント ログを削除すると、今後の実行中にパイプラインの更新が失敗する可能性があります。
イベント ログ スキーマ
次の表では、イベント ログ スキーマについて説明します。 これらのフィールドの一部には、 details
フィールドなど、一部のクエリを実行するために解析が必要な JSON データが含まれています。 Databricks では、JSON フィールドを解析するための :
演算子がサポートされています。 :
(コロン記号) 演算子を参照してください。
フィールド | 説明 |
---|---|
| イベント・ログ・レコードのユニークな識別子。 |
| イベントを識別して順序付けするためのメタデータを含む JSON ドキュメント。 |
| イベントの起点のメタデータを含む JSON ドキュメント (クラウド プロバイダー、クラウド プロバイダーのリージョン、 |
| イベントが記録された時刻。 |
| イベントを説明する人間が判読できるメッセージ。 |
| イベント タイプ ( |
| イベントスキーマの安定性。 可能な値は次のとおりです。 - |
| エラーが発生した場合は、エラーを説明する詳細。 |
| イベントの構造化された詳細を含む JSON ドキュメント。 これは、イベントの分析に使用される主要なフィールドです。 |
| イベントの種類。 |
イベント ログのクエリ
このセクションでは、 Unity Catalog とデフォルトの公開モードで構成されたパイプラインのイベント ログを操作するためのデフォルトの動作と構文について説明します。
- レガシ パブリッシング モードを使用する Unity Catalog パイプラインの動作については、「 レガシ パブリッシング モード パイプラインの Unity Catalog イベント ログの操作」を参照してください。
- Hive metastore パイプラインの動作と構文については、「Hive metastore パイプラインのイベント ログの操作」を参照してください。
デフォルトにより、DLT は、パイプライン用に構成されたデフォルト カタログとスキーマの非表示の Delta テーブルにイベント ログを書き込みます。 非表示になっている間も、十分に権限のあるすべてのユーザーがテーブルをクエリできます。 デフォルトでは、パイプラインの所有者のみがイベント ログ テーブルをクエリできます。
デフォルトでは、非表示のイベントログの名前は event_log_{pipeline_id}
という形式で、パイプライン ID はシステムによって割り当てられた UUID で、ダッシュはアンダースコアに置き換えられています。
JSON 構成を操作して、イベント ログを公開できます。 イベント ログを発行するときは、イベント ログの名前を指定し、次の例のようにカタログとスキーマをオプションで指定できます。
{
"id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
"name": "billing_pipeline",
"event_log": {
"catalog": "catalog_name",
"schema": "schema_name",
"name": "event_log_table_name"
}
}
イベント ログの場所は、パイプライン内の Auto Loader クエリのスキーマの場所としても機能します。 Databricks 、権限を変更する前にイベント ログ テーブルに対してビューを作成することをお勧めします。これは、イベント ログ テーブルが直接共有されている場合、一部のコンピュート設定でユーザーがスキーマ メタデータにアクセスできる可能性があるためです。 次の構文例では、イベント ログ テーブルにビューを作成し、この記事に含まれるイベント ログ クエリの例で使用します。
CREATE VIEW event_log_raw
AS SELECT * FROM catalog_name.schema_name.event_log_table_name;
パイプライン実行の各インスタンスは 、更新 と呼ばれます。 多くの場合、最新の更新プログラムの情報を抽出したい場合があります。 次のクエリを実行して、最新の更新プログラムの識別子を検索し、 latest_update
一時ビューに保存します。 このビューは、この記事に含まれるイベント ログ クエリの例で使用されます。
CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;
Unity Catalog では、ビューはストリーミング クエリをサポートします。 次の例では、構造化ストリーミングを使用して、イベント ログ テーブルの上に定義されたビューをクエリします。
df = spark.readStream.table("event_log_raw")
パイプラインの所有者は、パイプライン構成の [詳細設定 ] セクションで Publish event log to metastore
オプションを切り替えることで、イベント ログをパブリック Delta テーブルとして発行できます。オプションで、イベント・ログの新しいテーブル名、カタログ、およびスキーマを指定できます。
Query リネージ 情報をイベント ログから取得
リネージに関する情報を含むイベントのイベント・タイプは flow_definition
です。 details:flow_definition
オブジェクトには、グラフ内の各リレーションシップを定義する output_dataset
と input_datasets
が含まれています。
次のクエリを使用して、リネージ情報を表示するための入力データセットと出力データセットを抽出できます。
SELECT
details:flow_definition.output_dataset as output_dataset,
details:flow_definition.input_datasets as input_dataset
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_definition'
AND
origin.update_id = latest_update.id
|
|
---|---|
|
|
|
|
|
|
|
|
イベント ログからのデータ品質のクエリ
パイプライン内のデータセットに対するエクスペクテーションを定義すると、データ品質メトリクスは details:flow_progress.data_quality.expectations
オブジェクトに保存されます。 データ品質に関する情報を含むイベントのイベント・タイプは flow_progress
です。 次の例では、最後のパイプライン更新のデータ品質メトリクスをクエリーします。
SELECT
row_expectations.dataset as dataset,
row_expectations.name as expectation,
SUM(row_expectations.passed_records) as passing_records,
SUM(row_expectations.failed_records) as failing_records
FROM
(
SELECT
explode(
from_json(
details :flow_progress :data_quality :expectations,
"array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
)
) row_expectations
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_progress'
AND origin.update_id = latest_update.id
)
GROUP BY
row_expectations.dataset,
row_expectations.name
|
|
|
|
---|---|---|---|
|
| 4083 | 0 |
イベント ログからの Auto Loader イベントのクエリ
DLT は、 Auto Loader がファイルを処理するときにイベントを生成します。 Auto Loaderイベントの場合、event_type
はoperation_progress
で、details:operation_progress:type
はAUTO_LOADER_LISTING
またはAUTO_LOADER_BACKFILL
です。details:operation_progress
オブジェクトには、status
、 duration_ms
、 auto_loader_details:source_path
、 auto_loader_details:num_files_listed
フィールドも含まれます。
次の例では、最新の更新の Auto Loader イベントをクエリします。
SELECT
timestamp,
details:operation_progress.status,
details:operation_progress.type,
details:operation_progress:auto_loader_details
FROM
event_log_raw,
latest_update
WHERE
event_type like 'operation_progress'
AND
origin.update_id = latest.update_id
AND
details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL')
イベント ログをクエリしてデータ バックログを監視する
DLT は、 details:flow_progress.metrics.backlog_bytes
オブジェクトのバックログに存在するデータの量を追跡します。バックログメトリクスを含むイベントのイベントタイプは flow_progress
です。 次の例では、最後のパイプライン更新のバックログ メトリクスをクエリします。
SELECT
timestamp,
Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
event_log_raw,
latest_update
WHERE
event_type ='flow_progress'
AND
origin.update_id = latest_update.id
backlog メトリクスは、パイプラインのデータソースタイプと Databricks Runtime バージョンによっては利用できない場合があります。
サーバレスが有効になっていないパイプラインのイベントログから拡張オートスケールイベントを監視する
サーバレス コンピュートを使用しない DLT パイプラインの場合、パイプラインで拡張オートスケールが有効になっている場合、イベント ログはクラスターのサイズ変更をキャプチャします。 enhanced オートスケールに関する情報を含むイベントのイベントタイプは autoscale
です。 クラスター サイズ変更要求情報 は、 details:autoscale
オブジェクトに格納されます。 次の例では、拡張オートスケール クラスター サイズ変更要求に対して、最後のパイプライン更新のクエリを実行します。
SELECT
timestamp,
Double(
case
when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
else null
end
) as starting_num_executors,
Double(
case
when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as succeeded_num_executors,
Double(
case
when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as partially_succeeded_num_executors,
Double(
case
when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
else null
end
) as failed_num_executors
FROM
event_log_raw,
latest_update
WHERE
event_type = 'autoscale'
AND
origin.update_id = latest_update.id
コンピュートリソース利用状況のモニタリング
cluster_resources
イベントは、クラスター内のタスクスロットの数、それらのタスクスロットがどれだけ使用されているか、およびスケジュールを待っているタスクの数に関するメトリクスを提供します。
強化オートスケールが有効になっている場合、 cluster_resources
イベントには、 latest_requested_num_executors
や optimal_num_executors
などのオートスケール アルゴリズムのメトリクスも含まれます。 イベントは、アルゴリズムのステータスを CLUSTER_AT_DESIRED_SIZE
、 SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORS
、 BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION
などのさまざまな状態として示 も示します。
この情報は、オートスケールイベントと併せて表示することで、強化されたオートスケールの全体像を把握することができます。
次の例では、最後のパイプライン更新のタスク キュー サイズ履歴を照会します。
SELECT
timestamp,
Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
次の例では、最後のパイプライン更新の使用率履歴を照会します。
SELECT
timestamp,
Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
次の例では、エグゼキューターのカウント履歴をクエリし、Enhanced オートスケール パイプラインでのみ使用可能なメトリクス (最新のリクエストでアルゴリズムによって要求されたエグゼキューターの数、最新のメトリクスに基づいてアルゴリズムによって推奨されるエグゼキューターの最適数、オートスケール アルゴリズムの状態など) を照会します。
SELECT
timestamp,
Double(details :cluster_resources.num_executors) as current_executors,
Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
details :cluster_resources.state as autoscaling_state
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
DLT パイプラインの監査
DLT イベント ログ レコードやその他の Databricks 監査ログを使用して、DLT でデータがどのように更新されているかを完全に把握できます。
DLT は、パイプライン所有者の資格情報を使用して更新を実行します。使用する資格情報は、パイプラインの所有者を更新することで変更できます。 DLT は、パイプラインの作成、設定の編集、更新のトリガーなど、パイプラインに対するアクションについてユーザーを記録します。
Unity Catalog 監査イベントのリファレンスについては、「 Unity Catalog イベント 」を参照してください。
イベント ログでユーザー アクションをクエリする
イベントログを使用して、ユーザーアクションなどのイベントを監査できます。 ユーザー・アクションに関する情報を含むイベントのイベント・タイプは、 user_action
です。
アクションに関する情報は、details
フィールドの user_action
オブジェクトに格納されます。次のクエリを使用して、ユーザー イベントの監査ログを作成します。 このクエリで使用する event_log_raw
ビューを作成するには、「 イベント ログのクエリ」を参照してください。
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
|
|
|
---|---|---|
2021-05-20T19:36:03.517+0000 |
|
|
2021-05-20T19:35:59.913+0000 |
|
|
2021-05-27T00:35:51.971+0000 |
|
|
Runtime 情報
パイプライン更新プログラムのランタイム情報 (更新プログラムの Databricks Runtime バージョンなど) を表示できます。
SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
|
---|
11.0 |