テーブル履歴を取り扱う
テーブルを変更する各オペレーションは、新しいテーブルバージョンを作成します。履歴情報を使用して、オペレーションを監査したり、テーブルをロールバックしたり、タイムトラベルを使用して特定の時点のテーブルを照会したりすることができます。
Databricksでは、テーブル履歴をデータアーカイブの長期バックアップソリューションとして使用することはお勧めしません。データとログの保持構成の両方をより大きな値に設定していない限り、タイムトラベルオペレーションには過去7日間のみを使用してください。
テーブル履歴の取得
historyコマンドを実行すると、テーブルへの各書き込みのオペレーション、ユーザー、タイムスタンプなどの情報を取得できます。オペレーションは、時系列の逆順で返されます。
テーブル履歴の保持期間はテーブル設定logRetentionDurationによって決まります。デフォルトでは30日間です。
タイムトラベルとテーブル履歴は、異なる保持しきい値によって制御されます。「タイムトラベルとは?」を参照してください。
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
Spark SQL構文の詳細については、DESCRIBE HISTORYを参照してください。
Scala/Java/Python の構文の詳細については、Delta Lake API のドキュメントを参照してください。
カタログ エクスプローラー では、この詳細なテーブル情報と履歴を視覚的に表示できます。テーブルスキーマとサンプルデータに加えて、[ 履歴 ] タブをクリックすると、 DESCRIBE HISTORYとともに表示されるテーブル履歴を表示できます。
履歴スキーマ
historyオペレーションの出力には、次の列があります。
列 | Type | 説明 |
|---|---|---|
version | long | オペレーションによって生成されたテーブルのバージョン。 |
timestamp | timestamp | このバージョンがコミットされたとき。 |
userId | string | オペレーションを実行したユーザーのID。 |
userName | string | オペレーションを実行したユーザーの名前。 |
operation | string | オペレーションの名前。 |
operationParameters | map | オペレーションのパラメーター(述語など) |
ジョブ | struct | オペレーションを実行したLakeflow ジョブの詳細。LakeFlow Job から書き込まれたコミットにのみ反映されます。それ以外の場合は、 |
notebook | struct | オペレーションが実行されたDatabricksノートブックの詳細。Databricks ノートブックから書き込まれたコミットにのみ入力されます。それ以外の場合は、 |
clusterId | string | オペレーションが実行されたクラスターのID。 |
readVersion | long | 書き込みオペレーションを行うために読み込まれたテーブルのバージョン。 |
isolationLevel | string | このオペレーションに使用される独立性レベル。 |
isBlindAppend | boolean | このオペレーションでデータが追加されたかどうか。 |
operationMetrics | map | オペレーションの指標(たとえば、行数や変更されたファイル数など。) |
userMetadata | string | ユーザー定義のコミットメタデータ(指定されている場合) |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
-
次の方法を使用してテーブルに書き込む場合、他のいくつかの列は使用できません。
-
今後追加される列は、常に最後の列の後に追加されます。
運用パラメーターにおける の理解partitionBy
partitionByフィールドは、テーブルのパーティションスキーマを定義または変更するCREATEおよびOVERWRITE操作にのみ意味があります。
既存のテーブルへの追加操作(APPEND、INSERT、UPDATE、DELETE、MERGE)の場合、このフィールドには、使用される書き込み方法(.save() と .saveAsTable())に応じて、空の配列 [] またはパーティション列が表示される場合があります。この不整合は想定される動作であり、書き込みの検証には使用しないでください。
追加オペレーションの検証には、履歴内のpartitionByに依存しないでください。値は実装の詳細によって異なりますが、データがパーティションに書き込まれる方法には影響しません。
例
date 列でパーティション分割されたテーブルを考えます。
# Initial table creation - partitionBy is populated
df.write.format("delta") \
.partitionBy("date") \
.saveAsTable("sales_data")
履歴におけるCREATE操作は次のとおりです:
operationParameters: {
"mode": "ErrorIfExists",
"partitionBy": "[\"date\"]"
}
このテーブルにデータを追加すると、
# Subsequent append - partitionBy shows empty
new_df.write.format("delta") \
.mode("append") \
.saveAsTable("sales_data")
追加オペレーションは以下を示します。
operationParameters: {
"mode": "Append",
"partitionBy": "[]"
}
partitionBy の空の値が必要です。データは、テーブルの既存のパーティションスキーマに基づいて、引き続き正しいパーティションに書き込まれます。パスへの.save()の場合、このフィールドにパーティション列が表示される場合がありますが、この違いは実装の詳細であり、書き込み動作には影響しません。
運用メトリクス
historyオペレーションはoperationMetrics、列マップのオペレーションメトリクスのコレクションを返します。
以下のテーブルは、オペレーション別のマップキー定義の一覧です。
オペレーション | メトリクス名 | 説明 |
|---|---|---|
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO | ||
numFiles | 書き込まれたファイルの数。 | |
numOutputBytes | 書き込まれた内容のサイズ(バイト単位)。 | |
numOutputRows | 書き込まれた行の数。 | |
STREAMING UPDATE | ||
numAddedFiles | 追加されたファイルの数。 | |
numRemovedFiles | 削除されたファイルの数。 | |
numOutputRows | 書き込まれた行の数。 | |
numOutputBytes | 書き込みサイズ(バイト単位)。 | |
DELETE | ||
numAddedFiles | 追加されたファイルの数。テーブルのパーティションが削除された場合は提供されない。 | |
numRemovedFiles | 削除されたファイルの数。 | |
numDeletedRows | 削除された行の数。テーブルのパーティションが削除された場合は提供されない。 | |
numCopiedRows | ファイルの削除処理中にコピーされた行数。 | |
executionTimeMs | オペレーション全体の実行にかかった時間。 | |
scanTimeMs | ファイルの一致をスキャンするのにかかった時間。 | |
rewriteTimeMs | 一致したファイルの書き換えにかかった時間。 | |
TRUNCATE | ||
numRemovedFiles | 削除されたファイルの数。 | |
executionTimeMs | オペレーション全体の実行にかかった時間。 | |
MERGE | ||
numSourceRows | ソースデータフレームの行数。 | |
numTargetRowsInserted | ターゲッテーブルに挿入された行数。 | |
numTargetRowsUpdated | ターゲットテーブルで更新された行数。 | |
numTargetRowsDeleted | ターゲットテーブルで削除された行数。 | |
numTargetRowsCopied | コピーされたターゲット行の数。 | |
numOutputRows | 書き出された行の合計数。 | |
numTargetFilesAdded | シンク(ターゲット)に追加されたファイルの数。 | |
numTargetFilesRemoved | シンク(ターゲット)から削除されたファイルの数。 | |
executionTimeMs | オペレーション全体の実行にかかった時間。 | |
scanTimeMs | ファイルの一致をスキャンするのにかかった時間。 | |
rewriteTimeMs | 一致したファイルの書き換えにかかった時間。 | |
UPDATE | ||
numAddedFiles | 追加されたファイルの数。 | |
numRemovedFiles | 削除されたファイルの数。 | |
numUpdatedRows | 更新された行数。 | |
numCopiedRows | ファイルの更新プロセスでコピーされた行数。 | |
executionTimeMs | オペレーション全体の実行にかかった時間。 | |
scanTimeMs | ファイルの一致をスキャンするのにかかった時間。 | |
rewriteTimeMs | 一致したファイルの書き換えにかかった時間。 | |
FSCK | numRemovedFiles | 削除されたファイルの数。 |
CONVERT | numConvertedFiles | 変換されたParquetファイルの数。 |
OPTIMIZE | ||
numAddedFiles | 追加されたファイルの数。 | |
numRemovedFiles | 最適化されたファイルの数。 | |
numAddedBytes | テーブルが最適化された後に追加されたバイト数。 | |
numRemovedBytes | 削除されたバイト数。 | |
minFileSize | テーブルが最適化された後の最小ファイルのサイズ。 | |
p25FileSize | テーブルが最適化された後の25パーセンタイルファイルのサイズ。 | |
p50FileSize | テーブルが最適化された後のファイルサイズの中央値。 | |
p75FileSize | テーブルが最適化された後の75パーセンタイルファイルのサイズ。 | |
maxFileSize | テーブルが最適化された後の最大ファイルのサイズ。 | |
クローン | ||
sourceTableSize | クローン作成されたバージョンでのソーステーブルのサイズ(バイト単位)。 | |
sourceNumOfFiles | クローン作成されたバージョンのソーステーブル内のファイルの数。 | |
numRemovedFiles | 以前のテーブルが置き換えられた場合にターゲットテーブルから削除されたファイルの数。 | |
removedFilesSize | 以前のテーブルが置き換えられた場合にターゲットテーブルから削除されたファイルの合計サイズ(バイト単位)。 | |
numCopiedFiles | 新しい場所にコピーされたファイルの数。シャロークローンの場合は0。 | |
copiedFilesSize | 新しい場所にコピーされたファイルの合計サイズ(バイト単位)。シャロークローンの場合は0。 | |
RESTORE | ||
tableSizeAfterRestore | 復元後のテーブルサイズ(バイト単位)。 | |
numOfFilesAfterRestore | 復元後のテーブル内のファイルの数。 | |
numRemovedFiles | 復元オペレーションによって削除されたファイルの数。 | |
numRestoredFiles | 復元の結果として追加されたファイルの数。 | |
removedFilesSize | リストアによって削除されたファイルのサイズ(バイト単位)。 | |
restoredFilesSize | リストアによって追加されたファイルのサイズ(バイト単位)。 | |
VACUUM | ||
numDeletedFiles | 削除されたファイルの数。 | |
numVacuumedDirectories | vacuum処理されたディレクトリの数。 | |
numFilesToDelete | 削除するファイルの数。 |
タイムトラベルとは?
タイムトラベルは、タイムスタンプまたはテーブルバージョン(トランザクションログに記録されている)に基づいた以前のテーブルバージョンのクエリーをサポートしています。タイムトラベルは、次のようなアプリケーションに使用できます:
- 分析、レポート、または出力(機械学習モデルの出力など)を再作成します。これは、特に規制された業界でのデバッグや監査に役立つ可能性があります。
- 複雑なテンポラルクエリーを記述する。
- データの誤りを修正する。
- 急速に変化するテーブルの一連のクエリーに対してスナップショット分離を提供します。
Databricks Runtime 18.0 以降では、 deletedFileRetentionDurationテーブル プロパティ (デフォルトは 7 日) よりも古いバージョンをリクエストした場合、タイムトラベル クエリはブロックされます。 Unity Catalogマネージドテーブルの場合、これはDatabricks Runtime 12.2 以降に適用されます。
タイムトラベル構文
タイムトラベルを使用してテーブルをクエリするには、テーブル名の指定の後に句を追加します。
-
timestamp_expression次のいずれかになります:'2018-10-18T22:15:12.013Z'つまり、タイムスタンプにキャストできる文字列ですcast('2018-10-18 13:36:32 CEST' as timestamp)'2018-10-18'、つまり日付文字列ですcurrent_timestamp() - interval 12 hoursdate_sub(current_date(), 1)- タイムスタンプにキャストされる、またはタイムスタンプにキャストできるその他の式
-
versionは、DESCRIBE HISTORY table_specの出力から取得できる長い値です。
timestamp_expressionもversionもサブクエリーにすることはできません。
日付またはタイムスタンプ文字列のみが受け入れられます。たとえば、"2019-01-01"と"2019-01-01T00:00:00.000Z"です。シンタックスの例については、以下のコードを参照してください:
- SQL
- Python
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
@ 構文を使用して、テーブル名の一部としてタイムスタンプまたはバージョンを指定することもできます。タイムスタンプはyyyyMMddHHmmssSSS形式である必要があります。バージョンの前にvを付加することで、@の後にバージョンを指定できます。シンタックスの例については、以下のコードを参照してください:
- SQL
- Python
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
トランザクションログチェックポイントとは?
テーブルバージョンは、テーブルデータと一緒に保存されるトランザクションログディレクトリ内にJSONファイルとして記録されます。チェックポイントクエリを最適化するために、テーブルバージョンをParquetチェックポイントファイルに集約し、テーブル履歴のすべてのJSONバージョンを読み取る必要がなくなります。Databricksは、データサイズとワークロードに応じてチェックポイントの頻度を最適化します。ユーザーはチェックポイントと直接のやり取りは不要のはずです。チェックポイントの頻度は予告なく変更される場合があります。
タイムトラベルクエリーのデータ保持を構成する
以前のテーブルバージョンをクエリーするには、そのバージョンのログファイルとデータファイルの 両方を 保持する必要があります。
VACUUMテーブルに対して実行されると、データファイルが削除されます。テーブルのバージョンにチェックポイントを設定した後、ログファイルの削除は自動的に管理されます。
ほとんどのテーブルでは、VACUUMが定期的に実行されるため、ポイントインタイムクエリーはVACUUMの保持しきい値(デフォルトでは7日間)を尊重する必要があります。
テーブルのデータ保持しきい値を増やすには、次のテーブルプロパティを構成する必要があります:
delta.logRetentionDuration = "interval <interval>":テーブルの履歴を保持する期間を制御します。デフォルトはinterval 30 daysです。delta.deletedFileRetentionDuration = "interval <interval>":現在のテーブルバージョンで参照されなくなったデータファイルを削除するためにVACUUMが使用するしきい値を決定します。デフォルトはinterval 7 daysです。
テーブルプロパティは、テーブル作成時に指定するか、ALTER TABLEステートメントで設定することができます。「テーブルプロパティリファレンス」を参照してください。
Databricks Runtime 18.0以降では、logRetentionDurationはdeletedFileRetentionDuration以上である必要があります。Unity Catalogマネージドテーブルの場合、これはDatabricks Runtime 12.2 以降に適用されます。
30日間のヒストリカルデータにアクセスするには、delta.deletedFileRetentionDuration = "interval 30 days"を設定します (これは、delta.logRetentionDurationのデフォルト設定と一致します)。
データ保持のしきい値を増やすと、より多くのデータファイルが保持されるため、ストレージコストが増加する可能性があります。
テーブルを以前の状態に復元する
RESTOREコマンドを使用すると、テーブルを以前の状態に復元できます。テーブルは、履歴バージョンを内部的に保持し、以前の状態に復元できるようにします。以前の状態に対応するバージョン、または以前の状態が作成されたときのタイムスタンプは、RESTOREコマンドのオプションとしてサポートされています。
- すでにリストアされたテーブルをリストアすることができます。
- クローンテーブルを復元できます。
- 復元するテーブルに対する
MODIFY権限が必要です。 - データファイルが手動または
vacuumによって削除された古いバージョンにテーブルをリストアすることはできません。spark.sql.files.ignoreMissingFilesをtrueに設定しても、部分的にこのバージョンにリストアすることは可能です。 - 以前の状態にリストアするためのタイムスタンプのフォーマットは
yyyy-MM-dd HH:mm:ssです。日付(yyyy-MM-dd)文字列のみの指定もサポートされています。
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
構文の詳細については、「RESTORE」を参照してください。
復元は、データを変更するオペレーションと見なされます。RESTOREコマンドによって追加されたログエントリーには、trueに設定されたdataChangeが含まれています。テーブルの更新を処理する構造化ストリーミングジョブのようなダウンストリームアプリケーションがある場合、リストアオペレーションによって追加されたデータ変更ログエントリーは新しいデータ更新とみなされ、それらを処理すると重複データが発生する可能性があります。
例えば:
テーブルバージョン | オペレーション | ログの更新 | データ変更ログ更新のレコード |
|---|---|---|---|
0 | INSERT | AddFile(/path/to/file-1, dataChange = true) | (名前 = ヴィクトル、年齢 = 29、(名前 = ジョージ、年齢 = 55) |
1 | INSERT | AddFile(/path/to/file-2, dataChange = true) | (名前 = ジョージ、年齢 = 39) |
2 | OPTIMIZE | AddFile(/path/to/file-3, dataChange = false)、RemoveFile(/path/to/file-1)、RemoveFile(/path/to/file-2) | (最適化圧縮ではテーブル内のデータが変更されないため、レコードはありません) |
3 | RESTORE(version=1) | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (名前 = ヴィクトル、年齢 = 29)、(名前 = ジョージ、年齢 = 55)、(名前 = ジョージ、年齢 = 39) |
前述の例では、RESTOREコマンドにより、テーブルバージョン0および1の読み取り時にすでに確認されていた更新が行われます。ストリーミングクエリーがこのテーブルを読み取っていた場合、これらのファイルは新しく追加されたデータとみなされ、再度処理されます。
メトリクスを復元する
RESTORE オペレーションが完了すると、次のメトリクスを単一行のデータフレームとしてレポートします:
-
table_size_after_restore:復元後のテーブルのサイズ。 -
num_of_files_after_restore:復元後のテーブル内のファイルの数。 -
num_removed_files:テーブルから削除された(論理的に削除された)ファイルの数。 -
num_restored_files:ロールバックによって復元されたファイルの数。 -
removed_files_size:テーブルから削除されたファイルの合計サイズ(バイト単位)。 -
restored_files_size:復元されるファイルの合計サイズ(バイト単位)。
タイムトラベルの使用例
-
ユーザー
111のテーブルへの誤った削除を修正しました:SQLINSERT INTO my_table
SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
WHERE userId = 111 -
テーブルに対する偶発的な誤った更新を修正します:
SQLMERGE INTO my_table target
USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
ON source.userId = target.userId
WHEN MATCHED THEN UPDATE SET * -
先週追加された新規顧客の数を照会します。
SQLSELECT
(
SELECT count(distinct userId)
FROM my_table
)
-
(
SELECT count(distinct userId)
FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7)
) AS new_customers
Sparkセッションで最後のコミットのバージョンを確認するにはどうすればよいですか?
全スレッド、全テーブルにわたって、現在のSparkSessionが最後に書き込んだコミットのバージョン番号を取得するには、SQL設定spark.databricks.delta.lastCommitVersionInSessionに問い合わせます。
Apache Icebergテーブルについては、spark.databricks.delta.lastCommitVersionInSession の代わりに spark.databricks.iceberg.lastCommitVersionInSession を使用してください。
- SQL
- Python
- Scala
SET spark.databricks.delta.lastCommitVersionInSession
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
SparkSessionによってコミットが行われていない場合、キーをクエリーすると空の値が返されます。
複数のスレッド間で同じSparkSessionを共有する場合、それは複数のスレッド間で変数を共有するのと似ています。構成値が同時に更新されるため、競合状態が発生する可能性があります。