LakeFlow 宣言型パイプラインイベントログ
Lakeflow 宣言型パイプライン イベント ログには、監査ログ、データ品質チェック、パイプラインの進行状況、データリネージなど、パイプラインに関連するすべての情報が含まれています。イベント ログを使用して、データ パイプラインの状態を追跡、理解、および監視できます。
イベントログエントリは、LakeFlow Declarative パイプラインユーザーインターフェイス、LakeFlow Declarative パイプライン API、またはイベントログを直接照会して表示できます。このセクションでは、イベント ログを直接照会することに焦点を当てます。
また、イベントがログに記録されたときに実行するカスタムアクション(たとえば、アラートの送信など)を イベントフックを使用して定義することもできます。
イベント ログ、またはイベント ログが発行されている親カタログまたはスキーマは削除しないでください。イベントログを削除すると、今後の実行中にパイプラインの更新に失敗する可能性があります。
イベント ログ スキーマの詳細については、「宣言型パイプライン イベント ログ スキーマLakeFlow」を参照してください。
イベントログのクエリ
このセクションでは、 Unity Catalog とデフォルト公開モードで構成されたパイプラインのイベントログを操作するためのデフォルトの動作と構文について説明します。
- レガシ発行モードを使用する Unity Catalog パイプラインの動作については、「 Unity Catalog レガシ発行モード パイプラインのイベント ログの操作」を参照してください。
- Hive metastore パイプラインの動作と構文については、「 Hive metastore パイプラインのイベント ログの操作」を参照してください。
デフォルトによって、 Lakeflow 宣言型パイプラインは、パイプライン用に構成されたデフォルト カタログとスキーマの非表示の Delta テーブルにイベント ログを書き込みます。 非表示になっている間も、十分に権限のあるすべてのユーザーがテーブルをクエリできます。デフォルトでは、パイプラインの所有者のみがイベント ログ テーブルをクエリできます。
所有者としてイベント ログを照会するには、パイプライン ID を使用します。
SELECT * FROM event_log(<pipelineId>);
デフォルトでは、非表示のイベントログの名前は event_log_{pipeline_id}
としてフォーマットされ、パイプライン ID はシステム割り当ての UUID で、ダッシュはアンダースコアに置き換えられます。
パイプラインの [ 詳細設定 ] を編集し、 [イベント ログをメタストアに発行 ] を選択することで、イベント ログを発行できます。詳細については、「 イベントログのパイプライン設定」を参照してください。イベントログを公開するときは、次の例のように、イベントログの名前を指定し、オプションでカタログとスキーマを指定できます。
{
"id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
"name": "billing_pipeline",
"event_log": {
"catalog": "catalog_name",
"schema": "schema_name",
"name": "event_log_table_name"
}
}
イベント ログの場所は、パイプライン内の Auto Loader クエリのスキーマの場所としても機能します。 Databricks イベントログテーブルが直接共有されている場合、一部のコンピュート設定では、ユーザーがスキーマメタデータにアクセスできる場合があるため、権限を変更する前にイベントログテーブルのビューを作成することをお勧めします。 次の構文例は、イベント ログ テーブルにビューを作成し、この記事に含まれるイベント ログ クエリの例で使用します。<catalog_name>.<schema_name>.<event_log_table_name>
をパイプライン イベント ログの完全修飾テーブル名に置き換えます。イベント ログを公開した場合は、公開時に指定した名前を使用します。それ以外の場合は、pipelineId がクエリを実行するパイプラインの ID である event_log(<pipelineId>)
を使用します。
CREATE VIEW event_log_raw
AS SELECT * FROM <catalog_name>.<schema_name>.<event_log_table_name>;
Unity Catalog では、ビューはストリーミング クエリをサポートします。次の例では、構造化ストリーミングを使用して、イベント ログ テーブルの上に定義されたビューを照会します。
df = spark.readStream.table("event_log_raw")
パイプラインの所有者は、パイプライン設定の [ 詳細設定 ] セクションの [Publish event log to metastore
] オプションを切り替えることで、イベント ログをパブリック Delta テーブルとして公開できます。オプションで、イベント・ログの新しいテーブル名、カタログおよびスキーマを指定できます。
基本的なクエリの例
次の例は、イベント ログにクエリを実行してパイプラインに関する一般的な情報を取得し、一般的なシナリオのデバッグを支援する方法を示しています。
以前の更新プログラムを照会してパイプラインの更新を監視する
次の例では、パイプラインの更新 (または 実行 ) を照会し、更新 ID、ステータス、開始時刻、完了時刻、および期間を表示します。これにより、パイプラインの実行の概要が表示されます。
「イベント ログのクエリ」で説明されているように、関心のあるパイプラインのevent_log_raw
ビューを作成していることを前提としています。
with last_status_per_update AS (
SELECT
origin.pipeline_id AS pipeline_id,
origin.pipeline_name AS pipeline_name,
origin.update_id AS pipeline_update_id,
FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_update_state,
timestamp,
ROW_NUMBER() OVER (
PARTITION BY origin.update_id
ORDER BY timestamp DESC
) AS rn
FROM event_log_raw
WHERE event_type = 'update_progress'
QUALIFY rn = 1
),
update_durations AS (
SELECT
origin.pipeline_id AS pipeline_id,
origin.pipeline_name AS pipeline_name,
origin.update_id AS pipeline_update_id,
-- Capture the start of the update
MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,
-- Capture the end of the update based on terminal states or current timestamp (relevant for continuous mode pipelines)
COALESCE(
MAX(CASE
WHEN event_type = 'update_progress'
AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state IN ('COMPLETED', 'FAILED', 'CANCELED')
THEN timestamp
END),
current_timestamp()
) AS end_time
FROM event_log_raw
WHERE event_type IN ('create_update', 'update_progress')
AND origin.update_id IS NOT NULL
GROUP BY pipeline_id, pipeline_name, pipeline_update_id
HAVING start_time IS NOT NULL
)
SELECT
s.pipeline_id,
s.pipeline_name,
s.pipeline_update_id,
d.start_time,
d.end_time,
CASE
WHEN d.start_time IS NOT NULL AND d.end_time IS NOT NULL THEN
ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time) / 1000)
ELSE NULL
END AS duration_seconds,
s.last_update_state AS pipeline_update_status
FROM last_status_per_update s
JOIN update_durations d
ON s.pipeline_id = d.pipeline_id
AND s.pipeline_update_id = d.pipeline_update_id
ORDER BY d.start_time DESC;
マテリアライズドビュー incremental 更新の問題をデバッグする
この例では、パイプラインの最新の更新からのすべてのフローを照会します。増分更新が行われたかどうか、および増分更新が行われない理由のデバッグに役立つその他の関連計画情報が表示されます。
「イベント ログのクエリ」で説明されているように、関心のあるパイプラインのevent_log_raw
ビューを作成していることを前提としています。
WITH latest_update AS (
SELECT
origin.pipeline_id,
origin.update_id AS latest_update_id
FROM event_log_raw AS origin
WHERE origin.event_type = 'create_update'
ORDER BY timestamp DESC
-- LIMIT 1 -- remove if you want to get all of the update_ids
),
parsed_planning AS (
SELECT
origin.pipeline_name,
origin.pipeline_id,
origin.flow_name,
lu.latest_update_id,
from_json(
details:planning_information,
'struct<
technique_information: array<struct<
maintenance_type: string,
is_chosen: boolean,
is_applicable: boolean,
cost: double,
incrementalization_issues: array<struct<
issue_type: string,
prevent_incrementalization: boolean,
operator_name: string,
plan_not_incrementalizable_sub_type: string,
expression_name: string,
plan_not_deterministic_sub_type: string
>>
>>
>'
) AS parsed
FROM event_log_raw AS origin
JOIN latest_update lu
ON origin.update_id = lu.latest_update_id
WHERE details:planning_information IS NOT NULL
),
chosen_technique AS (
SELECT
pipeline_name,
pipeline_id,
flow_name,
latest_update_id,
FILTER(parsed.technique_information, t -> t.is_chosen = true)[0] AS chosen_technique,
parsed.technique_information AS planning_information
FROM parsed_planning
)
SELECT
pipeline_name,
pipeline_id,
flow_name,
latest_update_id,
chosen_technique.maintenance_type,
chosen_technique,
planning_information
FROM chosen_technique
ORDER BY latest_update_id DESC;
パイプライン更新のコストを照会する
この例では、パイプラインの DBU 使用状況と、特定のパイプライン実行のユーザーを照会する方法を示します。
SELECT
sku_name,
billing_origin_product,
usage_date,
collect_set(identity_metadata.run_as) as users,
SUM(usage_quantity) AS `DBUs`
FROM
system.billing.usage
WHERE
usage_metadata.dlt_pipeline_id = :pipeline_id
GROUP BY
ALL;
高度なクエリ
次の例は、イベント ログをクエリして、あまり一般的ではないシナリオまたはより高度なシナリオを処理する方法を示しています。
パイプライン内のすべてのフローのメトリクスをクエリする
この例では、パイプライン内のすべてのフローに関する詳細情報を照会する方法を示します。フロー名、更新期間、データ品質 メトリクス、および処理された行に関する情報 (出力行、削除、更新挿入、および削除されたレコード)。
「イベント ログのクエリ」で説明されているように、関心のあるパイプラインのevent_log_raw
ビューを作成していることを前提としています。
WITH flow_progress_raw AS (
SELECT
origin.pipeline_name AS pipeline_name,
origin.pipeline_id AS pipeline_id,
origin.flow_name AS table_name,
origin.update_id AS update_id,
timestamp,
details:flow_progress.status AS status,
TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT) AS num_output_rows,
TRY_CAST(details:flow_progress.metrics.num_upserted_rows AS BIGINT) AS num_upserted_rows,
TRY_CAST(details:flow_progress.metrics.num_deleted_rows AS BIGINT) AS num_deleted_rows,
TRY_CAST(details:flow_progress.data_quality.dropped_records AS BIGINT) AS num_expectation_dropped_rows,
FROM_JSON(
details:flow_progress.data_quality.expectations,
SCHEMA_OF_JSON("[{'name':'str', 'dataset':'str', 'passed_records':42, 'failed_records':42}]")
) AS expectations_array
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND origin.flow_name IS NOT NULL
AND origin.flow_name != 'pipelines.flowTimeMetrics.missingFlowName'
),
aggregated_flows AS (
SELECT
pipeline_name,
pipeline_id,
update_id,
table_name,
MIN(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS start_timestamp,
MAX(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS end_timestamp,
MAX_BY(status, timestamp) FILTER (
WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'EXCLUDED', 'SKIPPED', 'STOPPED', 'IDLE')
) AS final_status,
SUM(COALESCE(num_output_rows, 0)) AS total_output_records,
SUM(COALESCE(num_upserted_rows, 0)) AS total_upserted_records,
SUM(COALESCE(num_deleted_rows, 0)) AS total_deleted_records,
MAX(COALESCE(num_expectation_dropped_rows, 0)) AS total_expectation_dropped_records,
MAX(expectations_array) AS total_expectations
FROM flow_progress_raw
GROUP BY pipeline_name, pipeline_id, update_id, table_name
)
SELECT
af.pipeline_name,
af.pipeline_id,
af.update_id,
af.table_name,
af.start_timestamp,
af.end_timestamp,
af.final_status,
CASE
WHEN af.start_timestamp IS NOT NULL AND af.end_timestamp IS NOT NULL THEN
ROUND(TIMESTAMPDIFF(MILLISECOND, af.start_timestamp, af.end_timestamp) / 1000)
ELSE NULL
END AS duration_seconds,
af.total_output_records,
af.total_upserted_records,
af.total_deleted_records,
af.total_expectation_dropped_records,
af.total_expectations
FROM aggregated_flows af
-- Optional: filter to latest update only
WHERE af.update_id = (
SELECT update_id
FROM aggregated_flows
ORDER BY end_timestamp DESC
LIMIT 1
)
ORDER BY af.end_timestamp DESC, af.pipeline_name, af.pipeline_id, af.update_id, af.table_name;
クエリ データ品質または期待 メトリクス
パイプライン内のデータセットに期待値を定義すると、期待値に合格したレコード数と不合格レコード数のメトリクスが details:flow_progress.data_quality.expectations
オブジェクトに格納されます。ドロップされたレコード数のメトリクスは、 details:flow_progress.data_quality
オブジェクトに格納されます。 データ品質に関する情報を含むイベントのイベントタイプは flow_progress
です。
データ品質メトリクス 一部のデータセットでは使用できない場合があります。 期待値の制限を参照してください。
以下のデータ品質メトリクスを使用できます。
メトリクス | 説明 |
---|---|
| 1 つ以上のエクスペクテーション基準を満たなかったたために削除されたレコードの数。 |
| エクスペクテーション基準を満たしたレコードの数。 |
| エクスペクテーション基準を満たすことに失敗したレコードの数。 |
次の例では、最後のパイプライン更新のデータ品質メトリクスを照会します。これは、「イベント ログのクエリ」で説明されているように、関心のあるパイプラインのevent_log_raw
ビューを作成していることを前提としています。
WITH latest_update AS (
SELECT
origin.pipeline_id,
origin.update_id AS latest_update_id
FROM event_log_raw AS origin
WHERE origin.event_type = 'create_update'
ORDER BY timestamp DESC
LIMIT 1 -- remove if you want to get all of the update_ids
),
SELECT
row_expectations.dataset as dataset,
row_expectations.name as expectation,
SUM(row_expectations.passed_records) as passing_records,
SUM(row_expectations.failed_records) as failing_records
FROM
(
SELECT
explode(
from_json(
details:flow_progress:data_quality:expectations,
"array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
)
) row_expectations
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_progress'
AND origin.update_id = latest_update.id
)
GROUP BY
row_expectations.dataset,
row_expectations.name;
クエリリネージ 情報
リネージに関する情報を含むイベントのイベント・タイプは flow_definition
です。 details:flow_definition
オブジェクトには、グラフ内の各リレーションシップを定義する output_dataset
と input_datasets
が含まれています。
次のクエリを使用して、入力データセットと出力データセットを抽出し、リネージ情報を表示します。 これは、「イベント ログのクエリ」で説明されているように、関心のあるパイプラインのevent_log_raw
ビューを作成していることを前提としています。
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
details:flow_definition.output_dataset as flow_name,
details:flow_definition.input_datasets as input_flow_names,
details:flow_definition.flow_type as flow_type,
details:flow_definition.schema, -- the schema of the flow
details:flow_definition -- overall flow_definition object
FROM event_log_raw inner join latest_update on origin.update_id = latest_update.id
WHERE details:flow_definition IS NOT NULL
ORDER BY timestamp;
Auto Loader によるクラウド ファイルの取り込みの監視
Lakeflow 宣言型パイプラインは、 Auto Loader がファイルを処理するときにイベントを生成します。 Auto Loaderイベントの場合、event_type
はoperation_progress
で、details:operation_progress:type
はAUTO_LOADER_LISTING
またはAUTO_LOADER_BACKFILL
のいずれかです。details:operation_progress
オブジェクトには、status
、 duration_ms
、 auto_loader_details:source_path
、 auto_loader_details:num_files_listed
フィールドも含まれます。
次の例では、最新の更新プログラムの Auto Loader イベントを照会します。 これは、「イベント ログのクエリ」で説明されているように、関心のあるパイプラインのevent_log_raw
ビューを作成していることを前提としています。
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
details:operation_progress.status,
details:operation_progress.type,
details:operation_progress:auto_loader_details
FROM
event_log_raw,latest_update
WHERE
event_type like 'operation_progress'
AND
origin.update_id = latest_update.id
AND
details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL');
ストリーミング期間を最適化するためのデータバックログの監視
LakeFlow 宣言型パイプラインは、 details:flow_progress.metrics.backlog_bytes
オブジェクトのバックログに存在するデータの量を追跡します。 バックログメトリクスを含むイベントのイベントタイプは flow_progress
です。 次の例では、バックログ メトリクス 最後のパイプライン更新を照会します。 これは、「イベント ログのクエリ」で説明されているように、関心のあるパイプラインのevent_log_raw
ビューを作成していることを前提としています。
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
event_log_raw,
latest_update
WHERE
event_type ='flow_progress'
AND
origin.update_id = latest_update.id;
バックログ メトリクスは、パイプラインのデータソースタイプと Databricks Runtime バージョンによっては使用できない場合があります。
クラシックコンピュートを最適化するためのオートスケールイベントの監視
クラシック コンピュートを使用する (つまり、サーバレス コンピュートを使用しない) LakeFlow 宣言型パイプラインの場合、パイプラインで拡張オートスケールが有効になっている場合、イベントログはクラスタリングのサイズ変更をキャプチャします。 強化オートスケールに関する情報を含むイベントは、イベントタイプが autoscale
です。 クラスタリング サイズ変更要求情報は、 details:autoscale
オブジェクトに格納されます。
次の例では、拡張オートスケール クラスタリング サイズ変更要求に最後のパイプライン更新を照会します。 これは、「イベント ログのクエリ」で説明されているように、関心のあるパイプラインのevent_log_raw
ビューを作成していることを前提としています。
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(
case
when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
else null
end
) as starting_num_executors,
Double(
case
when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as succeeded_num_executors,
Double(
case
when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as partially_succeeded_num_executors,
Double(
case
when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
else null
end
) as failed_num_executors
FROM
event_log_raw,
latest_update
WHERE
event_type = 'autoscale'
AND
origin.update_id = latest_update.id
コンピュート リソース utilization for classic コンピュートをモニターする
cluster_resources
イベントは、クラスタリング内のタスクスロットの数、それらのタスクスロットの使用量、およびスケジュールを待機しているタスクの数を示します。
強化オートスケールが有効になっている場合、 cluster_resources
イベントには、 latest_requested_num_executors
や optimal_num_executors
などのオートスケール アルゴリズムのメトリクスも含まれます。 イベントは、アルゴリズムのステータスを CLUSTER_AT_DESIRED_SIZE
、 SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORS
、 BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION
などのさまざまな状態として示 も示します。
この情報は、オートスケールイベントと併せて表示することで、強化されたオートスケールの全体像を把握することができます。
次の例では、タスク キュー サイズ履歴、使用状況履歴、エグゼキューター カウント履歴、その他の メトリクス と 状態 を オートスケール 最後の更新でクエリします。 これは、「イベント ログのクエリ」で説明されているように、関心のあるパイプラインのevent_log_raw
ビューを作成していることを前提としています。
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(details:cluster_resources.avg_num_queued_tasks) as queue_size,
Double(details:cluster_resources.avg_task_slot_utilization) as utilization,
Double(details:cluster_resources.num_executors) as current_executors,
Double(details:cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
Double(details:cluster_resources.optimal_num_executors) as optimal_num_executors,
details :cluster_resources.state as autoscaling_state
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id;
パイプラインストリーミングメトリクスの監視
プレビュー
この機能は パブリック プレビュー段階です。
あなたは表示できます メトリクス LakeFlow Declarative パイプラインのストリームの進行状況について。 stream_progressイベントをクエリして、構造化ストリーミングによって作成されたStreamingQueryListenerメトリクスと非常によく似たイベントを取得しますが、次の例外があります。
- 次のメトリクスは
StreamingQueryListener
には存在しますが、stream_progress
には存在しません:numInputRows
、inputRowsPerSecond
、およびprocessedRowsPerSecond
。 - Kafka ストリームと Kineses ストリームの場合、
startOffset
、endOffset
、およびlatestOffset
フィールドが大きすぎる可能性があり、切り捨てられます。これらの各フィールドに対して、データが切り捨てられるかどうかのBoolean値とともに、startOffsetTruncated
、endOffsetTruncated
、および...Truncated``latestOffsetTruncated
フィールドが追加されます。
stream_progress
イベントを照会するには、次のような照会を使用できます。
SELECT
parse_json(get_json_object(details, '$.stream_progress.progress_json')) AS stream_progress_json
FROM event_log_raw
WHERE event_type = 'stream_progress';
JSON でのイベントの例を次に示します。
{
"id": "abcd1234-ef56-7890-abcd-ef1234abcd56",
"sequence": {
"control_plane_seq_no": 1234567890123456
},
"origin": {
"cloud": "<cloud>",
"region": "<region>",
"org_id": 0123456789012345,
"pipeline_id": "abcdef12-abcd-3456-7890-abcd1234ef56",
"pipeline_type": "WORKSPACE",
"pipeline_name": "<pipeline name>",
"update_id": "1234abcd-ef56-7890-abcd-ef1234abcd56",
"request_id": "1234abcd-ef56-7890-abcd-ef1234abcd56"
},
"timestamp": "2025-06-17T03:18:14.018Z",
"message": "Completed a streaming update of 'flow_name'."
"level": "INFO",
"details": {
"stream_progress": {
"progress": {
"id": "abcdef12-abcd-3456-7890-abcd1234ef56",
"runId": "1234abcd-ef56-7890-abcd-ef1234abcd56",
"name": "silverTransformFromBronze",
"timestamp": "2022-11-01T18:21:29.500Z",
"batchId": 4,
"durationMs": {
"latestOffset": 62,
"triggerExecution": 62
},
"stateOperators": [],
"sources": [
{
"description": "DeltaSource[dbfs:/path/to/table]",
"startOffset": {
"sourceVersion": 1,
"reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
"reservoirVersion": 3216,
"index": 3214,
"isStartingVersion": true
},
"endOffset": {
"sourceVersion": 1,
"reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
"reservoirVersion": 3216,
"index": 3214,
"isStartingVersion": true
},
"latestOffset": null,
"metrics": {
"numBytesOutstanding": "0",
"numFilesOutstanding": "0"
}
}
],
"sink": {
"description": "DeltaSink[dbfs:/path/to/sink]",
"numOutputRows": -1
}
}
}
},
"event_type": "stream_progress",
"maturity_level": "EVOLVING"
}
この例では、 ...Truncated
フィールドが false
に設定された Kafka ソース内の切り捨てられていないレコードを示しています。
{
"description": "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffsetTruncated": false,
"startOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706380
}
},
"endOffsetTruncated": false,
"endOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706672
}
},
"latestOffsetTruncated": false,
"latestOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706672
}
},
"numInputRows": 292,
"inputRowsPerSecond": 13.65826278123392,
"processedRowsPerSecond": 14.479817514628582,
"metrics": {
"avgOffsetsBehindLatest": "0.0",
"estimatedTotalBytesBehindLatest": "0.0",
"maxOffsetsBehindLatest": "0",
"minOffsetsBehindLatest": "0"
}
}
Lakeflow 宣言型パイプラインの監査
Lakeflow 宣言型パイプラインのイベント ログ レコードやその他の Databricks監査ログを使用して、宣言型パイプラインでデータがどのように更新されているかLakeflow全体像を把握できます。
Lakeflow 宣言型パイプラインは、パイプライン所有者の資格情報を使用して更新を実行します。 使用する資格情報は、パイプラインの所有者を更新することで変更できます。Lakeflow 宣言型パイプラインは、パイプラインの作成、構成の編集、更新のトリガーなど、パイプラインに対するアクションについてユーザーを記録します。
Unity Catalog 監査イベントのリファレンスについては、「 Unity Catalog イベント 」を参照してください。
イベントログでのユーザーアクションのクエリ
イベントログを使用して、ユーザーアクションなどのイベントを監査できます。ユーザーアクションに関する情報を含むイベントのイベントタイプは user_action
です。
アクションに関する情報は、 user_action
オブジェクトの details
フィールドに格納されます。次のクエリを使用して、ユーザー イベントの監査ログを作成します。これは、「イベント ログのクエリ」で説明されているように、関心のあるパイプラインのevent_log_raw
ビューを作成していることを前提としています。
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
|
|
|
---|---|---|
2021-05-20T19:36:03.517+0000 |
|
|
2021-05-20T19:35:59.913+0000 |
|
|
2021-05-27T00:35:51.971+0000 |
|
|
Runtime 情報
パイプライン更新プログラムのランタイム情報 (たとえば、更新プログラムの Databricks Runtime バージョンなど) を表示できます。これは、「イベント ログのクエリを実行する」で説明されているように、関心のあるパイプラインのevent_log_raw
ビューを作成していることを前提としています。
SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
|
---|
11.0 |