Databricksで Delta Lakeチェンジデータフィードを使用する
チェンジデータフィードを使用すると、DatabricksはDeltaテーブルのバージョン間の行レベルの変更を追跡できます。Deltaテーブルで有効にすると、ランタイムはテーブルに書き込まれたすべてのデータの 変更イベント を記録します。これには、行データと、指定した行が挿入、削除、または更新されたかどうかを示すメタデータが含まれます。
チェンジデータフィードを使用すると、次のような一般的なデータのユースケースを強化できます。
- ETLパイプライン : 前回のパイプライン実行以降に変更された行のみを増分処理します。
- 監査証跡 : コンプライアンスとガバナンスの要件に合わせてデータの変更を追跡します。
- データ レプリケーション : ダウンストリーム テーブル、キャッシュ、または外部システムへの変更を同期します。
チェンジデータフィードは、テーブル履歴と連携して変更情報を提供します。 Deltaテーブルをクローニングすると別の履歴が作成されるため、クローニングされたテーブルのチェンジデータフィードは元のテーブルのものと一致しません。
チェンジデータフィードを有効にする
変更データフィードは、読み取り対象のテーブルで明示的に有効にする必要があります。 次のいずれかの方法を使用します。
新しいテーブル
CREATE TABLEコマンドでテーブル プロパティdelta.enableChangeDataFeed = trueを設定します。
CREATE TABLE student (id INT, name STRING, age INT)
TBLPROPERTIES (delta.enableChangeDataFeed = true)
既存のテーブル
ALTER TABLEコマンドでテーブル プロパティdelta.enableChangeDataFeed = trueを設定します。
ALTER TABLE myDeltaTable
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
セッション内のすべての新しいテーブル
セッションで作成されたすべての新しいテーブルに対して変更データフィードを有効にするようにSpark構成を設定します。
SET spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
チェンジデータフィードを有効にした後の変更のみが記録されます。 テーブルに対する過去の変更はキャプチャされません。
データフィードスキーマの変更
テーブルの変更データフィードから読み取る場合、テーブルの最新バージョンのスキーマが使用されます。 Databricks はほとんどのスキーマ変更および進化操作を完全にサポートしていますが、列マッピングが有効になっているテーブルには制限があります。「列マッピングのあるテーブルの変更データフィードの制限」を参照してください。
Deltaテーブルのスキーマのデータ列に加えて、チェンジデータフィードには、変更イベントの種類を識別するメタデータ列が含まれています:
列名 | タイプ | 値 |
|---|---|---|
| 文字列 |
|
| Long | 変更を含むDeltaログまたはテーブルのバージョン。 |
| Timestamp | コミットが作成されたときに関連付けられたタイムスタンプ。 |
(1) preimageは更新前の値、postimageは更新後の値です。
スキーマにこれらのメタデータ列と同じ名前の列が含まれている場合、テーブルで変更データフィードを有効にすることはできません。 変更データフィードを有効にする前に、テーブル内の列の名前を変更してこの競合を解決してください。
変更データを段階的に処理する
Databricks 、変更データフィードを構造化ストリーミングと組み合わせて使用し、 Deltaテーブルからの変更を段階的に処理することをお勧めします。 テーブルの変更データフィードのバージョンを自動的に追跡するには、 Databricksの構造化ストリーミングを使用する必要があります。 SCDタイプ 1 またはタイプ 2 テーブルを使用したCDC処理については、 「AUTO CDC APIs : パイプラインを使用した変更データ キャプチャの簡素化」を参照してください。
次の構文例に示すように、テーブルに対してストリームを設定してチェンジデータフィードを読み取る場合は、オプション readChangeFeed を true に設定します。
- Python
- Scala
(spark.readStream
.option("readChangeFeed", "true")
.table("myDeltaTable")
)
spark.readStream
.option("readChangeFeed", "true")
.table("myDeltaTable")
デフォルトの動作
ストリームが最初に開始されると、テーブルの最新のスナップショットがINSERTレコードとして返され、その後、将来の変更が変更データとして返されます。変更データは Delta Lake トランザクションの一部としてコミットされ、新しいデータがテーブルにコミットされると同時に使用可能になります。
追加オプション
オプションで、開始バージョンを指定することもできます ( 「開始バージョンの指定」を参照)。また、バッチ実行を使用することもできます ( 「バッチ クエリでの変更の読み取り」を参照)。Databricks は、変更データの読み取り時にレート制限 ( maxFilesPerTrigger 、 maxBytesPerTrigger )、およびexcludeRegexもサポートします。
開始スナップショット以外のバージョンの場合、レート制限はコミット全体にアトミックに適用されます。コミット全体が現在のバッチに含まれるか、次のバッチに延期されます。
開始バージョンを指定する
特定の時点からの変更を読み取るには、タイムスタンプまたはバージョン番号を使用して開始バージョンを指定します。バッチ読み取りには開始バージョンが必要です。オプションで終了バージョンを指定して範囲を制限することもできます。Delta Lakeテーブル履歴の詳細については、 「 Delta Lakeタイムトラベルとは?」を参照してください。 。
変更データフィードを含む構造化ストリーミング ワークロードを構成する場合は、開始バージョンの指定が処理にどのような影響を与えるかを理解してください。
- 新しいデータ処理パイプラインでは通常、ストリームが最初に開始されたときにテーブル内の既存のすべてのレコードを
INSERT操作として記録するデフォルトの動作のメリットが得られます。 - ターゲットテーブルに、特定の時点までに適切な変更が加えられたすべてのレコードがすでに含まれている場合は、ソーステーブルの状態が
INSERTイベントとして処理されないように、開始バージョンを指定します。
次の例は、チェックポイントが破損したストリーミング障害から回復するための構文を示しています。この例では、次の条件を想定しています。
- チェンジデータフィードは、テーブルの作成時にソーステーブルで有効になっていました。
- ターゲットのダウンストリームテーブルは、バージョン75までのすべての変更を処理しました。
- ソース テーブルのバージョン履歴は、バージョン 70 以降で使用できます。
- Python
- Scala
(spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
)
spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
この例では、新しいチェックポイントの場所も指定する必要があります。
開始バージョンを指定した場合、開始バージョンがテーブル履歴に存在しなくなった場合、ストリームは新しいチェックポイントから開始できません。 Delta Lake は履歴バージョンを自動的にクリーンアップするため、指定されたすべての開始バージョンは最終的に削除されます。
リプレイテーブル履歴を参照してください。
バッチ クエリでの変更の読み取り
バッチ クエリ構文を使用して、特定のバージョンから始まるすべての変更を読み取るか、指定したバージョン範囲内の変更を読み取ることができます。
- バージョンを整数として、タイムスタンプを
yyyy-MM-dd[ HH:mm:ss[.SSS]]の形式で文字列として指定します。 - 開始バージョンと終了バージョンが含まれます。
- 開始バージョンから最新バージョンまで読み取るには、開始バージョンのみを指定します。
- 変更データフィードが有効になる前のバージョンを指定するとエラーが発生します。
次の構文例は、バッチ読み取りで開始バージョン・オプションと終了バージョン・オプションを使用する方法を示しています。
- SQL
- Python
- Scala
-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)
-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)
-- database/schema names inside the string for table name,
-- with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')
# version as ints or longs
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
# timestamps as formatted timestamp
spark.read \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
# providing only the startingVersion/timestamp
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
// version as ints or longs
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
// timestamps as formatted timestamp
spark.read
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
// providing only the startingVersion/timestamp
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
範囲外のバージョンを処理する
デフォルトでは、最後のコミットを超えるバージョンまたはタイムスタンプを指定すると、エラーtimestampGreaterThanLatestCommitがスローされます。Databricks Runtime 11.3 LTS 以降では、範囲外のバージョンに対する許容範囲を有効にすることができます。
SET spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
この設定を有効にすると、次のようになります。
- 最後のコミット以降の開始バージョン/タイムスタンプ : 空の結果を返します。
- 最後のコミット以降の終了バージョン/タイムスタンプ : 開始から最後のコミットまでのすべての変更を返します。
データの変更を記録する
Delta Lake はデータの変更を効率的に記録し、他の Delta Lake 機能を使用してストレージ表現を最適化する場合があります。
保管に関する考慮事項
- ストレージ コスト : 変更データフィードを有効にすると、変更が別のファイルに記録される可能性があるため、ストレージ コストが若干増加する可能性があります。
- 変更ファイルを使用しない操作 : 一部の操作 (挿入のみ、パーティション全体の削除) では、変更データ ファイルが生成されません。Databricks Databricksは、トランザクション ログから変更データフィードを直接生成します。
- 保持 : 変更データ ファイルはテーブルの保持ポリシーに従います。
VACUUMコマンドはそれらを削除し、トランザクション ログからの変更はチェックポイントの保持に従います。
変更データ ファイルを直接クエリして変更データフィードを再構築しようとしないでください。 常にDelta Lake APIs使用してください。
テーブル履歴を再生する
変更データフィードは、テーブルに対するすべての変更の永続的な記録として機能することを目的としたものではありません。 有効にした後に発生した変更のみが記録され、新しいストリーミング読み取りを開始して、現在のバージョンとそれ以降のすべての変更をキャプチャできます。
変更データフィード内のレコードは一時的なものであり、指定された保存期間中にのみアクセスできます。 Delta Lakeトランザクション ログは、テーブル バージョンとそれに対応する変更データフィード バージョンを定期的に削除します。 バージョンが削除されると、そのバージョンの変更データフィードを読み取ることができなくなります。
変更データを永久履歴としてアーカイブする
ユースケースでテーブルに対するすべての変更の永続的な履歴を維持する必要がある場合は、増分ロジックを使用して変更データフィードから新しいテーブルにレコードを書き込みます。 次の例は、 trigger.AvailableNowを使用して、監査または完全な再生可能性のためのバッチ ワークロードとして利用可能なデータを処理する方法を示しています。
- Python
- Scala
(spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("target_table")
)
spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("target_table")
列マッピングのあるテーブルのデータフィード制限を変更する
Delta テーブルで列マッピングを有効にすると、データ ファイルを書き換えずに列を削除したり名前を変更したりできます。ただし、列の名前変更や削除、データ型の変更、NULL 可能性の変更などの非加算的なスキーマ変更の後は、変更データフィードには制限があります。
- バッチ セマンティクス : 非加算的なスキーマ変更が発生するトランザクションまたは範囲に対して変更データフィードを読み取ることはできません。
- DBR 12.2 LTS以下 : 列マッピングが有効になっており、非加算的なスキーマ変更が行われたテーブルは、変更データフィードでのストリーミング読み取りをサポートしません。 列マッピングとスキーマ変更によるストリーミングを参照してください。
- DBR 11.3 LTS以前 : 列の名前変更または削除が発生した、列マッピングが有効になっているテーブルの変更データフィードを読み取ることはできません。
Databricks Runtime 12.2 LTS以降では、非加算的なスキーマ変更が行われた列マッピングが有効になっているテーブルに対して、変更データフィードでバッチ読み取りを実行できます。 読み取り操作では、最新のテーブル バージョンではなく、クエリで指定された終了バージョンのスキーマが使用されます。バージョン範囲が非加算的なスキーマ変更にまたがる場合、クエリは依然として失敗します。