テーブル履歴を取り扱う
Apache Iceberg と Delta Lake テーブルの場合、テーブルを変更する各オペレーションは新しいテーブルバージョンを作成します。履歴情報を使用して、オペレーションを監査したり、テーブルをロールバックしたり、タイムトラベルを使用して特定の時点のテーブルを照会したりすることができます。
Databricksでは、テーブル履歴をデータアーカイブの長期バックアップソリューションとして使用することはお勧めしません。データとログの保持構成の両方をより大きな値に設定していない限り、タイムトラベルオペレーションには過去7日間のみを使用してください。
テーブル履歴の取得
テーブルへの各書き込みのオペレーション、ユーザー、タイムスタンプなどの情報を取得するには、DESCRIBE HISTORY コマンドを実行します。オペレーションは、時系列の逆順で返されます。
テーブル履歴の保持期間はテーブル設定logRetentionDurationによって決まります。デフォルトでは30日間です。
タイムトラベルとテーブル履歴は、異なる保持しきい値によって制御されます。See タイムトラベル.
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
Spark SQL構文の詳細については、DESCRIBE HISTORYを参照してください。
Scala、Java、Python の構文の詳細については、Delta Lake API ドキュメントを参照してください。
カタログ エクスプローラーには、 「履歴」 タブでテーブルの履歴が視覚的に表示されます。
履歴スキーマ
historyオペレーションの出力には、次の列があります。
列 | Type | 説明 |
|---|---|---|
version |
| オペレーションによって生成されたテーブルのバージョン。 |
timestamp |
| このバージョンがコミットされたとき。 |
userId |
| オペレーションを実行したユーザーのID。 |
userName |
| 操作を実行したユーザーの名前。 |
operation |
| オペレーションの名前です。 |
operationParameters |
| 操作のパラメーター(たとえば、述語)。 |
ジョブ |
| その操作を実行したLakeFlow Jobの詳細。LakeFlow Job から書き込まれたコミットにのみ反映されます。それ以外の場合は、 |
notebook |
| オペレーションが実行されたDatabricksノートブックの詳細です。Databricks ノートブックから書き込まれたコミットにのみ入力されます。それ以外の場合は、 |
clusterId |
| オペレーションが実行されたクラスターのIDです。 |
readVersion |
| 書き込み操作を実行するために読み込まれたテーブルのバージョン。 |
isolationLevel |
| この操作に使用される独立性レベル。 |
isBlindAppend |
| このオペレーションでデータが追加されたかどうか。 |
operationMetrics |
| 操作のメトリクス(たとえば、変更された行とファイルの数)。 |
userMetadata |
| ユーザー定義のコミットメタデータ (指定されている場合)。 |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
-
次の方法を使用してテーブルに書き込む場合、一部の列は使用できません。
-
今後追加される列は、常に最後の列の後に追加されます。
運用パラメーターにおける の理解partitionBy
テーブル履歴のpartitionByフィールドは、テーブルのパーティションスキーマを定義または変更するCREATEおよびOVERWRITE操作にのみ意味があります。
既存のテーブルへの追加操作 (APPEND、INSERT、UPDATE、DELETE、MERGE) の場合、このフィールドには、使用される書き込み方法 (.save() 対 .saveAsTable()) に応じて、空の配列 [] またはパーティション列が表示される場合があります。
この不整合は予期された動作であり、データがパーティションに書き込まれる方法には影響しません。追加操作の検証には使用しないでください。
例
date 列でパーティション分割されたテーブルについて考えます。テーブルを作成すると、partitionBy が入力されます。
df.write.format("delta") \
.partitionBy("date") \
.saveAsTable("sales_data")
履歴におけるCREATE操作は次のとおりです:
operationParameters: {
"mode": "ErrorIfExists",
"partitionBy": "[\"date\"]"
}
このテーブルにデータを追加すると、partitionByは空の配列を表示します。
new_df.write.format("delta") \
.mode("append") \
.saveAsTable("sales_data")
追加オペレーションは以下を示します。
operationParameters: {
"mode": "Append",
"partitionBy": "[]"
}
partitionBy の空の値が必要です。データは、テーブルの既存のパーティションスキーマに基づいて、引き続き正しいパーティションに書き込まれます。パスへの.save()がこのフィールドにパーティション列を表示する場合がありますが、この違いは実装の詳細であり、書き込み動作には影響しません。
運用メトリクス
historyオペレーションはoperationMetrics、列マップのオペレーションメトリクスのコレクションを返します。
以下のテーブルは、オペレーション別のマップキー定義の一覧です。
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO
これらのオペレーションには、以下のメトリクスが利用可能です。
メトリクス名 | 説明 |
|---|---|
| 書き込まれたファイルの数。 |
| 書き込まれた内容のサイズ(バイト単位)。 |
| 書き込まれた行の数。 |
STREAMING UPDATE
このオペレーションでは、次のメトリクスが利用可能です:
メトリクス名 | 説明 |
|---|---|
| 追加されたファイルの数。 |
| 削除されたファイルの数。 |
| 書き込まれた行の数。 |
| 書き込みサイズ(バイト単位)。 |
DELETE
このオペレーションでは、次のメトリクスが利用可能です:
メトリクス名 | 説明 |
|---|---|
| 追加されたファイルの数。テーブルのパーティションが削除された場合は提供されません。 |
| 削除されたファイルの数。 |
| 削除された行の数です。テーブルのパーティションが削除された場合は提供されません。 |
| ファイルの削除処理中にコピーされた行数。 |
| オペレーション全体の実行にかかった時間。 |
| ファイルの一致をスキャンするのにかかった時間。 |
| 一致したファイルの書き換えにかかった時間です。 |
TRUNCATE
このオペレーションでは、次のメトリクスが利用可能です:
メトリクス名 | 説明 |
|---|---|
| 削除されたファイルの数。 |
| オペレーション全体の実行にかかった時間。 |
MERGE
このオペレーションでは、次のメトリクスが利用可能です:
メトリクス名 | 説明 |
|---|---|
| ソースDataFrameの行数。 |
| ターゲットテーブルに挿入された行数。 |
| ターゲットテーブルで更新された行数。 |
| ターゲットテーブルで削除された行数。 |
| コピーされたターゲット行の数。 |
| 書き出された行の合計数。 |
| シンク(ターゲット)に追加されたファイルの数。 |
| シンク(ターゲット)から削除されたファイルの数。 |
| オペレーション全体の実行にかかった時間。 |
| ファイルの一致をスキャンするのにかかった時間。 |
| 一致したファイルの書き換えにかかった時間です。 |
UPDATE
このオペレーションでは、次のメトリクスが利用可能です:
メトリクス名 | 説明 |
|---|---|
| 追加されたファイルの数。 |
| 削除されたファイルの数。 |
| 更新された行数。 |
| ファイルの更新プロセスでコピーされた行数。 |
| オペレーション全体の実行にかかった時間。 |
| ファイルの一致をスキャンするのにかかった時間。 |
| 一致したファイルの書き換えにかかった時間です。 |
FSCK
このオペレーションでは、次のメトリクスが利用可能です:
メトリクス名 | 説明 |
|---|---|
| 削除されたファイルの数。 |
CONVERT
このオペレーションでは、次のメトリクスが利用可能です:
メトリクス名 | 説明 |
|---|---|
| 変換されたParquetファイルの数。 |
OPTIMIZE
このオペレーションでは、次のメトリクスが利用可能です:
メトリクス名 | 説明 |
|---|---|
| 追加されたファイルの数。 |
| 最適化されたファイルの数。 |
| テーブルが最適化された後に追加されたバイト数。 |
| 削除されたバイト数。 |
| テーブルが最適化された後の最小ファイルのサイズ。 |
| テーブルが最適化された後の25パーセンタイルファイルのサイズ。 |
| テーブルが最適化された後のファイルサイズの中央値。 |
| テーブルが最適化された後の75パーセンタイルファイルのサイズ。 |
| テーブルが最適化された後の最大ファイルのサイズ。 |
CLONE
このオペレーションでは、次のメトリクスが利用可能です:
メトリクス名 | 説明 |
|---|---|
| クローン作成されたバージョンでのソーステーブルのサイズ(バイト単位)。 |
| クローン作成されたバージョンのソーステーブル内のファイルの数。 |
| 以前のテーブルが置き換えられた場合にターゲットテーブルから削除されたファイルの数。 |
| 以前のテーブルが置き換えられた場合にターゲットテーブルから削除されたファイルの合計サイズ(バイト単位)。 |
| 新しい場所にコピーされたファイルの数。シャロークローンの場合は0。 |
| 新しい場所にコピーされたファイルの合計サイズ(バイト単位)。シャロークローンの場合は0。 |
RESTORE
このオペレーションでは、次のメトリクスが利用可能です:
メトリクス名 | 説明 |
|---|---|
| 復元後のテーブルサイズ(バイト単位)。 |
| 復元後のテーブル内のファイルの数 |
| 復元オペレーションによって削除されたファイルの数。 |
| 復元の結果として追加されたファイルの数。 |
| リストアによって削除されたファイルのサイズ(バイト単位)。 |
| リストアによって追加されたファイルのサイズ(バイト単位)。 |
VACUUM
このオペレーションでは、次のメトリクスが利用可能です:
メトリクス名 | 説明 |
|---|---|
| 削除されたファイルの数。 |
| vacuum処理されたディレクトリの数。 |
| 削除するファイルの数。 |
タイムトラベル
タイムトラベルは、タイムスタンプまたはテーブルバージョン(トランザクションログに記録されている)に基づいた以前のテーブルバージョンのクエリーをサポートしています。タイムトラベルは、次のようなアプリケーションに使用できます:
- 分析、レポート、または出力(機械学習モデルの出力など)を再作成します。これは、特に規制された業界でのデバッグや監査に役立つ可能性があります。
- 複雑なテンポラルクエリーを記述する。
- データの誤りを修正する。
- 急速に変化するテーブルの一連のクエリーに対してスナップショット分離を提供します。
Databricks Runtime 18.0 以降では、 deletedFileRetentionDurationテーブル プロパティ (デフォルトは 7 日) よりも古いバージョンをリクエストした場合、タイムトラベル クエリはブロックされます。 Unity Catalogマネージドテーブルの場合、これはDatabricks Runtime 12.2 以降に適用されます。
タイムトラベル構文
タイムトラベルを使用してテーブルをクエリするには、テーブル名の指定の後に句を追加します。
-
timestamp_expression次のいずれかになります:'2018-10-18T22:15:12.013Z'つまり、タイムスタンプにキャストできる文字列ですcast('2018-10-18 13:36:32 CEST' as timestamp)'2018-10-18'、つまり日付文字列ですcurrent_timestamp() - interval 12 hoursdate_sub(current_date(), 1)- タイムスタンプにキャストされる、またはタイムスタンプにキャストできるその他の式
-
versionは、DESCRIBE HISTORY table_specの出力から取得できる長い値です。
timestamp_expressionもversionもサブクエリーにすることはできません。
日付またはタイムスタンプ文字列のみが受け入れられます。たとえば、"2019-01-01"と"2019-01-01T00:00:00.000Z"です。シンタックスの例については、以下のコードを参照してください:
- SQL
- Python
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
@構文を使用して、タイムスタンプまたはバージョンをテーブル名の一部として指定することもできます。タイムスタンプはyyyyMMddHHmmssSSS形式である必要があります。@vを使用してバージョンを指定できます。構文の例については、以下のコードを参照してください:
- SQL
- Python
-- Timestamp version
SELECT * FROM people10m@20190101000000000
-- Version number
SELECT * FROM people10m@v123
# Timestamp version
spark.read.table("people10m@20190101000000000")
# Version number
spark.read.table("people10m@v123")
タイムトラベルクエリーのデータ保持を構成する
以前のテーブルバージョンをクエリするには、そのバージョンのログファイルとデータファイルの*両方*を保持する必要があります。
VACUUMテーブルに対して実行されると、データファイルが削除されます。- テーブルバージョンのチェックポイント設定後、ログファイルは自動的に削除されます。
テーブルのデータ保持しきい値を増やすには、<format> を delta または iceberg のいずれかに置き換えて、次のテーブル プロパティを構成する必要があります。
-
<format>.logRetentionDuration = "interval <interval>":テーブルの履歴を保持する期間を制御します。デフォルトはinterval 30 daysです。- Databricks Runtime 18.0以降では、
logRetentionDurationはdeletedFileRetentionDuration以上である必要があります。Unity Catalogマネージドテーブルの場合、これはDatabricks Runtime 12.2 以降に適用されます。
- Databricks Runtime 18.0以降では、
-
<format>.deletedFileRetentionDuration = "interval <interval>":現在のテーブルバージョンで参照されなくなったデータファイルを削除するためにVACUUMが使用するしきい値を決定します。デフォルトはinterval 7 daysです。
例えば、30日間のヒストリカルデータにアクセスするには、delta.deletedFileRetentionDuration = "interval 30 days"を設定します。これは、delta.logRetentionDurationのデフォルト設定と一致します。
データ保持のしきい値を増やすと、より多くのデータファイルが保持されるため、ストレージコストが増加する可能性があります。
テーブルプロパティは、テーブル作成時に指定するか、ALTER TABLEステートメントで設定することができます。「テーブルプロパティリファレンス」を参照してください。
タイムトラベルの例
ユーザー 111 によるテーブルへの誤った削除を修正するには:
INSERT INTO my_table
SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
WHERE userId = 111
テーブルへの偶発的な誤った更新を修正するには:
MERGE INTO my_table target
USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
ON source.userId = target.userId
WHEN MATCHED THEN UPDATE SET *
先週追加された新規顧客の数をクエリするには:
SELECT
(
SELECT count(distinct userId)
FROM my_table
)
-
(
SELECT count(distinct userId)
FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7)
) AS new_customers
トランザクションログチェックポイント
トランザクションログは、テーブルデータとともにトランザクションログディレクトリ内のJSONファイルとしてテーブルバージョンを記録します。
チェックポイントクエリを最適化するために、テーブルバージョンはParquetチェックポイントファイルに集約されます。これにより、テーブル履歴のすべてのJSONバージョンを読み取る必要がなくなり、パフォーマンスが向上します。ユーザーはチェックポイントと直接やり取りする必要はありません。
Databricksは、データサイズとワークロードに応じてチェックポイントの頻度を最適化します。チェックポイントの頻度は予告なく変更される場合があります。
テーブルを以前の状態に復元する
次のシナリオを含め、RESTOREコマンドを使用してテーブルを以前のバージョンまたはタイムスタンプに復元します。
- すでにリストアされたテーブルをリストアすることができます。
- クローンテーブルを復元できます。
次の要件を考慮してください:
- テーブルを復元するには、テーブルに対する
MODIFY権限が必要です。 - データファイルが手動で、または
VACUUMによって削除された後、それらのファイルを参照する古いバージョンにテーブルを復元することはできません。spark.sql.files.ignoreMissingFilesがtrueに設定されている場合、このバージョンへの部分的な復元は依然として可能です。 - タイムスタンプで復元するには、
yyyy-MM-dd HH:mm:ssまたはyyyy-MM-ddの形式を使用します。
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
構文の詳細については、「RESTORE」を参照してください。
ストリーミング動作
復元は、データを変更するオペレーションであり、ダウンストリームのワークロードで重複データが発生する可能性があります。RESTORE コマンドによって追加されたログエントリーには、true に設定された dataChange が含まれています。
テーブルへの更新を処理する構造化ストリーミングジョブなどのダウンストリームワークロードの場合、復元操作によって追加されたデータ変更ログエントリは新しいデータ更新と見なされ、それらを処理するとデータが重複する可能性があります。
例えば:
テーブルバージョン | オペレーション | ログの更新 | データ変更ログ更新のレコード |
|---|---|---|---|
0 |
|
| (名前 = ヴィクトル、年齢 = 29)、(名前 = ジョージ、年齢 = 55) |
1 |
|
| (名前 = ジョージ、年齢 = 39) |
2 |
|
| レコードなし。 |
3 |
|
| (名前 = ヴィクトル、年齢 = 29)、(名前 = ジョージ、年齢 = 55)、(名前 = ジョージ、年齢 = 39) |
前述の例では、RESTORE コマンドは、テーブルバージョン0および1の読み込み時に以前に確認された更新をもたらします。ストリーミングクエリがこのテーブルを再度読み込むと、これらのファイルは新規追加データと見なされ、再度処理されます。
メトリクスを復元する
完了後、RESTORE は次のメトリクスを単一行のDataFrameとしてレポートします:
-
table_size_after_restore:復元後のテーブルのサイズ。 -
num_of_files_after_restore:復元後のテーブル内のファイルの数。 -
num_removed_files:テーブルから削除された(論理的に削除された)ファイルの数。 -
num_restored_files:ロールバックによって復元されたファイルの数。 -
removed_files_size:テーブルから削除されたファイルの合計サイズ(バイト単位)。 -
restored_files_size:復元されるファイルの合計サイズ(バイト単位)。
最後のコミットバージョンを検索
全スレッド、全テーブルにわたって、現在のSparkSessionが最後に書き込んだコミットのバージョン番号を取得するには、SQL設定spark.databricks.<format>.lastCommitVersionInSessionに問い合わせます。テーブルの形式に応じて、<format>をdeltaまたはicebergに置き換えます。
例えば:
- SQL
- Python
- Scala
SET spark.databricks.delta.lastCommitVersionInSession
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
SparkSessionによってコミットが行われていない場合、キーをクエリーすると空の値が返されます。
複数のスレッド間で同じSparkSessionを共有する場合、それは複数のスレッド間で変数を共有するのと似ています。構成値への並列更新で競合状態が発生する可能性があります。