Databricksでチェンジデータフィードを使用する
チェンジデータフィード(CDF)は、Delta LakeテーブルまたはApache Iceberg v3テーブルのバージョン間の行レベルの変更を追跡します。
Databricks は 2つのアプローチをサポートしています:
- 自動チェンジデータフィード :テーブル読み取り中に、行のリネージメタデータを使用して変更をコンピュートします。個別のテーブル設定は必要なく、Delta Lake および Apache Iceberg v3 テーブルに対応しています。自動チェンジデータフィードを参照してください。
- レガシー チェンジデータフィード :テーブル書き込み時に変更をマテリアライズします。Delta Lake テーブルのみをサポートします。個別のテーブル構成が必要です。Delta Lake のレガシ チェンジデータフィード を参照してください。
チェンジデータフィードは、次のような一般的なデータユースケースに利用できます:
- 前回のパイプライン実行以降に変更された行のみを処理する増分ETLパイプラインです。
- コンプライアンスおよびガバナンス要件を満たすため、データの変更履歴を追跡する監査証跡です。
- 下流のテーブル、キャッシュ、または外部システムに変更を同期するデータレプリケーションワークロード
自動チェンジデータフィード
プレビュー
この機能は パブリック プレビュー段階です。ワークスペース管理者は、 プレビュー ページからこの機能へのアクセスを制御できます。Databricksのプレビューを管理するを参照してください。
自動チェンジデータフィードは、Delta Lakeの行追跡とApache Iceberg v3の行リネージを使用して、書き込み時ではなくクエリ時に行レベルの変更を計算します。従来のチェンジデータフィードとは異なり、自動チェンジデータフィードは個別のテーブル設定を必要とせず、Delta LakeテーブルとApache Iceberg v3テーブルで動作します。
MERGE INTOおよびUPDATE操作で変更が書き込みごとにコンピュートされないため、自動チェンジデータフィードは、従来のチェンジデータフィードと比べて、書き込みパフォーマンスを向上させ、ストレージコストを削減します。
自動チェンジデータフィードは、レガシーチェンジデータフィードと同じtable_changes()およびreadChangeFeed APIsを使用し、バッチクエリ、構造化ストリーミング、Databricks-to-Databricks Delta Lake Sharingに対応しています。バッチクエリーで変更を読み取るとチェンジデータの増分処理をご覧ください。
要件
- Databricks Runtime 18以降
- Unity Catalog に登録されているサポート対象のテーブル形式:
- 行追跡が有効になった Delta Lake 形式、または Iceberg v3 形式のマネージドテーブル。
- 行追跡が有効な Delta Lake 形式の外部テーブル
Databricks Unity Catalog のテーブルの種類を参照してください。
チェンジデータフィードは Apache Iceberg の仕様の一部ではありません。Databricksリーダーは、Apache Iceberg v3テーブルの自動チェンジデータフィードをクエリできますが、外部のIcebergリーダーはクエリできません。Icebergテーブル仕様を参照してください。
Databricks リーダーのみが Delta Lake の自動チェンジデータフィードをクエリできます。
チェンジデータフィードの使用
チェンジデータフィードを使用するには、要件を満たしているテーブルを使用していることを確認してください。要件を参照してください。
チェンジデータフィードをバッチで読み取るには、次の手順を実行します。
- Python
- Scala
- SQL
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("<table_name>")
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("<table_name>")
SELECT * FROM table_changes('<table_name>', 0)
チェンジデータフィードのバッチ読み取りに関する詳細については、バッチクエリでの変更の読み取りを参照してください。
チェンジデータフィードをストリームとして読み取るには、次の手順を実行します。
- Python
- Scala
(spark.readStream
.option("readChangeFeed", "true")
.table("<table_name>")
)
spark.readStream
.option("readChangeFeed", "true")
.table("<table_name>")
チェンジデータフィードのストリーミング読み取りに関する詳細については、チェンジデータの増分処理を参照してください。
レガシーチェンジデータフィードからの移行
Delta Lakeテーブルをレガシーチェンジデータフィードから自動チェンジデータフィードに移行するには、次のようにします。
- テーブルが要件を満たしていることを確認してください。
- 従来のチェンジデータフィードをオフにするには、次のコマンドを実行します。
ALTER TABLE <table_name> UNSET TBLPROPERTIES ('delta.enableChangeDataFeed');
レガシーチェンジデータフィードと自動チェンジデータフィードは同時に使用できません。
チェンジデータフィード スキーマ
テーブルのチェンジデータフィードから読み取る場合、クエリーは最新のテーブルバージョンのスキーマを使用します。Databricks はほとんどのスキーマ変更および展開操作をサポートしていますが、列マッピングが有効なテーブルには制限があります。列マッピングのあるテーブルを参照してください。
Delta Lake テーブルのスキーマのデータ列に加えて、チェンジデータフィードには、変更イベントの種類を識別するメタデータ列が含まれています。
列名 | Type | 値 |
|---|---|---|
| String |
|
| Long | 含む:変更を含むDeltaログまたはテーブルのバージョン |
| タイムスタンプ | 含まれるもの:コミットが作成されたときに関連付けられたタイムスタンプ。 |
スキーマにこれらのメタデータ列と同じ名前の列が含まれている場合、テーブルでチェンジデータフィードを使用できません。チェンジデータフィードを有効にする前に、テーブル内の列の名前を変更してこの競合を解決します。
チェンジデータの増分処理
Databricksでは、テーブルからの変更を段階的に処理するために、構造化ストリーミングと組み合わせてチェンジデータフィードを使用することをお勧めします。テーブルのチェンジデータフィードのバージョンを自動的に追跡するには、Databricksの構造化ストリーミング を使用する必要があります。SCD タイプ 1 またはタイプ 2 のテーブルを使用した CDC 処理の詳細については、「AUTO CDC APIs: パイプラインによるチェンジデータキャプチャの簡素化」を参照してください。
ストリームが最初に開始されると、チェンジデータフィードは、テーブルの最新のスナップショットをINSERTレコードとして返し、その後、将来の変更をチェンジデータとして返します。チェンジデータフィードは、チェンジデータと新しいデータ行の両方をテーブルのトランザクションログに同時にコミットします。
ストリームでテーブルのチェンジデータフィードを読み取るように構成するには、オプションreadChangeFeedをtrueに次のように設定します。
- Python
- Scala
(spark.readStream
.option("readChangeFeed", "true")
.table("myTable")
)
spark.readStream
.option("readChangeFeed", "true")
.table("myTable")
レート制限
Databricks は、変更データを読み取るときに、レート制限(maxFilesPerTrigger、maxBytesPerTrigger)と excludeRegex をサポートします。ストリーミング Delta Lake のオプションの完全な一覧については、「Delta Lake」を参照してください。
オプションで開始バージョンを指定できます。開始バージョンを指定を参照してください。開始スナップショット以外のバージョンでは、レート制限はコミット全体にアトミックに適用されます。現在のバッチは、コミット全体を含むか、次のバッチにコミットを繰り越すかのいずれかです。
テーブル履歴の再生
チェンジデータフィードは、テーブルのすべての変更の永続的な記録として使用されるものではありません。チェンジデータフィードが有効にされた後に発生する変更のみが記録されます。新しいストリーミング読み取りを開始して、現在のバージョンとそれ以降のすべての変更を取得することができます。
チェンジデータフィードのレコードは一時的であり、指定された保持期間のみアクセス可能です。トランザクションログは、テーブルバージョンとそれに対応するチェンジデータフィードバージョンを定期的に削除します。バージョンが削除されると、そのバージョンのチェンジデータフィードを読み取ることができなくなります。
永続的な履歴用に変更データをアーカイブする
ユースケースでテーブルへの変更履歴をすべて永続的に保持する必要がある場合は、増分ロジックを使用して、チェンジデータフィードから新しいテーブルにレコードを書き込みます。
次の例は、監査または完全な変更リプレイのために、利用可能なデータをバッチワークロードとして処理するためにtrigger.AvailableNowを使用する例です。
- Python
- Scala
(spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("target_table")
)
spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("target_table")
開始バージョンを指定する
特定の時点から変更を読み取るには、タイムスタンプまたはバージョン番号のいずれかを使用して開始バージョンを指定してください。バッチ読み取りには開始バージョンが必要です。オプションで、範囲を制限するために終了バージョンを指定できます。テーブル履歴の詳細については、「タイムトラベルとは」を参照してください。
チェンジデータフィードを使用する構造化ストリーミングのワークロードを設定する場合、開始バージョンを指定すると、処理パフォーマンスに影響を与える可能性があります。
- 新しいデータ処理パイプラインは、ストリーム初回起動時にテーブル内のすべての既存レコードを
INSERT操作として記録するデフォルトの動作から、通常、恩恵を受けます。 - ターゲットテーブルに、ある時点までの適切な変更を含むすべてのレコードがすでに含まれている場合は、開始バージョンを指定して、
INSERTイベントとしてソーステーブルの状態が処理されることを避けてください。
次の例では、破損したチェックポイントを伴うストリーミング障害からの回復方法を示します。この例では、以下の条件を前提としています。
- テーブル作成時に、ソーステーブルでチェンジデータフィードが有効になりました。
- ターゲットの下流側のテーブルは、バージョン75まですべての変更を処理しました。
- ソーステーブルのバージョン履歴は、バージョン70以上で利用可能です。
既存のターゲットテーブルに書き込みストリームを定義する場合は、新しいチェックポイントの場所を指定する必要があります:
- Python
- Scala
(spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
.writeStream
.option("checkpointLocation", "<new-checkpoint-path>")
.toTable("target_table")
)
spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
.writeStream
.option("checkpointLocation", "<new-checkpoint-path>")
.toTable("target_table")
開始バージョンを指定し、そのバージョンがテーブル履歴に存在しない場合、ストリームは新しいチェックポイントから開始できません。マネージドテーブルは履歴バージョンを自動的にクリーンアップするため、指定されたすべての開始バージョンは最終的に削除されます。
テーブル履歴の再生を参照してください。
バッチクエリーの変更を読み取る
バッチクエリ構文を使用すると、特定のバージョン以降のすべての変更を読み取ったり、指定されたバージョンの範囲内で変更を読み取ったりできます。
- バージョンは整数として指定し、タイムスタンプは
yyyy-MM-dd[ HH:mm:ss[.SSS]]の形式の文字列として指定します。 - 開始バージョンと終了バージョンは含まれます。開始バージョンから最新バージョンを読み取るには、開始バージョンのみを指定します。
- チェンジデータフィードが有効になる前のバージョンを指定した場合、エラーが発生します。
開始および終了バージョンのオプションを指定してバッチ読み込みを使用するには、次を実行します。
- SQL
- Python
- Scala
バージョン0から10まで読み取るには、次のようにします:
SELECT * FROM table_changes('tableName', 0, 10)
2つのタイムスタンプバージョン間で読み取るには、次のようにします。
--
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
開始バージョンから最新バージョンまで読み取るには、以下を行ってください。
SELECT * FROM table_changes('tableName', 0)
名前に特殊文字を含むテーブルの変更を読み取るには、以下を実行します。
SELECT * FROM table_changes('`schema`.`dotted.tableName`', '2021-04-21 06:45:46', '2021-05-21 12:00:00')
table_changesテーブル値関数を参照してください。
バージョン0から10まで読み取るには、次のようにします:
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
2 つのタイムスタンプの間を読み取るには、次の操作を行います:
spark.read \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
開始バージョンから最新バージョンまで読み取るには、以下を行ってください。
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
バージョン0から10まで読み取るには、次のようにします:
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
2 つのタイムスタンプの間を読み取るには、次の操作を行います:
spark.read
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
開始バージョンから最新バージョンまで読み取るには、以下を行ってください。
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
範囲外バージョンの処理
デフォルトでは、ユーザーが最後のコミットを超えるバージョンまたはタイムスタンプを指定した場合、エラーtimestampGreaterThanLatestCommitがスローされます。
Databricks Runtime 11.3 LTS以降では、次のように範囲外バージョンの許容を有効にできます。
SET spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
この設定を有効にすると、クエリーは以下の異なる結果を返します。
- 最後のコミットより新しい開始バージョンまたはタイムスタンプは、空の結果を返します。
- 最終コミットを超える終了バージョンまたはタイムスタンプは、開始から最終コミットまでのすべての変更を返します。
Delta Lakeのレガシーチェンジデータフィード
従来のチェンジデータフィードは、個々のDelta Lakeテーブルに対して手動での設定が必要です。チェンジデータフィードがApache Icebergの仕様に含まれていないため、Apache Icebergテーブルはサポートされていません。Databricks は、自動チェンジデータフィードへの移行をお勧めします。レガシーチェンジデータフィードからの移行を参照してください。
レガシーチェンジデータフィードが有効になっている場合、ランタイムはテーブルに書き込まれたすべてのデータの変更イベントを記録します。これには、行データと、指定した行が挿入、削除、または更新されたかどうかを示すメタデータが含まれます。
レガシーチェンジデータフィードは、自動チェンジデータフィードと同じreadChangeFeedおよびtable_changes()読み取りAPIsを使用します。チェンジデータの増分処理およびバッチクエリでの変更の読み取りを参照してください。
従来のチェンジデータフィードを有効にしてください
個々のテーブルでレガシチェンジデータフィードを明示的に有効にする必要があります。次のいずれかの方法を使用してください:
新規テーブル
CREATE TABLEコマンドでテーブルプロパティdelta.enableChangeDataFeed = trueを設定します。
CREATE TABLE student (id INT, name STRING, age INT)
TBLPROPERTIES (delta.enableChangeDataFeed = true)
レガシーチェンジデータフィードを任意の期間オフにして再度オンにした場合、その期間はクエリできなくなります。自動チェンジデータフィードを使用して、期間中の変更をクエリしてください。自動チェンジデータフィードを参照してください。
既存のテーブル
ALTER TABLEコマンドでテーブルプロパティdelta.enableChangeDataFeed = trueを設定します。
ALTER TABLE myDeltaTable
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
ストレージの考慮事項
マネージドテーブルはデータ変更を効率的に記録し、ストレージレイアウトを最適化するために他の機能を使用する場合があります。
従来のチェンジデータフィードでは、次のストレージの動作を考慮する必要があります。
- 変更が別々のファイルに記録される可能性があるため、ストレージコストがわずかに増加する可能性があります。
- 挿入のみの操作やパーティション全体の削除など、一部の操作では変更データ・ファイルは生成されません。Databricksはトランザクションログから直接チェンジデータフィードを計算します。
- 変更データファイルは、テーブルの保持ポリシーに従います。
VACUUMコマンドは変更データファイルを削除し、トランザクションログからの変更はチェックポイントの保持ポリシーを使用します。
Databricksは、変更データファイルを直接クエリしてチェンジデータフィードを再構築しないよう推奨しています。常に Delta Lake と Apache Iceberg APIs を使用してください。
制限事項:
チェンジデータフィードの以下の制限事項:
列マッピングのあるテーブル
Delta Lakeテーブルで列マッピングを有効にすると、データファイルを書き換えることなく、列を削除または名前変更できます。Delta Lake 列マッピングを使用した列の名前変更と削除を参照してください。
ただし、非加法スキーマ変更の後にはチェンジデータフィードに制限があります。非加法スキーマ変更には、以下の操作が含まれます。
- 列の名前変更または削除
- 列のデータ型を変更してください。
- 列のNULL値の許容を、
ALTER COLUMN ... SET NOT NULLなどで変更します。Databricks でNOT NULL制約を設定するを参照してください。
非加法スキーマ変更が発生したトランザクション、または範囲のチェンジデータフィードを読み取ることはできません。
特定のバッチ読み取り範囲の前または後に非加算スキーマ変更を許可するため、クエリは、最新のテーブルバージョンではなく、範囲の終了バージョンのスキーマを使用します。バージョン範囲が非加法スキーマ変更にまたがっている場合、クエリーは失敗します。
自動チェンジデータフィード
- Apache Iceberg の仕様でチェンジデータフィードがサポートされていないため、外部の Iceberg クライアントは自動チェンジデータフィードをクエリできません。Icebergテーブル仕様を参照してください。
- マルチステートメントトランザクションの場合、トランザクション中にソーステーブルが変更された場合、自動チェンジデータフィードはサポートされていません。
- 自動チェンジデータフィードは、行フィルターまたは列マスクが適用されたテーブルではサポートされていません。「行フィルターと列マスク」を参照してください。
- チェンジデータフィードクエリは、列の名前変更、削除、データ型の変更などの非追加的なスキーマ変更が発生したテーブルバージョンをまたいで実行することはできません。クエリをスキーマ変更の前後の範囲に分割します。