Delta Live Tables パイプラインの監視
この記事では、 Delta Live Tables パイプラインの組み込み モニタリング機能と可観測性機能の使用について説明します。 これらの機能は、次のようなタスクをサポートします。
パイプラインの更新の進行状況とステータスを監視します。 「UI で使用できるパイプラインの詳細」を参照してください。
パイプライン更新の成功や失敗などのパイプライン イベントに関するアラート。 「パイプラインイベントのEメール通知の追加」を参照してください。
Viewing メトリクス for ストリーミング ソース like Apache Kafka and Auto Loader (Public Preview). 「ストリーミングメトリクスの表示」を参照してください。
データリネージ、データ品質メトリクス、リソース使用量などのパイプライン更新に関する詳細な情報を抽出します。 「Delta Live Tables イベント ログとは」を参照してください。
特定のイベントが発生したときに実行するカスタムアクションの定義イベントフックを使用したDelta Live Tablesパイプラインのカスタムモニタリングの定義を参照してください。
クエリのパフォーマンスを検査および診断するには、「 Delta Live Tables パイプラインのクエリ履歴へのアクセス」を参照してください。 この機能はパブリック プレビュー段階です。
パイプラインイベントのEメール 通知を追加する
1 つ以上の Eメール アドレスを設定して、次の場合に通知を受け取ることができます。
パイプラインの更新が正常に完了しました。
パイプラインの更新が失敗し、再試行可能または再試行不可能なエラーが発生します。 このオプションを選択すると、すべてのパイプラインの失敗に関する通知を受け取ります。
パイプラインの更新が再試行不可 (致命的) エラーで失敗します。 このオプションを選択すると、再試行できないエラーが発生した場合にのみ通知を受け取ります。
1 つのデータ フローが失敗します。
パイプライン を作成 または編集するときに Eメール 通知を設定するには:
[ 通知の追加] をクリックします。
通知を受け取る 1 つ以上の Eメール アドレスを入力します。
各通知タイプのチェックボックスをクリックして、設定したEメールアドレスに送信します。
[ 通知の追加] をクリックします。
UI ではどのようなパイプラインの詳細を使用できますか?
パイプライン グラフは、パイプラインの更新が正常に開始されるとすぐに表示されます。 矢印は、パイプライン内のデータセット間の依存関係を表します。 デフォルトでは、パイプラインの詳細ページにはテーブルの最新の更新が表示されますが、ドロップダウンメニューから古い更新を選択することもできます。
詳細には、パイプライン ID、ソース コード、コンピュート コスト、製品エディション、およびパイプライン用に構成されたチャンネルが含まれます。
データセットを表形式で表示するには、[ リスト ] タブをクリックします。 リストビューでは、パイプライン内のすべてのデータセットをテーブルの行として表示でき、パイプラインの DAG が大きすぎてグラフビューで視覚化できない場合に便利です。テーブルに表示されるデータセットは、データセット名、タイプ、ステータスなどの複数のフィルターを使用して制御できます。 DAG ビジュアリゼーションに戻るには、[ グラフ] をクリックします。
[別のユーザーとして実行] ユーザーはパイプラインの所有者であり、パイプラインの更新はこのユーザーのアクセス許可で実行されます。run as
ユーザーを変更するには、[アクセス許可] をクリックし、パイプラインの所有者を変更します。
ストリーミングメトリクスの表示
プレビュー
Delta Live Tables のストリーミング監視は 、パブリック プレビュー段階です。
パイプライン内の各ストリーミングフローについて、Spark 構造化ストリーミングでサポートされているデータソースApacheKafka ( 、AmazonKinesis 、Auto Loader テーブル、Delta テーブルなど) からストリーミング メトリクスを表示できます。Delta Live Tablesメトリクスは、 Delta Live Tables UI の右ペインにグラフとして表示され、バックログ秒数、バックログバイト数、バックログレコード、バックログファイルが含まれます。 グラフには分単位で集計された最大値が表示され、グラフにカーソルを合わせるとツールチップに最大値が表示されます。 データは、現在の時刻から過去 48 時間に制限されます。
ストリーミングメトリクスが利用可能なパイプライン内のテーブルでは、UI Graph ビューでパイプライン DAG を表示すると、 アイコンが表示されます。ストリーミング メトリクスを表示するには、 をクリックして、右ペインの [フロー ] タブにストリーミング メトリクス チャートを表示します。 また、ストリーミング メトリクスのあるテーブルのみを表示するフィルターを適用するには、[ List ] をクリックし、[ Has ストリーミング メトリクス] をクリックします。
各ストリーミング ソースは、特定のメトリクスのみをサポートします。 ストリーミングソースでサポートされていないメトリクスは、UIで表示できません。 次の表は、サポートされているストリーミング ソースで使用できるメトリクスを示しています。
ソース |
バックログバイト |
バックログ レコード |
バックログ秒数 |
バックログ ファイル |
---|---|---|---|---|
Kafka |
✓ |
✓ |
||
Kinesis |
✓ |
✓ |
||
Delta |
✓ |
✓ |
||
Auto Loader |
✓ |
✓ |
||
Google Pub/Sub |
✓ |
✓ |
Delta Live Tables イベントログとは何ですか?
Delta Live Tables イベント ログには、監査ログ、データ品質チェック、パイプラインの進行状況、データリネージなど、パイプラインに関連するすべての情報が含まれています。 イベント ログを使用して、データパイプラインの状態を追跡、理解、および監視できます。
イベント ログのエントリは、Delta Live Tables ユーザー インターフェイス、Delta Live Tables API で表示するか、イベント ログを直接クエリすることで表示できます。 このセクションでは、イベント ログを直接クエリする方法に焦点を当てます。
また、イベントがログに記録されたときに実行するカスタムアクション (アラートの送信など) を イベントフックとともに定義することもできます。
イベントログスキーマ
次の表では、イベント ログのスキーマについて説明します。 これらのフィールドの一部には、 details
フィールドなど、クエリーを実行するために解析が必要な JSON データが含まれています。 Databricks では、JSON フィールドを解析するための :
演算子がサポートされています。 : (コロン記号) 演算子を参照してください。
畑 |
説明 |
---|---|
|
イベント ログ レコードの一意の識別子。 |
|
イベントを識別して順序付けるためのメタデータを含む JSON ドキュメント。 |
|
イベントのオリジンのメタデータ (クラウドプロバイダー、クラウドプロバイダーリージョン、 |
|
イベントが記録された時刻。 |
|
イベントを説明する人間が判読できるメッセージ。 |
|
イベントの種類 (たとえば、 |
|
エラーが発生した場合は、エラーを説明する詳細。 |
|
イベントの構造化された詳細を含む JSON ドキュメント。 これは、イベントの分析に使用されるプライマリ フィールドです。 |
|
イベントの種類。 |
|
イベント スキーマの安定性。 可能な値は次のとおりです。
|
イベントログのクエリー
イベント ログの場所とイベント ログをクエリーするインターフェイスは、パイプラインが Hive metastore と Unity Catalogのどちらを使用するように構成されているかによって異なります。
Hive metastore
パイプラインが テーブルを Hive metastoreに発行する場合、イベント ログは storage
の場所の下の /system/events
に格納されます。たとえば、パイプライン storage
設定を /Users/username/data
として構成した場合、イベント ログは DBFS の /Users/username/data/system/events
パスに格納されます。
storage
設定を構成していない場合、既定のイベント ログの場所は DBFS で /pipelines/<pipeline-id>/system/events
されます。たとえば、パイプラインの ID が 91de5e48-35ed-11ec-8d3d-0242ac130003
の場合、ストレージの場所は /pipelines/91de5e48-35ed-11ec-8d3d-0242ac130003/system/events
です。
ビュー を作成すると、イベント ログのクエリを簡略化できます。 次の例では、 event_log_raw
という一時ビューを作成します。 このビューは、この記事に含まれるイベント ログのクエリーの例で使用されます。
CREATE OR REPLACE TEMP VIEW event_log_raw AS SELECT * FROM delta.`<event-log-path>`;
<event-log-path>
をイベント ログの場所に置き換えます。
パイプライン実行の各インスタンスは、 更新と呼ばれます。 多くの場合、最新の更新プログラムの情報を抽出する必要があります。 次のクエリーを実行して、最新の更新プログラムの識別子を検索し、 latest_update_id
一時ビューに保存します。 このビューは、この記事に含まれるイベント ログのクエリーの例で使用されます。
CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;
イベント ログは、Databricks ノートブックまたは SQL エディターでクエリーできます。 ノートブックまたは SQL エディターを使用して、イベント ログの例クエリーを実行します。
Unity Catalog
パイプラインが Unity Catalogにテーブルを発行する場合は、 event_log
テーブル値関数 (TVF) を使用してパイプラインのイベント ログをフェッチする必要があります。パイプラインのイベント ログを取得するには、パイプライン ID またはテーブル名を TVF に渡します。 たとえば、ID 04c78631-3dd7-4856-b2a6-7d84e9b2638b
のパイプラインのイベント ログ レコードを取得するには、次のようにします。
SELECT * FROM event_log("04c78631-3dd7-4856-b2a6-7d84e9b2638b")
テーブルを作成または所有しているパイプラインのイベントログレコードを取得するには my_catalog.my_schema.table1
:
SELECT * FROM event_log(TABLE(my_catalog.my_schema.table1))
TVF を呼び出すには、共有クラスターまたは SQLウェアハウスを使用する必要があります。 たとえば、共有クラスターにアタッチされたノートブックを使用したり、SQLウェアハウスに接続された SQL エディター を使用したりできます。
パイプラインのイベントのクエリを簡略化するために、パイプラインの所有者は event_log
TVF のビューを作成できます。 次の例では、パイプラインのイベント ログに対するビューを作成します。 このビューは、この記事に含まれているイベント ログのクエリーの例で使用されます。
注
event_log
TVF はパイプライン所有者のみが呼び出すことができ、 event_log
TVF 上に作成されたビューはパイプライン所有者のみがクエリーできます。ビューを他のユーザーと共有することはできません。
CREATE VIEW event_log_raw AS SELECT * FROM event_log("<pipeline-ID>");
<pipeline-ID>
を Delta Live Tables パイプラインの一意の識別子に置き換えます。 ID は、 Delta Live Tables UI の [パイプラインの詳細 ] パネルで確認できます。
パイプライン実行の各インスタンスは、 更新と呼ばれます。 多くの場合、最新の更新プログラムの情報を抽出する必要があります。 次のクエリーを実行して、最新の更新プログラムの識別子を検索し、 latest_update_id
一時ビューに保存します。 このビューは、この記事に含まれるイベント ログのクエリーの例で使用されます。
CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;
イベントログのリネージ情報のクエリ
リネージに関する情報を含むイベントのイベント・タイプは flow_definition
です。 details:flow_definition
オブジェクトには、グラフ内の各リレーションシップを定義する output_dataset
と input_datasets
が含まれています。
次のクエリーを使用して入力データセットと出力データセットを抽出し、系列情報を表示できます。
SELECT
details:flow_definition.output_dataset as output_dataset,
details:flow_definition.input_datasets as input_dataset
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_definition'
AND
origin.update_id = latest_update.id
|
|
---|---|
|
|
|
|
|
|
|
|
イベントログからのデータ品質のクエリー
パイプライン内のデータセットに対するエクスペクテーションを定義すると、データ品質メトリクスは details:flow_progress.data_quality.expectations
オブジェクトに保存されます。 データ品質に関する情報を含むイベントのイベント・タイプは flow_progress
です。 次の例では、最後のパイプライン更新のデータ品質メトリクスをクエリーします。
SELECT
row_expectations.dataset as dataset,
row_expectations.name as expectation,
SUM(row_expectations.passed_records) as passing_records,
SUM(row_expectations.failed_records) as failing_records
FROM
(
SELECT
explode(
from_json(
details :flow_progress :data_quality :expectations,
"array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
)
) row_expectations
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_progress'
AND origin.update_id = latest_update.id
)
GROUP BY
row_expectations.dataset,
row_expectations.name
|
|
|
|
---|---|---|---|
|
|
4083 |
0 |
イベントログを照会してデータバックログを監視する
Delta Live Tables は、 details:flow_progress.metrics.backlog_bytes
オブジェクトのバックログに存在するデータの量を追跡します。 バックログメトリクスを含むイベントのイベントタイプは flow_progress
です。 次の例は、最後のパイプライン更新のクエリー バックログ メトリクスです。
SELECT
timestamp,
Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
event_log_raw,
latest_update
WHERE
event_type ='flow_progress'
AND
origin.update_id = latest_update.id
注
バックログ メトリクスは、パイプラインの Databricks Runtime の種類とバージョンによっては使用できない場合があります。
サーバレスが有効になっていないパイプラインのイベントログから拡張オートスケールイベントを監視する
サーバレス コンピュートを使用しない DLT パイプラインの場合、パイプラインで拡張オートスケールが有効になっていると、イベント ログにクラスターのサイズ変更が記録されます。 enhanced オートスケールに関する情報を含むイベントのイベントタイプは autoscale
です。 クラスターのサイズ変更要求情報は、 details:autoscale
オブジェクトに格納されます。 次の例では、拡張オートスケール クラスターのサイズ変更要求に対して、最後のパイプライン更新を照会します。
SELECT
timestamp,
Double(
case
when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
else null
end
) as starting_num_executors,
Double(
case
when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as succeeded_num_executors,
Double(
case
when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as partially_succeeded_num_executors,
Double(
case
when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
else null
end
) as failed_num_executors
FROM
event_log_raw,
latest_update
WHERE
event_type = 'autoscale'
AND
origin.update_id = latest_update.id
コンピュートのリソース使用率の監視
cluster_resources
イベントは、クラスター内のタスクスロットの数、それらのタスクスロットの使用率、およびスケジュールを待機しているタスクの数に関するメトリクスを提供します。
拡張オートスケールが有効になっている場合、 cluster_resources
イベントには、 latest_requested_num_executors
や optimal_num_executors
などのオートスケール アルゴリズムのメトリクスも含まれます。 また、イベントでは、アルゴリズムのステータスが CLUSTER_AT_DESIRED_SIZE
、 SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORS
、 BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION
などのさまざまな状態として示されます。 この情報は、オートスケールイベントと併せて表示することで、強化されたオートスケールの全体像を把握することができます。
次の例では、最後のパイプライン更新のタスクキューサイズ履歴をクエリーします。
SELECT
timestamp,
Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
次の例では、最後のパイプライン更新の使用履歴をクエリーします。
SELECT
timestamp,
Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
次の例では、エグゼキューターのカウント履歴をクエリし、Enhanced オートスケール パイプラインでのみ使用可能なメトリクス (最新のリクエストでアルゴリズムによって要求されたエグゼキューターの数、最新のメトリクスに基づいてアルゴリズムによって推奨されるエグゼキューターの最適数、オートスケール アルゴリズムの状態など) を照会します。
SELECT
timestamp,
Double(details :cluster_resources.num_executors) as current_executors,
Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
details :cluster_resources.state as autoscaling_state
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
Delta Live Tables パイプラインの監査
Delta Live Tables イベント ログ レコードおよびその他の Databricks 監査ログを使用して、Delta Live テーブルでデータがどのように更新されているかを完全に把握できます。
Delta Live Tables は、パイプライン所有者の資格情報を使用して更新を実行します。 パイプライン所有者を更新することで、使用する認証情報を変更できます。 Delta Live Tables は、パイプラインの作成、構成の編集、更新のトリガーなど、パイプラインでのアクションについてユーザーを記録します。
Unity Catalog 監査イベントのリファレンスについては、「Unity Catalog イベント」を参照してください。
イベントログ内のユーザーアクションのクエリ
イベント ログを使用して、ユーザー アクションなどのイベントを監査できます。 ユーザーアクションに関する情報を含むイベントのイベントタイプは user_action
です。
アクションに関する情報は、 details
フィールドの user_action
オブジェクトに保存されます。次のクエリーを使用して、ユーザー イベントの監査ログを作成します。 このクエリーで使用する event_log_raw
ビューを作成するには、「 イベント ログのクエリ」を参照してください。
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
|
|
|
---|---|---|
2021-05-20T19:36:03.517+0000 |
|
|
2021-05-20T19:35:59.913+0000 |
|
|
2021-05-27T00:35:51.971+0000 |
|
|