Delta Lakeのテーブル履歴を取り扱う

Delta Lakeのテーブルを変更する各オペレーションは、新しいテーブルバージョンを作成します。履歴情報を使用して、オペレーションを監査したり、テーブルをロールバックしたり、タイムトラベルを使用して特定の時点のテーブルを照会したりすることができます。

Databricksでは、データアーカイブの長期バックアップソリューションとしてDelta Lakeテーブル履歴を使用することはお勧めしません。Databricksでは、データとログの保持構成の両方をより大きな値に設定していない限り、タイムトラベルオペレーションには過去7日間のみを使用することをお勧めします。

Deltaテーブル履歴の取得

historyコマンドを実行すると、Deltaテーブルへの各書き込みのオペレーション、ユーザー、タイムスタンプなどの情報を取得できます。オペレーションは、時系列の逆順で返されます。

テーブル履歴の保持期間はテーブル設定delta.logRetentionDurationによって決まります。デフォルトでは30日間です。

タイムトラベルとテーブル履歴は、異なる保持しきい値によって制御されます。 タイム トラベル Delta Lake とはを参照してください

DESCRIBE HISTORY table_name       -- get the full history of the table

DESCRIBE HISTORY table_name LIMIT 1  -- get the last operation only

Spark SQL構文の詳細については、「DESCRIBE HISTORY」を参照してください。

Scala/Java/Python 構文の詳細については、 Delta Lake API のドキュメント を参照してください。

「カタログエクスプローラ」(Catalog Explorer ) では、 Delta テーブルの詳細なテーブル情報と履歴を視覚的に表示できます。 テーブルスキーマとサンプルデータに加えて、「履歴」タブをクリックして、 DESCRIBE HISTORYとともに表示されるテーブル履歴を表示できます。

履歴スキーマ

historyオペレーションの出力には、次の列があります。

タイプ

説明

バージョン

ロング

オペレーションによって生成されたテーブルのバージョン。

タイムスタンプ

タイムスタンプ

このバージョンがコミットされたとき。

ユーザーID

文字列

オペレーションを実行したユーザーのID。

userName

文字列

オペレーションを実行したユーザーの名前。

オペレーション

文字列

オペレーションの名前。

operationParameters

マップ

オペレーションのパラメーター(述語など)

ジョブ

構造体

オペレーションを実行したジョブの詳細。

notebook

構造体

オペレーションが実行されたノートブックの詳細。

ClusterID

文字列

オペレーションが実行されたクラスターのID。

readVersion

ロング

書き込みオペレーションを行うために読み込まれたテーブルのバージョン。

isolationLevel

文字列

このオペレーションに使用される独立性レベル。

isBlindAppend

ブーリアン

このオペレーションでデータが追加されたかどうか。

operationMetrics

マップ

オペレーションの指標(たとえば、行数や変更されたファイル数など。)

userMetadata

文字列

ユーザー定義のコミットメタデータ(指定されている場合)

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

オペレーションメトリクスキー

historyオペレーションはoperationMetrics、列マップのオペレーションメトリクスのコレクションを返します。

以下のテーブルは、オペレーション別のマップキー定義の一覧です。

オペレーション

メトリクス名

説明

WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO

numFiles

書き込まれたファイルの数。

numOutputBytes

書き込まれた内容のサイズ(バイト単位)。

numOutputRows

書き込まれた行の数。

ストリーミングアップデート

numAddedFiles

追加されたファイルの数。

numRemovedFiles

削除されたファイルの数。

numOutputRows

書き込まれた行の数。

numOutputBytes

書き込みサイズ(バイト単位)。

DELETE

numAddedFiles

追加されたファイルの数。テーブルのパーティションが削除された場合は提供されない。

numRemovedFiles

削除されたファイルの数。

numDeletedRows

削除された行の数。テーブルのパーティションが削除された場合は提供されない。

numCopiedRows

ファイルの削除処理中にコピーされた行数。

executionTimeMs

オペレーション全体の実行にかかった時間。

scanTimeMs

ファイルの一致をスキャンするのにかかった時間。

rewriteTimeMs

一致したファイルの書き換えにかかった時間。

TRUNCATE

numRemovedFiles

削除されたファイルの数。

executionTimeMs

オペレーション全体の実行にかかった時間。

MERGE

numSourceRows

ソースデータフレームの行数。

numTargetRowsInserted

ターゲッテーブルに挿入された行数。

numTargetRowsUpdated

ターゲットテーブルで更新された行数。

numTargetRowsDeleted

ターゲットテーブルで削除された行数。

numTargetRowsCopied

コピーされたターゲット行の数。

numOutputRows

書き出された行の合計数。

numTargetFilesAdded

シンク(ターゲット)に追加されたファイルの数。

numTargetFilesRemoved

シンク(ターゲット)から削除されたファイルの数。

executionTimeMs

オペレーション全体の実行にかかった時間。

scanTimeMs

ファイルの一致をスキャンするのにかかった時間。

rewriteTimeMs

一致したファイルの書き換えにかかった時間。

UPDATE

numAddedFiles

追加されたファイルの数。

numRemovedFiles

削除されたファイルの数。

numUpdatedRows

更新された行数。

numCopiedRows

ファイルの更新プロセスでコピーされた行数。

executionTimeMs

オペレーション全体の実行にかかった時間。

scanTimeMs

ファイルの一致をスキャンするのにかかった時間。

rewriteTimeMs

一致したファイルの書き換えにかかった時間。

FSCK

numRemovedFiles

削除されたファイルの数。

CONVERT

numConvertedFiles

変換されたParquetファイルの数。

OPTIMIZE

numAddedFiles

追加されたファイルの数。

numRemovedFiles

最適化されたファイルの数。

numAddedBytes

テーブルが最適化された後に追加されたバイト数。

numRemovedBytes

削除されたバイト数。

minFileSize

テーブルが最適化された後の最小ファイルのサイズ。

p25FileSize

テーブルが最適化された後の25パーセンタイルファイルのサイズ。

p50FileSize

テーブルが最適化された後のファイルサイズの中央値。

p75FileSize

テーブルが最適化された後の75パーセンタイルファイルのサイズ。

maxFileSize

テーブルが最適化された後の最大ファイルのサイズ。

CLONE

sourceTableSize

クローン作成されたバージョンでのソーステーブルのサイズ(バイト単位)。

sourceNumOfFiles

クローン作成されたバージョンのソーステーブル内のファイルの数。

numRemovedFiles

以前のDeltaテーブルが置き換えられた場合にターゲットテーブルから削除されたファイルの数。

removedFilesSize

以前のDeltaテーブルが置き換えられた場合にターゲットテーブルから削除されたファイルの合計サイズ(バイト単位)。

numCopiedFiles

新しい場所にコピーされたファイルの数。シャロークローンの場合は0。

copiedFilesSize

新しい場所にコピーされたファイルの合計サイズ(バイト単位)。シャロークローンの場合は0。

RESTORE

tableSizeAfterRestore

復元後のテーブルサイズ(バイト単位)。

numOfFilesAfterRestore

復元後のテーブル内のファイルの数。

numRemovedFiles

復元オペレーションによって削除されたファイルの数。

numRestoredFiles

復元の結果として追加されたファイルの数。

removedFilesSize

リストアによって削除されたファイルのサイズ(バイト単位)。

restoredFilesSize

リストアによって追加されたファイルのサイズ(バイト単位)。

VACUUM

numDeletedFiles

削除されたファイルの数。

numVacuumedDirectories

vacuum処理されたディレクトリの数。

numFilesToDelete

削除するファイルの数。

Delta Lakeのタイムトラベルとは?

Delta Lakeタイムトラベルは、タイムスタンプまたはテーブルバージョン(トランザクションログに記録されている)に基づいた以前のテーブルバージョンのクエリーをサポートしています。タイムトラベルは、次のようなアプリケーションに使用できます:

  • 分析、レポート、または出力(機械学習モデルの出力など)を再作成します。これは、特に規制された業界でのデバッグや監査に役立つ可能性があります。

  • 複雑なテンポラルクエリーを記述する。

  • データの誤りを修正する。

  • 急速に変化するテーブルの一連のクエリーに対してスナップショット分離を提供します。

重要

タイムトラベルでアクセスできるテーブルバージョンは、トランザクションログファイルの保持しきい値と、VACUUMオペレーションの頻度と指定された保持期間の組み合わせによって決まります。デフォルト値を使用してVACUUM毎日実行すると、タイムトラベルに7日間のデータが利用可能になります。

Deltaタイムトラベル構文

タイムトラベルを使用してDeltaテーブルをクエリーするには、テーブル名の指定の後に句を追加します。

  • timestamp_expression 次のいずれかになります:

    • '2018-10-18T22:15:12.013Z'つまり、タイムスタンプにキャストできる文字列です

    • cast('2018-10-18 13:36:32 CEST' as timestamp)

    • '2018-10-18'、つまり日付文字列です

    • current_timestamp() - interval 12 hours

    • date_sub(current_date(), 1)

    • タイムスタンプにキャストされる、またはタイムスタンプにキャストできるその他の式

  • version は、DESCRIBE HISTORY table_specの出力から取得できる長い値です。

timestamp_expressionversionもサブクエリーにすることはできません。

日付またはタイムスタンプ文字列のみが受け入れられます。たとえば、"2019-01-01""2019-01-01T00:00:00.000Z"です。シンタックスの例については、以下のコードを参照してください:

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")

@ 構文を使用して、テーブル名の一部としてタイムスタンプまたはバージョンを指定することもできます。タイムスタンプはyyyyMMddHHmmssSSS形式である必要があります。バージョンの前にvを付加することで、@の後にバージョンを指定できます。シンタックスの例については、以下のコードを参照してください:

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

トランザクションログチェックポイントとは?

Delta Lakeは、テーブルのバージョンを_delta_logディレクトリ内のJSONファイルとして記録し、テーブルデータと一緒に保存します。チェックポイントクエリーを最適化するために、Delta LakeはテーブルバージョンをParquetチェックポイントファイルに集約し、テーブル履歴のすべてのJSONバージョンの読み取りが不要になります。Databricksは、データサイズとワークロードに応じてチェックポイントの頻度を最適化します。ユーザーはチェックポイントと直接のやり取りは不要のはずです。チェックポイントの頻度は予告なく変更される場合があります。

タイムトラベルクエリーのデータ保持を構成する

以前のテーブルバージョンをクエリーするには、そのバージョンのログファイルとデータファイルの両方を保持する必要があります。

VACUUMテーブルに対して実行されると、データファイルが削除されます。Delta Lakeは、テーブルのバージョンにチェックポイントを設定した後、ログファイルの削除を自動的に管理します。

ほとんどのDeltaテーブルでは、VACUUMが定期的に実行されるため、ポイントインタイムクエリーはVACUUMの保持しきい値(デフォルトでは7日間)を尊重する必要があります。

Deltaテーブルのデータ保持しきい値を増やすには、次のテーブルプロパティを構成する必要があります:

  • delta.logRetentionDuration = "interval <interval>":テーブルの履歴を保持する期間を制御します。デフォルトはinterval 30 daysです。

  • delta.deletedFileRetentionDuration = "interval <interval>":現在のテーブルバージョンで参照されなくなったデータファイルを削除するためにVACUUMが使用するしきい値を決定します。デフォルトはinterval 7 daysです。

Deltaプロパティは、テーブル作成時に指定するか、ALTER TABLEステートメントで設定することができます。「Deltaテーブルのプロパティ」を参照してください。

VACUUMオペレーションが頻繁に行われるテーブルの履歴が長期間保持されるようにするには、これらのプロパティを両方設定する必要があります。たとえば、30日間の履歴データにアクセスするには、delta.deletedFileRetentionDuration = "interval 30 days"を設定します(これは、delta.logRetentionDurationのデフォルト設定と一致します)。

データ保持のしきい値を増やすと、より多くのデータファイルが保持されるため、ストレージコストが増加する可能性があります。

Deltaテーブルを以前の状態に復元する

RESTOREコマンドを使用すると、Deltaテーブルを以前の状態に復元できます。Deltaテーブルは、テーブルの履歴バージョンを内部的に保持し、以前の状態に復元できるようにします。以前の状態に対応するバージョン、または以前の状態が作成されたときのタイムスタンプは、RESTOREコマンドのオプションとしてサポートされています。

重要

  • すでにリストアされたテーブルをリストアすることができます。

  • クローンテーブルを復元できます。

  • 復元するテーブルに対するMODIFY権限が必要です。

  • データファイルが手動またはvacuumによって削除された古いバージョンにテーブルをリストアすることはできません。spark.sql.files.ignoreMissingFilestrueに設定しても、部分的にこのバージョンにリストアすることは可能です。

  • 以前の状態にリストアするためのタイムスタンプのフォーマットはyyyy-MM-dd HH:mm:ssです。日付(yyyy-MM-dd)文字列のみの指定もサポートされています。

RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;

構文の詳細については、「RESTORE」を参照してください。

重要

復元は、データを変更するオペレーションと見なされます。RESTOREコマンドによって追加されたDelta Lakeログエントリーには、trueに設定されたdataChangeが含まれています。Delta Lakeテーブルの更新を処理するStructured Streamingジョブのようなダウンストリームアプリケーションがある場合、リストアオペレーションによって追加されたデータ変更ログエントリーは新しいデータ更新とみなされ、それらを処理すると重複データが発生する可能性があります。

例:

テーブルバージョン

オペレーション

Deltaログの更新

データ変更ログ更新のレコード

0

INSERT

AddFile(/path/to/file-1, dataChange = true)

(名前 = ヴィクトル、年齢 = 29、(名前 = ジョージ、年齢 = 55)

1

INSERT

AddFile(/path/to/file-2, dataChange = true)

(名前 = ジョージ、年齢 = 39)

2

OPTIMIZE

AddFile(/path/to/file-3, dataChange = false)、RemoveFile(/path/to/file-1)、RemoveFile(/path/to/file-2)

(最適化圧縮ではテーブル内のデータが変更されないため、レコードはありません)

3

RESTORE(version=1)

RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true)

(名前 = ヴィクトル、年齢 = 29)、(名前 = ジョージ、年齢 = 55)、(名前 = ジョージ、年齢 = 39)

前述の例では、RESTOREコマンドにより、Deltaテーブルバージョン0および1の読み取り時にすでに確認されていた更新が行われます。ストリーミングクエリーがこのテーブルを読み取っていた場合、これらのファイルは新しく追加されたデータとみなされ、再度処理されます。

メトリクスを復元する

RESTORE オペレーションが完了すると、次のメトリクスを単一行のデータフレームとしてレポートします:

  • table_size_after_restore:復元後のテーブルのサイズ。

  • num_of_files_after_restore:復元後のテーブル内のファイルの数。

  • num_removed_files:テーブルから削除された(論理的に削除された)ファイルの数。

  • num_restored_files:ロールバックによって復元されたファイルの数。

  • removed_files_size:テーブルから削除されたファイルの合計サイズ(バイト単位)。

  • restored_files_size:復元されるファイルの合計サイズ(バイト単位)。

    復元メトリクスの例

Delta Lakeのタイムトラベルの使用例

  • ユーザー111のテーブルへの誤った削除を修正しました:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • テーブルに対する偶発的な誤った更新を修正します:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • 先週追加された新規顧客の数を照会します。

    SELECT count(distinct userId)
    FROM my_table  - (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
    

Sparkセッションで最後のコミットのバージョンを確認するにはどうすればよいですか?

全スレッド、全テーブルにわたって、現在のSparkSessionが最後に書き込んだコミットのバージョン番号を取得するには、SQL設定spark.databricks.delta.lastCommitVersionInSessionに問い合わせます。

SET spark.databricks.delta.lastCommitVersionInSession
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

SparkSessionによってコミットが行われていない場合、キーをクエリーすると空の値が返されます。

複数のスレッド間で同じSparkSessionを共有する場合、それは複数のスレッド間で変数を共有するのと似ています。構成値が同時に更新されるため、競合状態が発生する可能性があります。