Delta Lake テーブルの履歴を操作する
Delta Lakeのテーブルを変更する各オペレーションは、新しいテーブルバージョンを作成します。履歴情報を使用して、オペレーションを監査したり、テーブルをロールバックしたり、タイムトラベルを使用して特定の時点のテーブルを照会したりすることができます。
Databricksでは、データアーカイブの長期バックアップソリューションとしてDelta Lakeテーブル履歴を使用することはお勧めしません。Databricksでは、データとログの保持構成の両方をより大きな値に設定していない限り、タイムトラベルオペレーションには過去7日間のみを使用することをお勧めします。
Delta テーブルの履歴を取得する
history
コマンドを実行すると、Deltaテーブルへの各書き込みのオペレーション、ユーザー、タイムスタンプなどの情報を取得できます。オペレーションは、時系列の逆順で返されます。
テーブル履歴の保持期間はテーブル設定delta.logRetentionDuration
によって決まります。デフォルトでは30日間です。
タイムトラベルとテーブル履歴は、異なる保有しきい値によって制御されます。 「Delta Lake タイムトラベルとは」を参照してください。
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
Spark SQL 構文の詳細については、 DESCRIBE HISTORYを参照してください。
Scala/Java/Python 構文の詳細については、 Delta Lake API のドキュメント を参照してください。
カタログ エクスプローラー では、Delta テーブルのこの詳細なテーブル情報と履歴を視覚的に表示できます。 テーブルスキーマとサンプルデータに加えて、[ 履歴 ] タブをクリックすると、 DESCRIBE HISTORY
とともに表示されるテーブル履歴を表示できます。
履歴スキーマ
history
オペレーションの出力には、次の列があります。
列 | タイプ | 説明 |
---|---|---|
version | long | オペレーションによって生成されたテーブルのバージョン。 |
timestamp | timestamp | このバージョンがコミットされたとき。 |
userId | string | オペレーションを実行したユーザーのID。 |
userName | string | オペレーションを実行したユーザーの名前。 |
operation | string | オペレーションの名前。 |
operationParameters | map | オペレーションのパラメーター(述語など) |
job | 構造体 | オペレーションを実行したジョブの詳細。 |
notebook | 構造体 | オペレーションが実行されたノートブックの詳細。 |
clusterId | string | オペレーションが実行されたクラスターのID。 |
readVersion | long | 書き込みオペレーションを行うために読み込まれたテーブルのバージョン。 |
isolationLevel | string | このオペレーションに使用される独立性レベル。 |
isBlindAppend | boolean | このオペレーションでデータが追加されたかどうか。 |
operationMetrics | map | オペレーションの指標(たとえば、行数や変更されたファイル数など。) |
userMetadata | string | ユーザー定義のコミットメタデータ(指定されている場合) |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
-
次の方法を使用してDeltaテーブルに書き込む場合、他のいくつかの列は使用できません:
-
今後追加される列は、常に最後の列の後に追加されます。
Operation メトリクスキー
history
オペレーションはoperationMetrics
、列マップのオペレーションメトリクスのコレクションを返します。
以下のテーブルは、オペレーション別のマップキー定義の一覧です。
オペレーション | メトリクス名 | 説明 |
---|---|---|
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO | ||
numFiles | 書き込まれたファイルの数。 | |
numOutputBytes | 書き込まれた内容のサイズ(バイト単位)。 | |
numOutputRows | 書き込まれた行の数。 | |
STREAMING UPDATE | ||
numAddedFiles | 追加されたファイルの数。 | |
numRemovedFiles | 削除されたファイルの数。 | |
numOutputRows | 書き込まれた行の数。 | |
numOutputBytes | 書き込みサイズ(バイト単位)。 | |
DELETE | ||
numAddedFiles | 追加されたファイルの数。テーブルのパーティションが削除された場合は提供されない。 | |
numRemovedFiles | 削除されたファイルの数。 | |
numDeletedRows | 削除された行の数。テーブルのパーティションが削除された場合は提供されない。 | |
numCopiedRows | ファイルの削除処理中にコピーされた行数。 | |
executionTimeMs | オペレーション全体の実行にかかった時間。 | |
scanTimeMs | ファイルの一致をスキャンするのにかかった時間。 | |
rewriteTimeMs | 一致したファイルの書き換えにかかった時間。 | |
TRUNCATE | ||
numRemovedFiles | 削除されたファイルの数。 | |
executionTimeMs | オペレーション全体の実行にかかった時間。 | |
MERGE | ||
numSourceRows | ソースデータフレームの行数。 | |
numTargetRowsInserted | ターゲッテーブルに挿入された行数。 | |
numTargetRowsUpdated | ターゲットテーブルで更新された行数。 | |
numTargetRowsDeleted | ターゲットテーブルで削除された行数。 | |
numTargetRowsCopied | コピーされたターゲット行の数。 | |
numOutputRows | 書き出された行の合計数。 | |
numTargetFilesAdded | シンク(ターゲット)に追加されたファイルの数。 | |
numTargetFilesRemoved | シンク(ターゲット)から削除されたファイルの数。 | |
executionTimeMs | オペレーション全体の実行にかかった時間。 | |
scanTimeMs | ファイルの一致をスキャンするのにかかった時間。 | |
rewriteTimeMs | 一致したファイルの書き換えにかかった時間。 | |
UPDATE | ||
numAddedFiles | 追加されたファイルの数。 | |
numRemovedFiles | 削除されたファイルの数。 | |
numUpdatedRows | 更新された行数。 | |
numCopiedRows | ファイルの更新プロセスでコピーされた行数。 | |
executionTimeMs | オペレーション全体の実行にかかった時間。 | |
scanTimeMs | ファイルの一致をスキャンするのにかかった時間。 | |
rewriteTimeMs | 一致したファイルの書き換えにかかった時間。 | |
FSCK | numRemovedFiles | 削除されたファイルの数。 |
CONVERT | numConvertedFiles | 変換されたParquetファイルの数。 |
OPTIMIZE | ||
numAddedFiles | 追加されたファイルの数。 | |
numRemovedFiles | 最適化されたファイルの数。 | |
numAddedBytes | テーブルが最適化された後に追加されたバイト数。 | |
numRemovedBytes | 削除されたバイト数。 | |
minFileSize | テーブルが最適化された後の最小ファイルのサイズ。 | |
p25FileSize | テーブルが最適化された後の25パーセンタイルファイルのサイズ。 | |
p50FileSize | テーブルが最適化された後のファイルサイズの中央値。 | |
p75FileSize | テーブルが最適化された後の75パーセンタイルファイルのサイズ。 | |
maxFileSize | テーブルが最適化された後の最大ファイルのサイズ。 | |
CLONE | ||
sourceTableSize | クローン作成されたバージョンでのソーステーブルのサイズ(バイト単位)。 | |
sourceNumOfFiles | クローン作成されたバージョンのソーステーブル内のファイルの数。 | |
numRemovedFiles | 以前のDeltaテーブルが置き換えられた場合にターゲットテーブルから削除されたファイルの数。 | |
removedFilesSize | 以前のDeltaテーブルが置き換えられた場合にターゲットテーブルから削除されたファイルの合計サイズ(バイト単位)。 | |
numCopiedFiles | 新しい場所にコピーされたファイルの数。シャロークローンの場合は0。 | |
copiedFilesSize | 新しい場所にコピーされたファイルの合計サイズ(バイト単位)。シャロークローンの場合は0。 | |
RESTORE | ||
tableSizeAfterRestore | 復元後のテーブルサイズ(バイト単位)。 | |
numOfFilesAfterRestore | 復元後のテーブル内のファイルの数。 | |
numRemovedFiles | 復元オペレーションによって削除されたファイルの数。 | |
numRestoredFiles | 復元の結果として追加されたファイルの数。 | |
removedFilesSize | リストアによって削除されたファイルのサイズ(バイト単位)。 | |
restoredFilesSize | リストアによって追加されたファイルのサイズ(バイト単位)。 | |
VACUUM | ||
numDeletedFiles | 削除されたファイルの数。 | |
numVacuumedDirectories | vacuum処理されたディレクトリの数。 | |
numFilesToDelete | 削除するファイルの数。 |
Delta Lake タイムトラベルとは?
Delta Lakeタイムトラベルは、タイムスタンプまたはテーブルバージョン(トランザクションログに記録されている)に基づいた以前のテーブルバージョンのクエリーをサポートしています。タイムトラベルは、次のようなアプリケーションに使用できます:
- 分析、レポート、または出力(機械学習モデルの出力など)を再作成します。これは、特に規制された業界でのデバッグや監査に役立つ可能性があります。
- 複雑なテンポラルクエリーを記述する。
- データの誤りを修正する。
- 急速に変化するテーブルの一連のクエリーに対してスナップショット分離を提供します。
タイムトラベルでアクセスできるテーブルバージョンは、トランザクションログファイルの保持しきい値と、VACUUM
オペレーションの頻度と指定された保持期間の組み合わせによって決まります。デフォルト値を使用してVACUUM
毎日実行すると、タイムトラベルに7日間のデータが利用可能になります。
Delta タイムトラベルの構文
タイムトラベルを使用してDeltaテーブルをクエリーするには、テーブル名の指定の後に句を追加します。
-
timestamp_expression
次のいずれかになります:'2018-10-18T22:15:12.013Z'
つまり、タイムスタンプにキャストできる文字列ですcast('2018-10-18 13:36:32 CEST' as timestamp)
'2018-10-18'
、つまり日付文字列ですcurrent_timestamp() - interval 12 hours
date_sub(current_date(), 1)
- タイムスタンプにキャストされる、またはタイムスタンプにキャストできるその他の式
-
version
は、DESCRIBE HISTORY table_spec
の出力から取得できる長い値です。
timestamp_expression
もversion
もサブクエリーにすることはできません。
日付またはタイムスタンプ文字列のみが受け入れられます。たとえば、"2019-01-01"
と"2019-01-01T00:00:00.000Z"
です。シンタックスの例については、以下のコードを参照してください:
- SQL
- Python
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
@
構文を使用して、テーブル名の一部としてタイムスタンプまたはバージョンを指定することもできます。タイムスタンプはyyyyMMddHHmmssSSS
形式である必要があります。バージョンの前にv
を付加することで、@
の後にバージョンを指定できます。シンタックスの例については、以下のコードを参照してください:
- SQL
- Python
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
トランザクション・ログ・チェックポイントとは
Delta Lakeは、テーブルのバージョンを_delta_log
ディレクトリ内のJSONファイルとして記録し、テーブルデータと一緒に保存します。チェックポイントクエリーを最適化するために、Delta LakeはテーブルバージョンをParquetチェックポイントファイルに集約し、テーブル履歴のすべてのJSONバージョンの読み取りが不要になります。Databricksは、データサイズとワークロードに応じてチェックポイントの頻度を最適化します。ユーザーはチェックポイントと直接のやり取りは不要のはずです。チェックポイントの頻度は予告なく変更される場合があります。
タイムトラベル クエリのデータ保持を構成する
以前のテーブルバージョンをクエリーするには、そのバージョンのログファイルとデータファイルの 両方を 保持する必要があります。
VACUUM
テーブルに対して実行されると、データファイルが削除されます。Delta Lakeは、テーブルのバージョンにチェックポイントを設定した後、ログファイルの削除を自動的に管理します。
ほとんどのDeltaテーブルでは、VACUUM
が定期的に実行されるため、ポイントインタイムクエリーはVACUUM
の保持しきい値(デフォルトでは7日間)を尊重する必要があります。
Deltaテーブルのデータ保持しきい値を増やすには、次のテーブルプロパティを構成する必要があります:
delta.logRetentionDuration = "interval <interval>"
:テーブルの履歴を保持する期間を制御します。デフォルトはinterval 30 days
です。delta.deletedFileRetentionDuration = "interval <interval>"
:現在のテーブルバージョンで参照されなくなったデータファイルを削除するためにVACUUM
が使用するしきい値を決定します。デフォルトはinterval 7 days
です。
Delta プロパティは、テーブルの作成時に指定することも、 ALTER TABLE
ステートメントで設定することもできます。 「Delta テーブル プロパティ リファレンス」を参照してください。
VACUUM
オペレーションが頻繁に行われるテーブルの履歴が長期間保持されるようにするには、これらのプロパティを両方設定する必要があります。たとえば、30日間の履歴データにアクセスするには、delta.deletedFileRetentionDuration = "interval 30 days"
を設定します(これは、delta.logRetentionDuration
のデフォルト設定と一致します)。
データ保持のしきい値を増やすと、より多くのデータファイルが保持されるため、ストレージコストが増加する可能性があります。
Delta テーブルを以前の状態に復元する
RESTORE
コマンドを使用すると、Deltaテーブルを以前の状態に復元できます。Deltaテーブルは、テーブルの履歴バージョンを内部的に保持し、以前の状態に復元できるようにします。
以前の状態に対応するバージョン、または以前の状態が作成されたときのタイムスタンプは、RESTORE
コマンドのオプションとしてサポートされています。
- すでにリストアされたテーブルをリストアすることができます。
- クローニングされたテーブルをリストアできます。
- 復元するテーブルに対する
MODIFY
権限が必要です。 - データファイルが手動または
vacuum
によって削除された古いバージョンにテーブルをリストアすることはできません。spark.sql.files.ignoreMissingFiles
をtrue
に設定しても、部分的にこのバージョンにリストアすることは可能です。 - 以前の状態にリストアするためのタイムスタンプのフォーマットは
yyyy-MM-dd HH:mm:ss
です。日付(yyyy-MM-dd
)文字列のみの指定もサポートされています。
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
構文の詳細については、 RESTOREを参照してください。
復元は、データを変更するオペレーションと見なされます。RESTORE
コマンドによって追加されたDelta Lakeログエントリーには、trueに設定されたdataChangeが含まれています。Delta Lakeテーブルの更新を処理するStructured Streamingジョブのようなダウンストリームアプリケーションがある場合、リストアオペレーションによって追加されたデータ変更ログエントリーは新しいデータ更新とみなされ、それらを処理すると重複データが発生する可能性があります。
例えば:
テーブルバージョン | オペレーション | Deltaログの更新 | データ変更ログ更新のレコード |
---|---|---|---|
0 | INSERT | AddFile(/path/to/file-1, dataChange = true) | (名前 = ヴィクトル、年齢 = 29、(名前 = ジョージ、年齢 = 55) |
1 | INSERT | AddFile(/path/to/file-2, dataChange = true) | (名前 = ジョージ、年齢 = 39) |
2 | OPTIMIZE | AddFile(/path/to/file-3, dataChange = false)、RemoveFile(/path/to/file-1)、RemoveFile(/path/to/file-2) | (最適化圧縮ではテーブル内のデータが変更されないため、レコードはありません) |
3 | RESTORE(version=1) | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (名前 = ヴィクトル、年齢 = 29)、(名前 = ジョージ、年齢 = 55)、(名前 = ジョージ、年齢 = 39) |
前述の例では、RESTORE
コマンドにより、Deltaテーブルバージョン0および1の読み取り時にすでに確認されていた更新が行われます。ストリーミングクエリーがこのテーブルを読み取っていた場合、これらのファイルは新しく追加されたデータとみなされ、再度処理されます。
Restore メトリクス
RESTORE
オペレーションが完了すると、次のメトリクスを単一行のデータフレームとしてレポートします:
-
table_size_after_restore
:復元後のテーブルのサイズ。 -
num_of_files_after_restore
:復元後のテーブル内のファイルの数。 -
num_removed_files
:テーブルから削除された(論理的に削除された)ファイルの数。 -
num_restored_files
:ロールバックによって復元されたファイルの数。 -
removed_files_size
:テーブルから削除されたファイルの合計サイズ(バイト単位)。 -
restored_files_size
:復元されるファイルの合計サイズ(バイト単位)。
Delta Lake タイムトラベルの使用例
-
ユーザー
111
のテーブルへの誤った削除を修正しました:SQLINSERT INTO my_table
SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
WHERE userId = 111 -
テーブルに対する偶発的な誤った更新を修正します:
SQLMERGE INTO my_table target
USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
ON source.userId = target.userId
WHEN MATCHED THEN UPDATE SET * -
先週追加された新規顧客の数を照会します。
SQLSELECT count(distinct userId)
FROM my_table - (
SELECT count(distinct userId)
FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
Sparkセッションで最後のコミットのバージョンを見つけるにはどうすればよいですか?
全スレッド、全テーブルにわたって、現在のSparkSession
が最後に書き込んだコミットのバージョン番号を取得するには、SQL設定spark.databricks.delta.lastCommitVersionInSession
に問い合わせます。
- SQL
- Python
- Scala
SET spark.databricks.delta.lastCommitVersionInSession
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
SparkSession
によってコミットが行われていない場合、キーをクエリーすると空の値が返されます。
複数のスレッド間で同じSparkSession
を共有する場合、それは複数のスレッド間で変数を共有するのと似ています。構成値が同時に更新されるため、競合状態が発生する可能性があります。