LakeFlow宣言型パイプラインイベントログ
Lakeflow 宣言型パイプライン イベント ログには、監査ログ、データ品質チェック、パイプラインの進行状況、データリネージなど、パイプラインに関連するすべての情報が含まれています。イベント ログを使用して、データ パイプラインの状態を追跡、理解、および監視できます。
イベント ログ エントリは、 LakeFlow宣言型パイプラインユーザー インターフェイス、 LakeFlow宣言型パイプラインAPIで、またはイベント ログを直接クエリすることで表示できます。 このセクションでは、イベント ログを直接クエリすることに焦点を当てます。
イベントフックを使用して、アラートの送信など、イベントがログに記録されたときに実行するカスタム アクションを定義することもできます。
イベント ログ、またはイベント ログが公開されている親カタログやスキーマを削除しないでください。イベント ログを削除すると、将来の実行時にパイプラインの更新が失敗する可能性があります。
イベント ログ スキーマの詳細については、 LakeFlow宣言型パイプライン イベント ログ スキーマ」を参照してください。
イベントログを照会する
このセクションでは、Unity Catalog とデフォルトの公開モードで構成されたパイプラインのイベント ログを操作するためのデフォルトの動作と構文について説明します。
- 従来の公開モードを使用する Unity Catalog パイプラインの動作については、 「Unity Catalog 従来の公開モード パイプラインのイベント ログを操作する」を参照してください。
- Hive metastoreパイプラインの動作と構文については、 Hive metastoreパイプラインのイベント ログの操作」を参照してください。
デフォルトによって、 Lakeflow 宣言型パイプラインは、パイプライン用に構成されたデフォルト カタログとスキーマの非表示の Delta テーブルにイベント ログを書き込みます。 非表示になっている間も、十分に権限のあるすべてのユーザーがテーブルをクエリできます。デフォルトでは、パイプラインの所有者のみがイベント ログ テーブルをクエリできます。
所有者としてイベント ログをクエリするには、パイプライン ID を使用します。
SELECT * FROM event_log(<pipelineId>);
デフォルトでは、非表示のイベント ログの名前はevent_log_{pipeline_id}という形式になります。パイプライン ID は、ダッシュがアンダースコアに置き換えられた、システムによって割り当てられた UUID です。
パイプラインの 詳細設定 を編集することで、イベント ログを公開できます。詳細については、イベント ログのパイプライン設定を参照してください。イベント ログを公開するときは、次の例のように、イベント ログの名前を指定し、オプションでカタログとスキーマを指定します。
{
  "id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
  "name": "billing_pipeline",
  "event_log": {
    "catalog": "catalog_name",
    "schema": "schema_name",
    "name": "event_log_table_name"
  }
}
イベント ログの場所は、パイプライン内のAuto Loaderクエリのスキーマの場所としても機能します。 Databricks 、権限を変更する前にイベント ログ テーブルのビューを作成することをお勧めします。これは、イベント ログ テーブルが直接共有されている場合、一部のコンピュート設定によりユーザーがスキーマ メタデータにアクセスできるようになる可能性があるためです。 次の例の構文は、イベント ログ テーブルにビューを作成し、この記事に含まれるイベント ログ クエリの例で使用されます。<catalog_name>.<schema_name>.<event_log_table_name>パイプライン イベント ログの完全修飾テーブル名に置き換えます。イベント ログを公開した場合は、公開時に指定した名前を使用します。それ以外の場合は、 event_log(<pipelineId>)を使用します。ここで、pipelineId はクエリするパイプラインの ID です。
CREATE VIEW event_log_raw
AS SELECT * FROM <catalog_name>.<schema_name>.<event_log_table_name>;
Unity Catalog では、ビューはストリーミング クエリをサポートします。次の例では、構造化ストリーミングを使用して、イベント ログ テーブルの上に定義されたビューをクエリします。
df = spark.readStream.table("event_log_raw")
基本的なクエリの例
次の例は、イベント ログをクエリしてパイプラインに関する一般的な情報を取得し、一般的なシナリオのデバッグに役立てる方法を示しています。
以前の更新をクエリしてパイプラインの更新を監視する
次の例では、パイプラインの更新 (または 実行 ) をクエリし、更新 ID、ステータス、開始時刻、完了時刻、および期間を表示します。これにより、パイプラインの実行の概要が表示されます。
「イベント ログのクエリ」で説明されているように、対象のパイプラインのevent_log_rawビューを作成済みであると想定します。
with last_status_per_update AS (
    SELECT
        origin.pipeline_id AS pipeline_id,
        origin.pipeline_name AS pipeline_name,
        origin.update_id AS pipeline_update_id,
        FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_update_state,
        timestamp,
        ROW_NUMBER() OVER (
            PARTITION BY origin.update_id
            ORDER BY timestamp DESC
        ) AS rn
    FROM event_log_raw
    WHERE event_type = 'update_progress'
    QUALIFY rn = 1
),
update_durations AS (
    SELECT
        origin.pipeline_id AS pipeline_id,
        origin.pipeline_name AS pipeline_name,
        origin.update_id AS pipeline_update_id,
        -- Capture the start of the update
        MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,
        -- Capture the end of the update based on terminal states or current timestamp (relevant for continuous mode pipelines)
        COALESCE(
            MAX(CASE
                WHEN event_type = 'update_progress'
                 AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state IN ('COMPLETED', 'FAILED', 'CANCELED')
                THEN timestamp
            END),
            current_timestamp()
        ) AS end_time
    FROM event_log_raw
    WHERE event_type IN ('create_update', 'update_progress')
      AND origin.update_id IS NOT NULL
    GROUP BY pipeline_id, pipeline_name, pipeline_update_id
    HAVING start_time IS NOT NULL
)
SELECT
    s.pipeline_id,
    s.pipeline_name,
    s.pipeline_update_id,
    d.start_time,
    d.end_time,
    CASE
        WHEN d.start_time IS NOT NULL AND d.end_time IS NOT NULL THEN
            ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time) / 1000)
        ELSE NULL
    END AS duration_seconds,
    s.last_update_state AS pipeline_update_status
FROM last_status_per_update s
JOIN update_durations d
  ON s.pipeline_id = d.pipeline_id
 AND s.pipeline_update_id = d.pipeline_update_id
ORDER BY d.start_time DESC;
マテリアライズドビューの増分更新の問題をデバッグする
この例では、パイプラインの最新の更新からすべてのフローをクエリします。増分更新されたかどうか、また増分更新が行われない理由をデバッグするのに役立つその他の関連計画情報も表示されます。
「イベント ログのクエリ」で説明されているように、対象のパイプラインのevent_log_rawビューを作成済みであると想定します。
WITH latest_update AS (
  SELECT
    origin.pipeline_id,
    origin.update_id AS latest_update_id
  FROM event_log_raw AS origin
  WHERE origin.event_type = 'create_update'
  ORDER BY timestamp DESC
  -- LIMIT 1 -- remove if you want to get all of the update_ids
),
parsed_planning AS (
  SELECT
    origin.pipeline_name,
    origin.pipeline_id,
    origin.flow_name,
    lu.latest_update_id,
    from_json(
      details:planning_information,
      'struct<
        technique_information: array<struct<
          maintenance_type: string,
          is_chosen: boolean,
          is_applicable: boolean,
          cost: double,
          incrementalization_issues: array<struct<
            issue_type: string,
            prevent_incrementalization: boolean,
            operator_name: string,
            plan_not_incrementalizable_sub_type: string,
            expression_name: string,
            plan_not_deterministic_sub_type: string
          >>
        >>
      >'
    ) AS parsed
  FROM event_log_raw AS origin
  JOIN latest_update lu
    ON origin.update_id = lu.latest_update_id
  WHERE details:planning_information IS NOT NULL
),
chosen_technique AS (
  SELECT
    pipeline_name,
    pipeline_id,
    flow_name,
    latest_update_id,
    FILTER(parsed.technique_information, t -> t.is_chosen = true)[0] AS chosen_technique,
    parsed.technique_information AS planning_information
  FROM parsed_planning
)
SELECT
  pipeline_name,
  pipeline_id,
  flow_name,
  latest_update_id,
  chosen_technique.maintenance_type,
  chosen_technique,
  planning_information
FROM chosen_technique
ORDER BY latest_update_id DESC;
パイプライン更新のコストを照会する
この例では、パイプラインの DBU 使用量と、特定のパイプラインの実行のユーザーを照会する方法を示します。
SELECT
  sku_name,
  billing_origin_product,
  usage_date,
  collect_set(identity_metadata.run_as) as users,
  SUM(usage_quantity) AS `DBUs`
FROM
  system.billing.usage
WHERE
  usage_metadata.dlt_pipeline_id = :pipeline_id
GROUP BY
  ALL;
高度なクエリ
次の例は、あまり一般的ではないシナリオやより高度なシナリオを処理するためにイベント ログをクエリする方法を示しています。
パイプライン内のすべてのフローのメトリクスをクエリする
この例では、パイプライン内のすべてのフローの詳細情報を照会する方法を示します。ここには、フロー名、更新期間、データ品質メトリクス、および処理された行 (出力行、削除、更新/挿入、およびドロップされたレコード) に関する情報が表示されます。
「イベント ログのクエリ」で説明されているように、対象のパイプラインのevent_log_rawビューを作成済みであると想定します。
WITH flow_progress_raw AS (
  SELECT
    origin.pipeline_name         AS pipeline_name,
    origin.pipeline_id           AS pipeline_id,
    origin.flow_name             AS table_name,
    origin.update_id             AS update_id,
    timestamp,
    details:flow_progress.status AS status,
    TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT)      AS num_output_rows,
    TRY_CAST(details:flow_progress.metrics.num_upserted_rows AS BIGINT)    AS num_upserted_rows,
    TRY_CAST(details:flow_progress.metrics.num_deleted_rows AS BIGINT)     AS num_deleted_rows,
    TRY_CAST(details:flow_progress.data_quality.dropped_records AS BIGINT) AS num_expectation_dropped_rows,
    FROM_JSON(
      details:flow_progress.data_quality.expectations,
      SCHEMA_OF_JSON("[{'name':'str', 'dataset':'str', 'passed_records':42, 'failed_records':42}]")
    ) AS expectations_array
  FROM event_log_raw
  WHERE event_type = 'flow_progress'
    AND origin.flow_name IS NOT NULL
    AND origin.flow_name != 'pipelines.flowTimeMetrics.missingFlowName'
),
aggregated_flows AS (
  SELECT
    pipeline_name,
    pipeline_id,
    update_id,
    table_name,
    MIN(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS start_timestamp,
    MAX(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS end_timestamp,
    MAX_BY(status, timestamp) FILTER (
      WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'EXCLUDED', 'SKIPPED', 'STOPPED', 'IDLE')
    ) AS final_status,
    SUM(COALESCE(num_output_rows, 0))              AS total_output_records,
    SUM(COALESCE(num_upserted_rows, 0))            AS total_upserted_records,
    SUM(COALESCE(num_deleted_rows, 0))             AS total_deleted_records,
    MAX(COALESCE(num_expectation_dropped_rows, 0)) AS total_expectation_dropped_records,
    MAX(expectations_array)                        AS total_expectations
  FROM flow_progress_raw
  GROUP BY pipeline_name, pipeline_id, update_id, table_name
)
SELECT
  af.pipeline_name,
  af.pipeline_id,
  af.update_id,
  af.table_name,
  af.start_timestamp,
  af.end_timestamp,
  af.final_status,
  CASE
    WHEN af.start_timestamp IS NOT NULL AND af.end_timestamp IS NOT NULL THEN
      ROUND(TIMESTAMPDIFF(MILLISECOND, af.start_timestamp, af.end_timestamp) / 1000)
    ELSE NULL
  END AS duration_seconds,
  af.total_output_records,
  af.total_upserted_records,
  af.total_deleted_records,
  af.total_expectation_dropped_records,
  af.total_expectations
FROM aggregated_flows af
-- Optional: filter to latest update only
WHERE af.update_id = (
  SELECT update_id
  FROM aggregated_flows
  ORDER BY end_timestamp DESC
  LIMIT 1
)
ORDER BY af.end_timestamp DESC, af.pipeline_name, af.pipeline_id, af.update_id, af.table_name;
クエリデータの品質または期待 メトリクス
パイプライン内のデータ セットに対する期待値を定義すると、期待値に合格したレコード数と不合格となったレコード数のメトリクスがdetails:flow_progress.data_quality.expectationsオブジェクトに保存されます。 ドロップされたレコード数のメトリクスは、 details:flow_progress.data_qualityオブジェクトに保存されます。 データ品質に関する情報を含むイベントのイベント タイプはflow_progressです。
データ品質メトリクスは、一部のデータ セットでは利用できない場合があります。 期待値の制限を参照してください。
次のデータ品質メトリクスが利用可能です。
| メトリクス | 説明 | 
|---|---|
| 
 | 1 つ以上のエクスペクテーション基準を満たなかったたために削除されたレコードの数。 | 
| 
 | エクスペクテーション基準を満たしたレコードの数。 | 
| 
 | エクスペクテーション基準を満たすことに失敗したレコードの数。 | 
次の例では、最後のパイプライン更新についてデータ品質メトリクスをクエリします。 これは、「イベント ログのクエリ」で説明されているように、対象のパイプラインのevent_log_rawビューを作成済みであることを前提としています。
WITH latest_update AS (
  SELECT
    origin.pipeline_id,
    origin.update_id AS latest_update_id
  FROM event_log_raw AS origin
  WHERE origin.event_type = 'create_update'
  ORDER BY timestamp DESC
  LIMIT 1 -- remove if you want to get all of the update_ids
),
SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details:flow_progress:data_quality:expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name;
リネージ情報を問い合わせる
リネージに関する情報を含むイベントのイベント・タイプは flow_definitionです。 details:flow_definition オブジェクトには、グラフ内の各リレーションシップを定義する output_dataset と input_datasets が含まれています。
次のクエリを使用して入力データ セットと出力データ セットを抽出し、リネージ情報を表示します。 これは、「イベント ログのクエリ」で説明されているように、対象のパイプラインのevent_log_rawビューを作成済みであることを前提としています。
with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  details:flow_definition.output_dataset as flow_name,
  details:flow_definition.input_datasets as input_flow_names,
  details:flow_definition.flow_type as flow_type,
  details:flow_definition.schema, -- the schema of the flow
  details:flow_definition -- overall flow_definition object
FROM event_log_raw inner join latest_update on origin.update_id = latest_update.id
WHERE details:flow_definition IS NOT NULL
ORDER BY timestamp;
Auto Loader でクラウド ファイルの取り込みを監視する
Lakeflow 宣言型パイプラインは、 Auto Loader がファイルを処理するときにイベントを生成します。 Auto Loaderイベントの場合、event_typeはoperation_progressで、details:operation_progress:typeはAUTO_LOADER_LISTINGまたはAUTO_LOADER_BACKFILLのいずれかです。details:operation_progress オブジェクトには、status、 duration_ms、 auto_loader_details:source_path、 auto_loader_details:num_files_listed フィールドも含まれます。
次の例では、最新の更新プログラムの Auto Loader イベントを照会します。これは、「イベント ログのクエリ」で説明されているように、対象のパイプラインのevent_log_rawビューを作成済みであることを前提としています。
with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  details:operation_progress.status,
  details:operation_progress.type,
  details:operation_progress:auto_loader_details
FROM
  event_log_raw,latest_update
WHERE
  event_type like 'operation_progress'
  AND
  origin.update_id = latest_update.id
  AND
  details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL');
ストリーミング期間を最適化するためにデータバックログを監視する
LakeFlow宣言型パイプラインは、 details:flow_progress.metrics.backlog_bytesオブジェクトのバックログに存在するデータの量を追跡します。 バックログ メトリクスを含むイベントのイベント タイプはflow_progressです。 次の例では、バックログ メトリクスにパイプラインの最後の更新をクエリします。 これは、「イベント ログのクエリ」で説明されているように、対象のパイプラインのevent_log_rawビューを作成済みであることを前提としています。
with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id;
パイプラインのデータ ソース タイプとDatabricks Runtimeバージョンによっては、バックログ メトリクスを利用できない場合があります。
オートスケール イベントを監視してクラシック コンピュートを最適化する
クラシック コンピュートを使用する (つまり、サーバレス コンピュートを使用しない) LakeFlow宣言型パイプラインの場合、パイプラインで拡張オートスケールが有効になっていると、イベント ログ キャプチャ クラスターのサイズが変更されます。 拡張オートスケールに関する情報を含むイベントのイベント タイプはautoscaleです。 クラスターのサイズ変更要求情報はdetails:autoscaleオブジェクトに保存されます。
次の例では、最後のパイプライン更新に対する拡張オートスケール クラスター サイズ変更リクエストをクエリします。 これは、「イベント ログのクエリ」で説明されているように、対象のパイプラインのevent_log_rawビューを作成済みであることを前提としています。
with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id
クラシック コンピュートのコンピュート リソースの使用状況を監視する
cluster_resources イベントは、クラスター内のタスク スロットの数、それらのタスク スロットがどれだけ使用されているか、スケジュールされるのを待っているタスクの数に関するメトリクスを提供します。
強化オートスケールが有効になっている場合、 cluster_resources イベントには、 latest_requested_num_executorsや optimal_num_executorsなどのオートスケール アルゴリズムのメトリクスも含まれます。 イベントは、アルゴリズムのステータスを CLUSTER_AT_DESIRED_SIZE、 SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORS、 BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATIONなどのさまざまな状態として示 も示します。
この情報は、オートスケールイベントと併せて表示することで、強化されたオートスケールの全体像を把握することができます。
次の例では、タスク キュー サイズ履歴、使用率履歴、エグゼキューター カウント履歴、および最後のパイプライン更新におけるオートスケールのその他のメトリクスと状態をクエリします。 これは、「イベント ログのクエリ」で説明されているように、対象のパイプラインのevent_log_rawビューを作成済みであることを前提としています。
with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(details:cluster_resources.avg_num_queued_tasks) as queue_size,
  Double(details:cluster_resources.avg_task_slot_utilization) as utilization,
  Double(details:cluster_resources.num_executors) as current_executors,
  Double(details:cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details:cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id;
パイプラインストリーミングメトリクスを監視する
プレビュー
この機能は パブリック プレビュー段階です。
LakeFlow宣言型パイプラインのストリームの進捗状況についてメトリクスを表示できます。 stream_progressイベントをクエリして、構造化ストリーミングによって作成されたStreamingQueryListener メトリクスによく似たイベントを取得しますが、次の例外があります。
- 次のメトリクスは、 StreamingQueryListenerには存在しますが、stream_progressには存在しません:numInputRows、inputRowsPerSecond、およびprocessedRowsPerSecond。
- Kafka および Kineses ストリームの場合、 startOffset、endOffset、およびlatestOffsetフィールドが大きすぎる可能性があるため、切り捨てられます。これらのフィールドごとに、データが切り捨てられるかどうかを示すBoolean値を持つ追加の...Truncatedフィールド、startOffsetTruncated、endOffsetTruncated、およびlatestOffsetTruncatedが追加されます。
stream_progressイベントをクエリするには、次のようなクエリを使用できます。
SELECT
  parse_json(get_json_object(details, '$.stream_progress.progress_json')) AS stream_progress_json
FROM event_log_raw
WHERE event_type = 'stream_progress';
以下は JSON でのイベントの例です。
{
  "id": "abcd1234-ef56-7890-abcd-ef1234abcd56",
  "sequence": {
    "control_plane_seq_no": 1234567890123456
  },
  "origin": {
    "cloud": "<cloud>",
    "region": "<region>",
    "org_id": 0123456789012345,
    "pipeline_id": "abcdef12-abcd-3456-7890-abcd1234ef56",
    "pipeline_type": "WORKSPACE",
    "pipeline_name": "<pipeline name>",
    "update_id": "1234abcd-ef56-7890-abcd-ef1234abcd56",
    "request_id": "1234abcd-ef56-7890-abcd-ef1234abcd56"
  },
  "timestamp": "2025-06-17T03:18:14.018Z",
  "message": "Completed a streaming update of 'flow_name'."
  "level": "INFO",
  "details": {
    "stream_progress": {
      "progress": {
        "id": "abcdef12-abcd-3456-7890-abcd1234ef56",
        "runId": "1234abcd-ef56-7890-abcd-ef1234abcd56",
        "name": "silverTransformFromBronze",
        "timestamp": "2022-11-01T18:21:29.500Z",
        "batchId": 4,
        "durationMs": {
          "latestOffset": 62,
          "triggerExecution": 62
        },
        "stateOperators": [],
        "sources": [
          {
            "description": "DeltaSource[dbfs:/path/to/table]",
            "startOffset": {
              "sourceVersion": 1,
              "reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
              "reservoirVersion": 3216,
              "index": 3214,
              "isStartingVersion": true
            },
            "endOffset": {
              "sourceVersion": 1,
              "reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
              "reservoirVersion": 3216,
              "index": 3214,
              "isStartingVersion": true
            },
            "latestOffset": null,
            "metrics": {
              "numBytesOutstanding": "0",
              "numFilesOutstanding": "0"
            }
          }
        ],
        "sink": {
          "description": "DeltaSink[dbfs:/path/to/sink]",
          "numOutputRows": -1
        }
      }
    }
  },
  "event_type": "stream_progress",
  "maturity_level": "EVOLVING"
}
この例では、 ...Truncatedフィールドがfalseに設定された、Kafka ソース内の切り捨てられていないレコードを示します。
{
  "description": "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
  "startOffsetTruncated": false,
  "startOffset": {
    "KAFKA_TOPIC_NAME_INPUT_A": {
      "0": 349706380
    }
  },
  "endOffsetTruncated": false,
  "endOffset": {
    "KAFKA_TOPIC_NAME_INPUT_A": {
      "0": 349706672
    }
  },
  "latestOffsetTruncated": false,
  "latestOffset": {
    "KAFKA_TOPIC_NAME_INPUT_A": {
      "0": 349706672
    }
  },
  "numInputRows": 292,
  "inputRowsPerSecond": 13.65826278123392,
  "processedRowsPerSecond": 14.479817514628582,
  "metrics": {
    "avgOffsetsBehindLatest": "0.0",
    "estimatedTotalBytesBehindLatest": "0.0",
    "maxOffsetsBehindLatest": "0",
    "minOffsetsBehindLatest": "0"
  }
}
Lakeflow 宣言型パイプラインの監査
Lakeflow 宣言型パイプラインのイベント ログ レコードやその他の Databricks監査ログを使用して、宣言型パイプラインでデータがどのように更新されているかLakeflow全体像を把握できます。
Lakeflow 宣言型パイプラインは、パイプライン所有者の資格情報を使用して更新を実行します。 使用する資格情報は、パイプラインの所有者を更新することで変更できます。Lakeflow 宣言型パイプラインは、パイプラインの作成、構成の編集、更新のトリガーなど、パイプラインに対するアクションについてユーザーを記録します。
Unity Catalog 監査イベントのリファレンスについては、「Unity Catalog イベント」を参照してください。
イベントログでユーザーアクションを照会する
イベント ログを使用して、ユーザー アクションなどのイベントを監査できます。ユーザーアクションに関する情報を含むイベントのイベントタイプはuser_actionです。
アクションに関する情報は、 detailsフィールドのuser_actionオブジェクトに保存されます。次のクエリを使用して、ユーザー イベントの監査ログを作成します。これは、「イベント ログのクエリ」で説明されているように、対象のパイプラインのevent_log_rawビューを作成済みであることを前提としています。
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
| 
 | 
 | 
 | 
|---|---|---|
| 2021-05-20T19:36:03.517+0000 | 
 | 
 | 
| 2021-05-20T19:35:59.913+0000 | 
 | 
 | 
| 2021-05-27T00:35:51.971+0000 | 
 | 
 | 
Runtime情報
パイプライン更新のランタイム情報 (更新の Databricks Runtime バージョンなど) を表示できます。これは、 「イベント ログのクエリ」で説明されているように、目的のパイプラインのevent_log_rawビューを作成していることを前提としています。
SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
| 
 | 
|---|
| 11.0 |