メインコンテンツまでスキップ

Monitor DLT パイプライン

この記事では、DLT パイプラインの組み込み モニタリング機能と可観測性機能の使用について説明します。 これらの機能は、次のようなタスクをサポートします。

クエリのパフォーマンスを検査および診断するには、「 DLT パイプラインのクエリ履歴にアクセスする」を参照してください。この機能はパブリック プレビュー段階です。

パイプラインイベントのEメール通知を追加する

1 つ以上の Eメール アドレスを設定して、次の場合に通知を受け取ることができます。

  • パイプラインの更新が正常に完了しました。
  • パイプラインの更新が失敗し、再試行可能または再試行不可能なエラーが発生します。 このオプションを選択すると、すべてのパイプラインの失敗に関する通知を受け取ります。
  • パイプラインの更新が再試行不可 (致命的) エラーで失敗します。 このオプションを選択すると、再試行できないエラーが発生した場合にのみ通知を受け取ります。
  • 1 つのデータ フローが失敗します。

パイプライン を作成 または編集するときに Eメール 通知を設定するには:

  1. [ 通知の追加 ] をクリックします。
  2. 通知を受け取る 1 つ以上の Eメール アドレスを入力します。
  3. 各通知タイプのチェックボックスをクリックして、設定したEメールアドレスに送信します。
  4. [ 通知の追加 ] をクリックします。

UI ではどのようなパイプラインの詳細を使用できますか?

パイプライン グラフは、パイプラインの更新が正常に開始されるとすぐに表示されます。 矢印は、パイプライン内のデータセット間の依存関係を表します。 デフォルトでは、パイプラインの詳細ページにはテーブルの最新の更新が表示されますが、ドロップダウンメニューから古い更新を選択することもできます。

詳細には、パイプライン ID、ソース コード、コンピュート コスト、製品エディション、およびパイプライン用に構成されたチャンネルが含まれます。

データセットの表形式ビューを表示するには、[ リスト ] タブをクリックします。 リスト ビューでは、パイプライン内のすべてのデータセットをテーブルの行として表示でき、パイプラインの DAG が大きすぎて グラフ ビューで視覚化できない場合に便利です。テーブルに表示されるデータセットは、データセット名、タイプ、ステータスなどの複数のフィルターを使用して制御できます。 DAG ビジュアライゼーションに戻すには、[ グラフ ] をクリックします。

実行者ユーザーは パイプラインの所有者であり、パイプラインの更新はこのユーザーのアクセス許可で実行されます。run as ユーザーを変更するには、[ アクセス許可 ] をクリックし、パイプラインの所有者を変更します。

データセットの詳細を表示するにはどうすればよいですか?

パイプライン グラフまたはデータセット リストでデータセットをクリックすると、データセットの詳細が表示されます。 詳細には、データセット スキーマ、データ品質メトリクス、データセットを定義するソースコードへのリンクが含まれます。

更新履歴の表示

パイプラインの更新の履歴とステータスを表示するには、上部のバーにある更新履歴のドロップダウンメニューをクリックします。

ドロップダウンメニューで更新を選択すると、更新のグラフ、詳細、イベントが表示されます。 最新の更新プログラムに戻すには、[ 最新の更新プログラムを表示 ] をクリックします。

ストリーミングメトリクスの表示

備考

プレビュー

DLT のストリーミング可観測性は パブリック プレビュー段階です。

DLT パイプラインの各ストリーミングフローについて、 Spark 構造化ストリーミングでサポートされているデータソース ( Apache Kafka、 Amazon Kinesis、 Auto Loaderテーブル、 Delta テーブルなど) からストリーミング メトリクスを表示できます。 メトリクスは、DLT UI の右ペインにグラフとして表示され、バックログ秒数、バックログバイト数、バックログレコード、バックログファイルが含まれます。 グラフには分単位で集計された最大値が表示され、グラフにカーソルを合わせるとツールチップに最大値が表示されます。 データは、現在の時刻から過去 48 時間に制限されます。

DLT チャートのアイコンストリーミングメトリクスが利用可能なパイプライン内のテーブルでは、UI Graph ビューでパイプライン DAG を表示すると、 アイコンが表示されます。ストリーミング メトリクスを表示するには、 DLT チャートのアイコン をクリックして、右ペインの [フロー ] タブにストリーミング メトリクス チャートを表示します。 また、ストリーミング メトリクスのあるテーブルのみを表示するフィルターを適用するには、[ List ] をクリックし、[ Has ストリーミング メトリクス ] をクリックします。

各ストリーミング ソースは、特定のメトリクスのみをサポートします。 ストリーミングソースでサポートされていないメトリクスは、UIで表示できません。 次の表は、サポートされているストリーミング ソースで使用できるメトリクスを示しています。

ソース

バックログバイト

バックログ レコード

バックログ秒数

バックログ ファイル

Kafka

Kinesis

Delta

Auto Loader

Google Pub/Sub

DLT イベント ログとは何ですか?

DLT イベント ログには、監査ログ、データ品質チェック、パイプラインの進行状況、データリネージなど、パイプラインに関連するすべての情報が含まれています。 イベント ログを使用して、データ パイプラインの状態を追跡、理解、および監視できます。

イベント ログ エントリは、DLT ユーザー インターフェイス、DLT API で表示するか、イベント ログを直接クエリして表示できます。このセクションでは、イベント ログを直接クエリする方法に焦点を当てます。

また、イベントがログに記録されたときに実行するカスタムアクション (アラートの送信など) をイベント フックで定義することもできます。

important

イベント ログ、またはイベント ログが公開されている親カタログまたはスキーマは削除しないでください。 イベント ログを削除すると、今後の実行中にパイプラインの更新が失敗する可能性があります。

イベント ログ スキーマ

次の表では、イベント ログ スキーマについて説明します。 これらのフィールドの一部には、 details フィールドなど、一部のクエリを実行するために解析が必要な JSON データが含まれています。 Databricks では、JSON フィールドを解析するための : 演算子がサポートされています。 : (コロン記号) 演算子を参照してください。

フィールド

説明

id

イベント・ログ・レコードのユニークな識別子。

sequence

イベントを識別して順序付けするためのメタデータを含む JSON ドキュメント。

origin

イベントの起点のメタデータを含む JSON ドキュメント (クラウド プロバイダー、クラウド プロバイダーのリージョン、user_idpipeline_id、パイプラインが作成された場所を示す (DBSQL または WORKSPACE) pipeline_type

timestamp

イベントが記録された時刻。

message

イベントを説明する人間が判読できるメッセージ。

level

イベント タイプ ( INFOWARNERRORMETRICSなど)。

maturity_level

イベントスキーマの安定性。 可能な値は次のとおりです。 - STABLE: スキーマは安定しており、変更されません。 - NULL: スキーマは安定しており、変更されません。 maturity_level フィールドが追加される前にレコードが作成された場合 (リリース 2022.37) は、値が NULL になることがあります。 - EVOLVING: スキーマは安定しておらず、変更される可能性があります。 - DEPRECATED: スキーマは非推奨であり、DLT ランタイムはいつでもこのイベントの生成を停止できます。

error

エラーが発生した場合は、エラーを説明する詳細。

details

イベントの構造化された詳細を含む JSON ドキュメント。 これは、イベントの分析に使用される主要なフィールドです。

event_type

イベントの種類。

イベント ログのクエリ

注記

このセクションでは、 Unity Catalog とデフォルトの公開モードで構成されたパイプラインのイベント ログを操作するためのデフォルトの動作と構文について説明します。

デフォルトにより、DLT は、パイプライン用に構成されたデフォルト カタログとスキーマの非表示の Delta テーブルにイベント ログを書き込みます。 非表示になっている間も、十分に権限のあるすべてのユーザーがテーブルをクエリできます。 デフォルトでは、パイプラインの所有者のみがイベント ログ テーブルをクエリできます。

デフォルトでは、非表示のイベントログの名前は event_log_{pipeline_id}という形式で、パイプライン ID はシステムによって割り当てられた UUID で、ダッシュはアンダースコアに置き換えられています。

JSON 構成を操作して、イベント ログを公開できます。 イベント ログを発行するときは、イベント ログの名前を指定し、次の例のようにカタログとスキーマをオプションで指定できます。

JSON
{
"id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
"name": "billing_pipeline",
"event_log": {
"catalog": "catalog_name",
"schema": "schema_name",
"name": "event_log_table_name"
}
}

イベント ログの場所は、パイプライン内の Auto Loader クエリのスキーマの場所としても機能します。 Databricks 、権限を変更する前にイベント ログ テーブルに対してビューを作成することをお勧めします。これは、イベント ログ テーブルが直接共有されている場合、一部のコンピュート設定でユーザーがスキーマ メタデータにアクセスできる可能性があるためです。 次の構文例では、イベント ログ テーブルにビューを作成し、この記事に含まれるイベント ログ クエリの例で使用します。

SQL
CREATE VIEW event_log_raw
AS SELECT * FROM catalog_name.schema_name.event_log_table_name;

パイプライン実行の各インスタンスは 、更新 と呼ばれます。 多くの場合、最新の更新プログラムの情報を抽出したい場合があります。 次のクエリを実行して、最新の更新プログラムの識別子を検索し、 latest_update 一時ビューに保存します。 このビューは、この記事に含まれるイベント ログ クエリの例で使用されます。

SQL
CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

Unity Catalog では、ビューはストリーミング クエリをサポートします。 次の例では、構造化ストリーミングを使用して、イベント ログ テーブルの上に定義されたビューをクエリします。

Python
df = spark.readStream.table("event_log_raw")

パイプラインの所有者は、パイプライン構成の [詳細設定 ] セクションで Publish event log to metastore オプションを切り替えることで、イベント ログをパブリック Delta テーブルとして発行できます。オプションで、イベント・ログの新しいテーブル名、カタログ、およびスキーマを指定できます。

Query リネージ 情報をイベント ログから取得

リネージに関する情報を含むイベントのイベント・タイプは flow_definitionです。 details:flow_definition オブジェクトには、グラフ内の各リレーションシップを定義する output_datasetinput_datasets が含まれています。

次のクエリを使用して、リネージ情報を表示するための入力データセットと出力データセットを抽出できます。

SQL
SELECT
details:flow_definition.output_dataset as output_dataset,
details:flow_definition.input_datasets as input_dataset
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_definition'
AND
origin.update_id = latest_update.id

output_dataset

input_datasets

customers

null

sales_orders_raw

null

sales_orders_cleaned

["customers", "sales_orders_raw"]

sales_order_in_la

["sales_orders_cleaned"]

イベント ログからのデータ品質のクエリ

パイプライン内のデータセットに対するエクスペクテーションを定義すると、データ品質メトリクスは details:flow_progress.data_quality.expectations オブジェクトに保存されます。 データ品質に関する情報を含むイベントのイベント・タイプは flow_progressです。 次の例では、最後のパイプライン更新のデータ品質メトリクスをクエリーします。

SQL
SELECT
row_expectations.dataset as dataset,
row_expectations.name as expectation,
SUM(row_expectations.passed_records) as passing_records,
SUM(row_expectations.failed_records) as failing_records
FROM
(
SELECT
explode(
from_json(
details :flow_progress :data_quality :expectations,
"array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
)
) row_expectations
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_progress'
AND origin.update_id = latest_update.id
)
GROUP BY
row_expectations.dataset,
row_expectations.name

dataset

expectation

passing_records

failing_records

sales_orders_cleaned

valid_order_number

4083

0

イベント ログからの Auto Loader イベントのクエリ

DLT は、 Auto Loader がファイルを処理するときにイベントを生成します。 Auto Loaderイベントの場合、event_typeoperation_progressで、details:operation_progress:typeAUTO_LOADER_LISTINGまたはAUTO_LOADER_BACKFILLです。details:operation_progress オブジェクトには、statusduration_msauto_loader_details:source_pathauto_loader_details:num_files_listed フィールドも含まれます。

次の例では、最新の更新の Auto Loader イベントをクエリします。

SQL
SELECT
timestamp,
details:operation_progress.status,
details:operation_progress.type,
details:operation_progress:auto_loader_details
FROM
event_log_raw,
latest_update
WHERE
event_type like 'operation_progress'
AND
origin.update_id = latest.update_id
AND
details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL')

イベント ログをクエリしてデータ バックログを監視する

DLT は、 details:flow_progress.metrics.backlog_bytes オブジェクトのバックログに存在するデータの量を追跡します。バックログメトリクスを含むイベントのイベントタイプは flow_progressです。 次の例では、最後のパイプライン更新のバックログ メトリクスをクエリします。

SQL
SELECT
timestamp,
Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
event_log_raw,
latest_update
WHERE
event_type ='flow_progress'
AND
origin.update_id = latest_update.id
注記

backlog メトリクスは、パイプラインのデータソースタイプと Databricks Runtime バージョンによっては利用できない場合があります。

サーバレスが有効になっていないパイプラインのイベントログから拡張オートスケールイベントを監視する

サーバレス コンピュートを使用しない DLT パイプラインの場合、パイプラインで拡張オートスケールが有効になっている場合、イベント ログはクラスターのサイズ変更をキャプチャします。 enhanced オートスケールに関する情報を含むイベントのイベントタイプは autoscaleです。 クラスター サイズ変更要求情報 は、 details:autoscale オブジェクトに格納されます。 次の例では、拡張オートスケール クラスター サイズ変更要求に対して、最後のパイプライン更新のクエリを実行します。

SQL
SELECT
timestamp,
Double(
case
when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
else null
end
) as starting_num_executors,
Double(
case
when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as succeeded_num_executors,
Double(
case
when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as partially_succeeded_num_executors,
Double(
case
when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
else null
end
) as failed_num_executors
FROM
event_log_raw,
latest_update
WHERE
event_type = 'autoscale'
AND
origin.update_id = latest_update.id

コンピュートリソース利用状況のモニタリング

cluster_resources イベントは、クラスター内のタスクスロットの数、それらのタスクスロットがどれだけ使用されているか、およびスケジュールを待っているタスクの数に関するメトリクスを提供します。

強化オートスケールが有効になっている場合、 cluster_resources イベントには、 latest_requested_num_executorsoptimal_num_executorsなどのオートスケール アルゴリズムのメトリクスも含まれます。 イベントは、アルゴリズムのステータスを CLUSTER_AT_DESIRED_SIZESCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORSBLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATIONなどのさまざまな状態として示 も示します。 この情報は、オートスケールイベントと併せて表示することで、強化されたオートスケールの全体像を把握することができます。

次の例では、最後のパイプライン更新のタスク キュー サイズ履歴を照会します。

SQL
SELECT
timestamp,
Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id

次の例では、最後のパイプライン更新の使用率履歴を照会します。

SQL
SELECT
timestamp,
Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id

次の例では、エグゼキューターのカウント履歴をクエリし、Enhanced オートスケール パイプラインでのみ使用可能なメトリクス (最新のリクエストでアルゴリズムによって要求されたエグゼキューターの数、最新のメトリクスに基づいてアルゴリズムによって推奨されるエグゼキューターの最適数、オートスケール アルゴリズムの状態など) を照会します。

SQL
SELECT
timestamp,
Double(details :cluster_resources.num_executors) as current_executors,
Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
details :cluster_resources.state as autoscaling_state
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id

DLT パイプラインの監査

DLT イベント ログ レコードやその他の Databricks 監査ログを使用して、DLT でデータがどのように更新されているかを完全に把握できます。

DLT は、パイプライン所有者の資格情報を使用して更新を実行します。使用する資格情報は、パイプラインの所有者を更新することで変更できます。 DLT は、パイプラインの作成、設定の編集、更新のトリガーなど、パイプラインに対するアクションについてユーザーを記録します。

Unity Catalog 監査イベントのリファレンスについては、「 Unity Catalog イベント 」を参照してください。

イベント ログでユーザー アクションをクエリする

イベントログを使用して、ユーザーアクションなどのイベントを監査できます。 ユーザー・アクションに関する情報を含むイベントのイベント・タイプは、 user_actionです。

アクションに関する情報は、details フィールドの user_action オブジェクトに格納されます。次のクエリを使用して、ユーザー イベントの監査ログを作成します。 このクエリで使用する event_log_raw ビューを作成するには、「 イベント ログのクエリ」を参照してください。

SQL
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'

timestamp

action

user_name

2021-05-20T19:36:03.517+0000

START

user@company.com

2021-05-20T19:35:59.913+0000

CREATE

user@company.com

2021-05-27T00:35:51.971+0000

START

user@company.com

Runtime 情報

パイプライン更新プログラムのランタイム情報 (更新プログラムの Databricks Runtime バージョンなど) を表示できます。

SQL
SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'

dbr_version

11.0