メインコンテンツまでスキップ

テーブル履歴を取り扱う

Apache Iceberg と Delta Lake テーブルの場合、テーブルを変更する各オペレーションは新しいテーブルバージョンを作成します。履歴情報を使用して、オペレーションを監査したり、テーブルをロールバックしたり、タイムトラベルを使用して特定の時点のテーブルを照会したりすることができます。

注記

Databricksでは、テーブル履歴をデータアーカイブの長期バックアップソリューションとして使用することはお勧めしません。データとログの保持構成の両方をより大きな値に設定していない限り、タイムトラベルオペレーションには過去7日間のみを使用してください。

テーブル履歴の取得

テーブルへの各書き込みのオペレーション、ユーザー、タイムスタンプなどの情報を取得するには、DESCRIBE HISTORY コマンドを実行します。オペレーションは、時系列の逆順で返されます。

テーブル履歴の保持期間はテーブル設定logRetentionDurationによって決まります。デフォルトでは30日間です。

注記

タイムトラベルとテーブル履歴は、異なる保持しきい値によって制御されます。See タイムトラベル.

SQL
DESCRIBE HISTORY table_name       -- get the full history of the table

DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only

Spark SQL構文の詳細については、DESCRIBE HISTORYを参照してください。

Scala、Java、Python の構文の詳細については、Delta Lake API ドキュメントを参照してください。

カタログ エクスプローラーには、 「履歴」 タブでテーブルの履歴が視覚的に表示されます。

履歴スキーマ

historyオペレーションの出力には、次の列があります。

Type

説明

version

long

オペレーションによって生成されたテーブルのバージョン。

timestamp

timestamp

このバージョンがコミットされたとき。

userId

string

オペレーションを実行したユーザーのID。

userName

string

操作を実行したユーザーの名前。

operation

string

オペレーションの名前です。

operationParameters

map

操作のパラメーター(たとえば、述語)。

ジョブ

struct

その操作を実行したLakeFlow Jobの詳細。LakeFlow Job から書き込まれたコミットにのみ反映されます。それ以外の場合は、nullです。

notebook

struct

オペレーションが実行されたDatabricksノートブックの詳細です。Databricks ノートブックから書き込まれたコミットにのみ入力されます。それ以外の場合は、nullです。

clusterId

string

オペレーションが実行されたクラスターのIDです。

readVersion

long

書き込み操作を実行するために読み込まれたテーブルのバージョン。

isolationLevel

string

この操作に使用される独立性レベル。

isBlindAppend

boolean

このオペレーションでデータが追加されたかどうか。

operationMetrics

map

操作のメトリクス(たとえば、変更された行とファイルの数)。

userMetadata

string

ユーザー定義のコミットメタデータ (指定されている場合)。

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
注記

運用パラメーターにおける の理解partitionBy

テーブル履歴のpartitionByフィールドは、テーブルのパーティションスキーマを定義または変更するCREATEおよびOVERWRITE操作にのみ意味があります。

既存のテーブルへの追加操作 (APPEND、INSERT、UPDATE、DELETE、MERGE) の場合、このフィールドには、使用される書き込み方法 (.save().saveAsTable()) に応じて、空の配列 [] またはパーティション列が表示される場合があります。

この不整合は予期された動作であり、データがパーティションに書き込まれる方法には影響しません。追加操作の検証には使用しないでください。

date 列でパーティション分割されたテーブルについて考えます。テーブルを作成すると、partitionBy が入力されます。

Python
df.write.format("delta") \
.partitionBy("date") \
.saveAsTable("sales_data")

履歴におけるCREATE操作は次のとおりです:

operationParameters: {
"mode": "ErrorIfExists",
"partitionBy": "[\"date\"]"
}

このテーブルにデータを追加すると、partitionByは空の配列を表示します。

Python
new_df.write.format("delta") \
.mode("append") \
.saveAsTable("sales_data")

追加オペレーションは以下を示します。

operationParameters: {
"mode": "Append",
"partitionBy": "[]"
}

partitionBy の空の値が必要です。データは、テーブルの既存のパーティションスキーマに基づいて、引き続き正しいパーティションに書き込まれます。パスへの.save()がこのフィールドにパーティション列を表示する場合がありますが、この違いは実装の詳細であり、書き込み動作には影響しません。

運用メトリクス

historyオペレーションはoperationMetrics、列マップのオペレーションメトリクスのコレクションを返します。

以下のテーブルは、オペレーション別のマップキー定義の一覧です。

WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO

これらのオペレーションには、以下のメトリクスが利用可能です。

メトリクス名

説明

numFiles

書き込まれたファイルの数。

numOutputBytes

書き込まれた内容のサイズ(バイト単位)。

numOutputRows

書き込まれた行の数。

STREAMING UPDATE

このオペレーションでは、次のメトリクスが利用可能です:

メトリクス名

説明

numAddedFiles

追加されたファイルの数。

numRemovedFiles

削除されたファイルの数。

numOutputRows

書き込まれた行の数。

numOutputBytes

書き込みサイズ(バイト単位)。

DELETE

このオペレーションでは、次のメトリクスが利用可能です:

メトリクス名

説明

numAddedFiles

追加されたファイルの数。テーブルのパーティションが削除された場合は提供されません。

numRemovedFiles

削除されたファイルの数。

numDeletedRows

削除された行の数です。テーブルのパーティションが削除された場合は提供されません。

numCopiedRows

ファイルの削除処理中にコピーされた行数。

executionTimeMs

オペレーション全体の実行にかかった時間。

scanTimeMs

ファイルの一致をスキャンするのにかかった時間。

rewriteTimeMs

一致したファイルの書き換えにかかった時間です。

TRUNCATE

このオペレーションでは、次のメトリクスが利用可能です:

メトリクス名

説明

numRemovedFiles

削除されたファイルの数。

executionTimeMs

オペレーション全体の実行にかかった時間。

MERGE

このオペレーションでは、次のメトリクスが利用可能です:

メトリクス名

説明

numSourceRows

ソースDataFrameの行数。

numTargetRowsInserted

ターゲットテーブルに挿入された行数。

numTargetRowsUpdated

ターゲットテーブルで更新された行数。

numTargetRowsDeleted

ターゲットテーブルで削除された行数。

numTargetRowsCopied

コピーされたターゲット行の数。

numOutputRows

書き出された行の合計数。

numTargetFilesAdded

シンク(ターゲット)に追加されたファイルの数。

numTargetFilesRemoved

シンク(ターゲット)から削除されたファイルの数。

executionTimeMs

オペレーション全体の実行にかかった時間。

scanTimeMs

ファイルの一致をスキャンするのにかかった時間。

rewriteTimeMs

一致したファイルの書き換えにかかった時間です。

UPDATE

このオペレーションでは、次のメトリクスが利用可能です:

メトリクス名

説明

numAddedFiles

追加されたファイルの数。

numRemovedFiles

削除されたファイルの数。

numUpdatedRows

更新された行数。

numCopiedRows

ファイルの更新プロセスでコピーされた行数。

executionTimeMs

オペレーション全体の実行にかかった時間。

scanTimeMs

ファイルの一致をスキャンするのにかかった時間。

rewriteTimeMs

一致したファイルの書き換えにかかった時間です。

FSCK

このオペレーションでは、次のメトリクスが利用可能です:

メトリクス名

説明

numRemovedFiles

削除されたファイルの数。

CONVERT

このオペレーションでは、次のメトリクスが利用可能です:

メトリクス名

説明

numConvertedFiles

変換されたParquetファイルの数。

OPTIMIZE

このオペレーションでは、次のメトリクスが利用可能です:

メトリクス名

説明

numAddedFiles

追加されたファイルの数。

numRemovedFiles

最適化されたファイルの数。

numAddedBytes

テーブルが最適化された後に追加されたバイト数。

numRemovedBytes

削除されたバイト数。

minFileSize

テーブルが最適化された後の最小ファイルのサイズ。

p25FileSize

テーブルが最適化された後の25パーセンタイルファイルのサイズ。

p50FileSize

テーブルが最適化された後のファイルサイズの中央値。

p75FileSize

テーブルが最適化された後の75パーセンタイルファイルのサイズ。

maxFileSize

テーブルが最適化された後の最大ファイルのサイズ。

CLONE

このオペレーションでは、次のメトリクスが利用可能です:

メトリクス名

説明

sourceTableSize

クローン作成されたバージョンでのソーステーブルのサイズ(バイト単位)。

sourceNumOfFiles

クローン作成されたバージョンのソーステーブル内のファイルの数。

numRemovedFiles

以前のテーブルが置き換えられた場合にターゲットテーブルから削除されたファイルの数。

removedFilesSize

以前のテーブルが置き換えられた場合にターゲットテーブルから削除されたファイルの合計サイズ(バイト単位)。

numCopiedFiles

新しい場所にコピーされたファイルの数。シャロークローンの場合は0。

copiedFilesSize

新しい場所にコピーされたファイルの合計サイズ(バイト単位)。シャロークローンの場合は0。

RESTORE

このオペレーションでは、次のメトリクスが利用可能です:

メトリクス名

説明

tableSizeAfterRestore

復元後のテーブルサイズ(バイト単位)。

numOfFilesAfterRestore

復元後のテーブル内のファイルの数

numRemovedFiles

復元オペレーションによって削除されたファイルの数。

numRestoredFiles

復元の結果として追加されたファイルの数。

removedFilesSize

リストアによって削除されたファイルのサイズ(バイト単位)。

restoredFilesSize

リストアによって追加されたファイルのサイズ(バイト単位)。

VACUUM

このオペレーションでは、次のメトリクスが利用可能です:

メトリクス名

説明

numDeletedFiles

削除されたファイルの数。

numVacuumedDirectories

vacuum処理されたディレクトリの数。

numFilesToDelete

削除するファイルの数。

タイムトラベル

タイムトラベルは、タイムスタンプまたはテーブルバージョン(トランザクションログに記録されている)に基づいた以前のテーブルバージョンのクエリーをサポートしています。タイムトラベルは、次のようなアプリケーションに使用できます:

  • 分析、レポート、または出力(機械学習モデルの出力など)を再作成します。これは、特に規制された業界でのデバッグや監査に役立つ可能性があります。
  • 複雑なテンポラルクエリーを記述する。
  • データの誤りを修正する。
  • 急速に変化するテーブルの一連のクエリーに対してスナップショット分離を提供します。
注記

Databricks Runtime 18.0 以降では、 deletedFileRetentionDurationテーブル プロパティ (デフォルトは 7 日) よりも古いバージョンをリクエストした場合、タイムトラベル クエリはブロックされます。 Unity Catalogマネージドテーブルの場合、これはDatabricks Runtime 12.2 以降に適用されます。

タイムトラベル構文

タイムトラベルを使用してテーブルをクエリするには、テーブル名の指定の後に句を追加します。

  • timestamp_expression 次のいずれかになります:

    • '2018-10-18T22:15:12.013Z'つまり、タイムスタンプにキャストできる文字列です
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18'、つまり日付文字列です
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • タイムスタンプにキャストされる、またはタイムスタンプにキャストできるその他の式
  • version は、DESCRIBE HISTORY table_specの出力から取得できる長い値です。

timestamp_expressionversionもサブクエリーにすることはできません。

日付またはタイムスタンプ文字列のみが受け入れられます。たとえば、"2019-01-01""2019-01-01T00:00:00.000Z"です。シンタックスの例については、以下のコードを参照してください:

SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;

@構文を使用して、タイムスタンプまたはバージョンをテーブル名の一部として指定することもできます。タイムスタンプはyyyyMMddHHmmssSSS形式である必要があります。@vを使用してバージョンを指定できます。構文の例については、以下のコードを参照してください:

SQL
-- Timestamp version
SELECT * FROM people10m@20190101000000000
-- Version number
SELECT * FROM people10m@v123

タイムトラベルクエリーのデータ保持を構成する

以前のテーブルバージョンをクエリするには、そのバージョンのログファイルとデータファイルの*両方*を保持する必要があります。

  • VACUUMテーブルに対して実行されると、データファイルが削除されます。
  • テーブルバージョンのチェックポイント設定後、ログファイルは自動的に削除されます。

テーブルのデータ保持しきい値を増やすには、<format>delta または iceberg のいずれかに置き換えて、次のテーブル プロパティを構成する必要があります。

  • <format>.logRetentionDuration = "interval <interval>":テーブルの履歴を保持する期間を制御します。デフォルトはinterval 30 daysです。

    • Databricks Runtime 18.0以降では、logRetentionDurationdeletedFileRetentionDuration以上である必要があります。Unity Catalogマネージドテーブルの場合、これはDatabricks Runtime 12.2 以降に適用されます。
  • <format>.deletedFileRetentionDuration = "interval <interval>":現在のテーブルバージョンで参照されなくなったデータファイルを削除するためにVACUUMが使用するしきい値を決定します。デフォルトはinterval 7 daysです。

例えば、30日間のヒストリカルデータにアクセスするには、delta.deletedFileRetentionDuration = "interval 30 days"を設定します。これは、delta.logRetentionDurationのデフォルト設定と一致します。

重要

データ保持のしきい値を増やすと、より多くのデータファイルが保持されるため、ストレージコストが増加する可能性があります。

テーブルプロパティは、テーブル作成時に指定するか、ALTER TABLEステートメントで設定することができます。「テーブルプロパティリファレンス」を参照してください。

タイムトラベルの例

ユーザー 111 によるテーブルへの誤った削除を修正するには:

SQL
INSERT INTO my_table
SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
WHERE userId = 111

テーブルへの偶発的な誤った更新を修正するには:

SQL
MERGE INTO my_table target
USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
ON source.userId = target.userId
WHEN MATCHED THEN UPDATE SET *

先週追加された新規顧客の数をクエリするには:

SQL
SELECT
(
SELECT count(distinct userId)
FROM my_table
)
-
(
SELECT count(distinct userId)
FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7)
) AS new_customers

トランザクションログチェックポイント

トランザクションログは、テーブルデータとともにトランザクションログディレクトリ内のJSONファイルとしてテーブルバージョンを記録します。

チェックポイントクエリを最適化するために、テーブルバージョンはParquetチェックポイントファイルに集約されます。これにより、テーブル履歴のすべてのJSONバージョンを読み取る必要がなくなり、パフォーマンスが向上します。ユーザーはチェックポイントと直接やり取りする必要はありません。

Databricksは、データサイズとワークロードに応じてチェックポイントの頻度を最適化します。チェックポイントの頻度は予告なく変更される場合があります。

テーブルを以前の状態に復元する

次のシナリオを含め、RESTOREコマンドを使用してテーブルを以前のバージョンまたはタイムスタンプに復元します。

  • すでにリストアされたテーブルをリストアすることができます。
  • クローンテーブルを復元できます。

次の要件を考慮してください:

  • テーブルを復元するには、テーブルに対するMODIFY権限が必要です。
  • データファイルが手動で、またはVACUUMによって削除された後、それらのファイルを参照する古いバージョンにテーブルを復元することはできません。spark.sql.files.ignoreMissingFilestrueに設定されている場合、このバージョンへの部分的な復元は依然として可能です。
  • タイムスタンプで復元するには、yyyy-MM-dd HH:mm:ssまたはyyyy-MM-ddの形式を使用します。
SQL
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;

構文の詳細については、「RESTORE」を参照してください。

ストリーミング動作

復元は、データを変更するオペレーションであり、ダウンストリームのワークロードで重複データが発生する可能性があります。RESTORE コマンドによって追加されたログエントリーには、true に設定された dataChange が含まれています。

テーブルへの更新を処理する構造化ストリーミングジョブなどのダウンストリームワークロードの場合、復元操作によって追加されたデータ変更ログエントリは新しいデータ更新と見なされ、それらを処理するとデータが重複する可能性があります。

例えば:

テーブルバージョン

オペレーション

ログの更新

データ変更ログ更新のレコード

0

INSERT

AddFile(/path/to/file-1, dataChange = true)

(名前 = ヴィクトル、年齢 = 29)、(名前 = ジョージ、年齢 = 55)

1

INSERT

AddFile(/path/to/file-2, dataChange = true)

(名前 = ジョージ、年齢 = 39)

2

OPTIMIZE

AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2)

レコードなし。OPTIMIZEコンパクションはテーブル内のデータを変更しません。

3

RESTORE(version=1)

RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true)

(名前 = ヴィクトル、年齢 = 29)、(名前 = ジョージ、年齢 = 55)、(名前 = ジョージ、年齢 = 39)

前述の例では、RESTORE コマンドは、テーブルバージョン0および1の読み込み時に以前に確認された更新をもたらします。ストリーミングクエリがこのテーブルを再度読み込むと、これらのファイルは新規追加データと見なされ、再度処理されます。

メトリクスを復元する

完了後、RESTORE は次のメトリクスを単一行のDataFrameとしてレポートします:

  • table_size_after_restore:復元後のテーブルのサイズ。

  • num_of_files_after_restore:復元後のテーブル内のファイルの数。

  • num_removed_files:テーブルから削除された(論理的に削除された)ファイルの数。

  • num_restored_files:ロールバックによって復元されたファイルの数。

  • removed_files_size:テーブルから削除されたファイルの合計サイズ(バイト単位)。

  • restored_files_size:復元されるファイルの合計サイズ(バイト単位)。

    復元メトリクスの例

最後のコミットバージョンを検索

全スレッド、全テーブルにわたって、現在のSparkSessionが最後に書き込んだコミットのバージョン番号を取得するには、SQL設定spark.databricks.<format>.lastCommitVersionInSessionに問い合わせます。テーブルの形式に応じて、<format>deltaまたはicebergに置き換えます。

例えば:

SQL
SET spark.databricks.delta.lastCommitVersionInSession

SparkSessionによってコミットが行われていない場合、キーをクエリーすると空の値が返されます。

注記

複数のスレッド間で同じSparkSessionを共有する場合、それは複数のスレッド間で変数を共有するのと似ています。構成値への並列更新で競合状態が発生する可能性があります。