Delta Live Tables パイプラインの監視

この記事では、データリネージ、更新履歴、データ品質レポートなど、パイプラインの監視と監視のために Delta Live Tables の組み込み機能を使用する方法について説明します。

ほとんどのモニタリング データは、パイプラインの詳細 UI を使用して手動で確認できます。 一部のタスクは、イベント ログのメタデータを照会することで簡単に実行できます。 「 Delta Live Tables イベント ログとは」を参照してください。

UI ではどのようなパイプラインの詳細を使用できますか?

パイプライン グラフは、パイプラインの更新が正常に開始されるとすぐに表示されます。 矢印は、パイプライン内のデータセット間の依存関係を表します。 デフォルトでは、パイプラインの詳細ページにはテーブルの最新の更新が表示されますが、ドロップダウンメニューから古い更新を選択できます。

表示される詳細には、パイプライン ID、ソース ライブラリ、コンピュート コスト、製品エディション、パイプラインに構成されたチャンネルが含まれます。

データセットを表形式で表示するには、[ リスト ] タブをクリックします。 リストビューでは、パイプライン内のすべてのデータセットをテーブルの行として表示でき、パイプラインの DAG が大きすぎてグラフビューで視覚化できない場合に便利です。テーブルに表示されるデータセットは、データセット名、タイプ、ステータスなどの複数のフィルターを使用して制御できます。 DAG ビジュアリゼーションに戻るには、[ グラフ] をクリックします。

[別のユーザーとして実行] ユーザーはパイプラインの所有者であり、パイプラインの更新はこのユーザーのアクセス許可で実行されます。run as ユーザーを変更するには、[アクセス許可] をクリックし、パイプラインの所有者を変更します。

データセットの詳細を表示するにはどうすればよいですか?

パイプライングラフまたはデータセットリストでデータセットをクリックすると、データセットの詳細が表示されます。 詳細には、データセットスキーマ、データ品質メトリクス、データセットを定義するソースコードへのリンクが含まれます。

更新履歴の表示

パイプラインの更新の履歴とステータスを表示するには、上部のバーにある更新履歴ドロップダウンメニューをクリックします。

更新プログラムのグラフ、詳細、およびイベントを表示するには、ドロップダウン メニューで更新プログラムを選択します。 最新の更新プログラムに戻るには、[ 最新の更新プログラムを表示] をクリックします。

パイプラインイベントの通知を取得する

パイプライン更新の正常な完了やパイプライン更新の失敗など、パイプライン イベントのリアルタイム通知を受信するには、パイプラインを作成または編集するときに、 パイプライン イベントの [Eメール アドレスの追加] 通知 を追加します。

Delta Live Tables イベントログとは何ですか?

Delta Live Tables イベント ログには、監査ログ、データ品質チェック、パイプラインの進行状況、データリネージなど、パイプラインに関連するすべての情報が含まれています。 イベント ログを使用して、データパイプラインの状態を追跡、理解、および監視できます。

イベント ログ エントリは、Delta Live Tables ユーザー インターフェイス、Delta Live Tables API、またはイベント ログを直接クエリして表示できます。 この記事では、イベント ログに対して直接クエリを実行する方法について説明します。

また、イベントがログに記録されたときに実行するカスタムアクション (アラートの送信など) を イベントフックとともに定義することもできます。

イベント ログ スキーマ

次の表では、イベント ログのスキーマについて説明します。 これらのフィールドの一部には、 details フィールドなど、クエリーを実行するために解析が必要な JSON データが含まれています。 Databricks では、JSON フィールドを解析するための : 演算子がサポートされています。 : (コロン記号) 演算子を参照してください。

説明

id

イベント ログ レコードの一意の識別子。

sequence

イベントを識別して順序付けるためのメタデータを含む JSON ドキュメント。

origin

イベントのオリジンのメタデータ (クラウドプロバイダー、クラウドプロバイダーリージョン、user_idpipeline_id、パイプラインが作成された場所 (DBSQLまたはWORKSPACE) を示すpipeline_typeなど) を含む JSON ドキュメント。

timestamp

イベントが記録された時刻。

message

イベントを説明する人間が判読できるメッセージ。

level

イベントの種類 (たとえば、 INFOWARNERRORMETRICSなど)。

error

エラーが発生した場合は、エラーを説明する詳細。

details

イベントの構造化された詳細を含む JSON ドキュメント。 これは、イベントの分析に使用されるプライマリ フィールドです。

event_type

イベントの種類。

maturity_level

イベント スキーマの安定性。 可能な値は次のとおりです。

  • STABLE: スキーマは安定しており、変更されません。

  • NULL: スキーマは安定しており、変更されません。 maturity_level 項目が追加される前にレコードが作成された場合、値は NULL になる可能性があります (リリース 2022.37)。

  • EVOLVING: スキーマは安定しておらず、変更される可能性があります。

  • DEPRECATED: スキーマは非推奨であり、Delta Live Tables ランタイムはいつでもこのイベントの生成を停止できます。

イベント ログ のクエリ

イベント ログの場所とイベント ログをクエリーするインターフェイスは、パイプラインが Hive metastore と Unity Catalogのどちらを使用するように構成されているかによって異なります。

Hive metastore

パイプライン がテーブルを Hive metastoreに発行する場合、イベント ログは storage の場所の下の /system/events に格納されます。たとえば、パイプライン storage 設定を /Users/username/dataとして構成した場合、イベント ログは DBFS の /Users/username/data/system/events パスに格納されます。

storage 設定を構成していない場合、既定のイベント ログの場所は DBFS で /pipelines/<pipeline-id>/system/events されます。たとえば、パイプラインの ID が 91de5e48-35ed-11ec-8d3d-0242ac130003の場合、ストレージの場所は /pipelines/91de5e48-35ed-11ec-8d3d-0242ac130003/system/eventsです。

ビュー を作成すると、イベント ログのクエリを簡略化できます。 次の例では、 event_log_rawという一時ビューを作成します。 このビューは、この記事に含まれるイベント ログのクエリーの例で使用されます。

CREATE OR REPLACE TEMP VIEW event_log_raw AS SELECT * FROM delta.`<event-log-path>`;

<event-log-path> をイベント ログの場所に置き換えます。

パイプライン実行の各インスタンスは、 更新と呼ばれます。 多くの場合、最新の更新プログラムの情報を抽出する必要があります。 次のクエリーを実行して、最新の更新プログラムの識別子を検索し、 latest_update_id 一時ビューに保存します。 このビューは、この記事に含まれるイベント ログのクエリーの例で使用されます。

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

イベント ログは、Databricks ノートブックまたは SQL エディターでクエリーできます。 ノートブックまたは SQL エディターを使用して、イベント ログの例クエリーを実行します。

Unity Catalog

パイプライン が Unity Catalogにテーブルを発行する場合は、 event_log テーブル値関数 (TVF) を使用してパイプラインのイベント ログをフェッチする必要があります。パイプラインのイベント ログを取得するには、パイプライン ID またはテーブル名を TVF に渡します。 たとえば、ID 04c78631-3dd7-4856-b2a6-7d84e9b2638bのパイプラインのイベント ログ レコードを取得するには、次のようにします。

SELECT * FROM event_log("04c78631-3dd7-4856-b2a6-7d84e9b2638b")

テーブルを作成または所有しているパイプラインのイベントログレコードを取得するには my_catalog.my_schema.table1:

SELECT * FROM event_log(TABLE(my_catalog.my_schema.table1))

TVF を呼び出すには、共有クラスターまたは SQLウェアハウスを使用する必要があります。 たとえば、共有クラスターにアタッチされたノートブックを使用したり、SQLウェアハウスに接続された SQL エディター を使用したりできます。

パイプラインのイベントのクエリを簡略化するために、パイプラインの所有者は event_log TVF のビューを作成できます。 次の例では、パイプラインのイベント ログに対するビューを作成します。 このビューは、この記事に含まれているイベント ログのクエリーの例で使用されます。

event_log TVF はパイプライン所有者のみが呼び出すことができ、 event_log TVF 上に作成されたビューはパイプライン所有者のみがクエリーできます。ビューを他のユーザーと共有することはできません。

CREATE VIEW event_log_raw AS SELECT * FROM event_log("<pipeline-ID>");

<pipeline-ID> を Delta Live Tables パイプラインの一意の識別子に置き換えます。 ID は、 Delta Live Tables UI の [パイプラインの詳細 ] パネルで確認できます。

パイプライン実行の各インスタンスは、 更新と呼ばれます。 多くの場合、最新の更新プログラムの情報を抽出する必要があります。 次のクエリーを実行して、最新の更新プログラムの識別子を検索し、 latest_update_id 一時ビューに保存します。 このビューは、この記事に含まれるイベント ログのクエリーの例で使用されます。

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

イベントログのリネージ情報のクエリ

系列に関する情報を含むイベントのイベント・タイプは flow_definitionです。 details:flow_definition オブジェクトには、グラフ内の各リレーションシップを定義する output_datasetinput_datasets が含まれています。

次のクエリーを使用して入力データセットと出力データセットを抽出し、系列情報を表示できます。

SELECT
  details:flow_definition.output_dataset as output_dataset,
  details:flow_definition.input_datasets as input_dataset
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'flow_definition'
  AND
  origin.update_id = latest_update.id

output_dataset

input_datasets

1

customers

null

2

sales_orders_raw

null

3

sales_orders_cleaned

["customers", "sales_orders_raw"]

4

sales_order_in_la

["sales_orders_cleaned"]

イベントログからのデータ品質のクエリー

パイプライン内のデータセットに対するエクスペクテーションを定義すると、データ品質メトリクスは details:flow_progress.data_quality.expectations オブジェクトに保存されます。 データ品質に関する情報を含むイベントのイベント・タイプは flow_progressです。 次の例では、最後のパイプライン更新のデータ品質メトリクスをクエリーします。

SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details :flow_progress :data_quality :expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name

dataset

expectation

passing_records

failing_records

1

sales_orders_cleaned

valid_order_number

4083

0

イベントログを照会してデータバックログを監視する

Delta Live Tables は、 details:flow_progress.metrics.backlog_bytes オブジェクトのバックログに存在するデータの量を追跡します。 バックログメトリクスを含むイベントのイベントタイプは flow_progressです。 次の例は、最後のパイプライン更新のクエリー バックログ メトリクスです。

SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id

バックログ メトリクスは、パイプラインの Databricks Runtime の種類とバージョンによっては使用できない場合があります。

拡張オートスケール イベントをイベント ログから監視する

イベント ログには、パイプラインで Enhanced オートスケールが有効になっている場合のクラスターのサイズ変更がキャプチャされます。 拡張オートスケールに関する情報を含むイベントのイベント・タイプは autoscaleです。 クラスターのサイズ変更要求情報は、 details:autoscale オブジェクトに格納されます。 次の例では、最後のパイプライン更新に対する拡張オートスケールクラスターのサイズ変更リクエストをクエリーします。

SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id

コンピュートのリソース使用率の監視

cluster_resources イベントは、クラスター内のタスクスロットの数、それらのタスクスロットの使用率、およびスケジュールを待機しているタスクの数に関するメトリクスを提供します。

拡張オートスケールが有効になっている場合、 cluster_resources イベントには、 latest_requested_num_executorsoptimal_num_executorsなどのオートスケール アルゴリズムのメトリクスも含まれます。 イベントは、アルゴリズムのステータスを CLUSTER_AT_DESIRED_SIZESCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORSBLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATIONなどのさまざまな状態として示 も示します。 この情報は、オートスケール・イベントと併せて表示して、拡張オートスケールの全体像を把握することができます。

次の例では、最後のパイプライン更新のタスクキューサイズ履歴をクエリーします。

SELECT
  timestamp,
  Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

次の例では、最後のパイプライン更新の使用履歴をクエリーします。

SELECT
  timestamp,
  Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

次の例では、エグゼキューターのカウント履歴と、Enhanced オートスケール パイプラインでのみ使用可能なメトリクス (最新のリクエストでアルゴリズムによって要求されたエグゼキューターの数、最新のメトリクスに基づいてアルゴリズムによって推奨されるエグゼキューターの最適数、オートスケール アルゴリズムの状態など) をクエリーします。

SELECT
  timestamp,
  Double(details :cluster_resources.num_executors) as current_executors,
  Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

Delta Live Tables パイプラインの監査

Delta Live Tables イベント ログ レコードおよびその他の Databricks 監査ログを使用して、Delta Live テーブルでデータがどのように更新されているかを完全に把握できます。

Delta Live Tables は、パイプライン所有者の資格情報を使用して更新を実行します。 パイプライン所有者を更新することで、使用する認証情報を変更できます。 Delta Live Tables は、パイプラインの作成、構成の編集、更新のトリガーなど、パイプラインでのアクションについてユーザーを記録します。

Unity Catalo g 監査イベントのリファレンスについては 、 Unity Catalo g イベント を参照してください

イベントログ内のユーザーアクションのクエリ

イベント ログを使用して、ユーザー アクションなどのイベントを監査できます。 ユーザーアクションに関する情報を含むイベントのイベントタイプは user_actionです。

アクションに関する情報は、 details フィールドの user_action オブジェクトに保存されます。次のクエリーを使用して、ユーザー イベントの監査ログを作成します。 このクエリーで使用する event_log_raw ビューを作成するには、「 イベント ログのクエリ」を参照してください。

SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'

timestamp

action

user_name

1

2021-05-20T19:36:03.517+0000

START

user@company.com

2

2021-05-20T19:35:59.913+0000

CREATE

user@company.com

3

2021-05-27T00:35:51.971+0000

START

user@company.com

ランタイム情報

パイプライン更新のランタイム情報 (更新の Databricks Runtime バージョンなど) を表示できます。

SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'

dbr_version

1

11.0