Delta Lakeで チェンジデータフィードを使用するDatabricks
チェンジデータフィードを使用すると、DatabricksはDeltaテーブルのバージョン間の行レベルの変更を追跡できます。Deltaテーブルで有効にすると、ランタイムはテーブルに書き込まれたすべてのデータの 変更イベント を記録します。これには、行データと、指定した行が挿入、削除、または更新されたかどうかを示すメタデータが含まれます。
チェンジデータフィードは、テーブル履歴と連携して動作し、変更情報を提供します。Deltaテーブルのクローンを作成すると別の履歴が作成されるため、クローンテーブルの変更データフィードは元のテーブルの変更データフィードと一致しません。
変更データを段階的に処理する
Databricks 、チェンジデータフィードを構造化ストリーミングと組み合わせて使用し、 Delta テーブルからの変更を段階的に処理することをお勧めします。 テーブルのチェンジデータフィードのバージョンを自動的に追跡するには、構造化ストリーミング for Databricks を使用する必要があります。
DLT は、変更データを簡単に伝達し、結果を SCD (緩やかに変化するディメンション) タイプ 1 またはタイプ 2 テーブルとして保存する機能を提供します。 「 APPLY CHANGES APIs: DLTによるチェンジデータキャプチャの簡素化」を参照してください。
テーブルからチェンジデータフィードを読み取るには、そのテーブルでチェンジデータフィードを有効にする必要があります。 「チェンジデータフィードの有効化」を参照してください。
次の構文例に示すように、テーブルに対してストリームを設定してチェンジデータフィードを読み取る場合は、オプション readChangeFeed
を true
に設定します。
- Python
- Scala
(spark.readStream
.option("readChangeFeed", "true")
.table("myDeltaTable")
)
spark.readStream
.option("readChangeFeed", "true")
.table("myDeltaTable")
デフォルトでは、ストリームは、ストリームが最初に INSERT
として開始され、将来の変更が変更データとして開始されたときに、テーブルの最新のスナップショットを返します。
変更データは Delta Lake トランザクションの一部としてコミットされ、新しいデータがテーブルにコミットされるのと同時に使用可能になります。
オプションで開始バージョンを指定できます。 「開始バージョンを指定すべきですか?」を参照してください。
チェンジデータフィードはバッチ実行もサポートしていますが、これには開始バージョンを指定する必要があります。 「バッチクエリでの変更の読み取り」を参照してください。
レート制限 (maxFilesPerTrigger
、 maxBytesPerTrigger
) や excludeRegex
などのオプションも、変更データの読み取り時にサポートされます。
レート制限は、開始スナップショットバージョン以外のバージョンではアトミックにすることができます。つまり、コミットバージョン全体がレート制限されるか、コミット全体が返されます。
開始バージョンを指定すべきですか?
特定のバージョンより前に行われた変更を無視したい場合は、オプションで開始バージョンを指定できます。 タイムスタンプまたは Delta トランザクション ログに記録されたバージョン ID 番号を使用して、バージョンを指定できます。
バッチ読み取りには開始バージョンが必要であり、多くのバッチ パターンでは、オプションの終了バージョンを設定することでメリットが得られます。
チェンジデータフィードを含む構造化ストリーミングワークロードを設定する場合は、開始バージョンを指定することが処理にどのように影響するかを理解することが重要です。
多くのストリーミング ワークロード (特に新しいデータ処理パイプライン) は、デフォルトの動作の恩恵を受けます。 デフォルト動作では、ストリームが最初にテーブル内のすべての既存のレコードをチェンジデータフィードの INSERT
操作として記録するときに、最初のバッチが処理されます。
ターゲットテーブルに、特定の時点までに適切な変更が加えられたすべてのレコードがすでに含まれている場合は、ソーステーブルの状態が INSERT
イベントとして処理されないように、開始バージョンを指定します。
次の構文例は、チェックポイントが破損したストリーミング障害から回復します。 この例では、次の条件を想定します。
- チェンジデータフィードは、テーブルの作成時にソーステーブルで有効になっていました。
- ターゲットのダウンストリームテーブルは、バージョン75までのすべての変更を処理しました。
- ソース テーブルのバージョン履歴は、バージョン 70 以降で使用できます。
- Python
- Scala
(spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
)
spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
この例では、新しいチェックポイントの場所も指定する必要があります。
開始バージョンを指定した場合、開始バージョンがテーブル履歴に存在しなくなった場合、ストリームは新しいチェックポイントから開始できません。 Delta Lake は履歴バージョンを自動的にクリーンアップするため、指定されたすべての開始バージョンは最終的に削除されます。
「チェンジデータフィードを使用してテーブルの全履歴を再生できますか?」を参照してください。
バッチ クエリでの変更の読み取り
バッチ クエリ構文を使用して、特定のバージョンから始まるすべての変更を読み取るか、指定したバージョン範囲内の変更を読み取ることができます。
バージョンは整数として指定し、タイムスタンプはyyyy-MM-dd[ HH:mm:ss[.SSS]]
の形式の文字列として指定します。
開始バージョンと終了バージョンは、クエリに含まれています。 特定の開始バージョンからテーブルの最新バージョンへの変更を読み取るには、開始バージョンのみを指定します。
変更イベントが記録されているバージョンよりも古いバージョンまたはタイムスタンプを指定した場合(つまり、チェンジデータフィードが有効になっていた場合)、チェンジデータフィードが有効になっていないことを示すエラーがスローされます。
次の構文例は、バッチ読み取りで開始バージョン・オプションと終了バージョン・オプションを使用する方法を示しています。
- SQL
- Python
- Scala
-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)
-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)
-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')
# version as ints or longs
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
# timestamps as formatted timestamp
spark.read \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
# providing only the startingVersion/timestamp
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
// version as ints or longs
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
// timestamps as formatted timestamp
spark.read
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
// providing only the startingVersion/timestamp
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
デフォルトでは、ユーザーがテーブルの最後のコミットを超えるバージョンまたはタイムスタンプを渡すと、エラーtimestampGreaterThanLatestCommit
がスローされます。Databricks Runtime 11.3 LTS以降では、ユーザーが次の構成をtrue
に設定した場合、チェンジデータフィードで範囲外バージョンのケースを処理できます:
set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
テーブルの最後のコミットよりも新しい開始バージョンを指定した場合、またはテーブルの最後のコミットよりも新しい開始タイムスタンプを指定した場合、前述の構成が有効になっていると、空の読み取り結果が返されます。
テーブルの最終コミットより大きい終了バージョン、またはテーブルの最終コミットより新しい終了タイムスタンプを指定した場合、バッチ読み取りモードで前述の設定を有効にすると、開始バージョンから最終コミットまでのすべての変更が返されます。
チェンジデータフィードのスキーマは何ですか?
テーブルのチェンジデータフィードから読み取ると、最新のテーブルバージョンのスキーマが使用されます。
ほとんどのスキーマ変更操作と進化操作は完全にサポートされています。 列マッピングが有効になっているテーブルは、すべてのユースケースをサポートしているわけではなく、動作が異なります。 「カラムマッピングが有効になっているテーブルのチェンジデータフィードの制限」を参照してください。
Deltaテーブルのスキーマのデータ列に加えて、チェンジデータフィードには、変更イベントの種類を識別するメタデータ列が含まれています:
列名 | タイプ | 値 |
---|---|---|
| 文字列 |
|
| Long | 変更を含むDeltaログまたはテーブルのバージョン。 |
| Timestamp | コミットが作成されたときに関連付けられたタイムスタンプ。 |
(1) preimage
は更新前の値、postimage
は更新後の値です。
追加された列と同じ名前の列がスキーマに含まれている場合、テーブルでチェンジデータフィードを有効にすることはできません。チェンジデータフィードを有効にする前に、テーブル内の列の名前を変更してこの競合を解決します。
チェンジデータフィードを有効にする
チェンジデータフィードを読み取れるのは、有効なテーブルのことだけです。 チェンジデータフィードオプションは、次のいずれかの方法を使用して明示的に有効にする必要があります。
-
新しいテーブル :
CREATE TABLE
コマンドでテーブルプロパティdelta.enableChangeDataFeed = true
を設定します。SQLCREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
-
既存のテーブル :
ALTER TABLE
コマンドでテーブルプロパティdelta.enableChangeDataFeed = true
を設定します。SQLALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
-
すべての新しいテーブル :
SQLset spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
チェンジデータフィードを有効にした後の変更のみが記録されます。 テーブルに対する過去の変更はキャプチャされません。
データストレージを変更
チェンジデータフィードを有効にすると、テーブルのストレージコストがわずかに増加します。 変更データ・レコードは、クエリの実行時に生成され、通常は書き換えられたファイルの合計サイズよりもはるかに小さくなります。
Databricks は、 UPDATE
、 DELETE
、および MERGE
操作の変更データを、テーブル ディレクトリの下の _change_data
フォルダーに記録します。 挿入のみの操作やパーティション全体の削除など、一部の操作では、トランザクション・ログから直接チェンジデータフィードを効率的にコンピュートできるため Databricks _change_data
ディレクトリにデータが生成されません。
_change_data
フォルダ内のデータ ファイルに対するすべての読み取りは、サポートされている Delta Lake APIsを経由する必要があります。
_change_data
フォルダ内のファイルは、テーブルの保持ポリシーに従います。チェンジデータフィードのデータは、 VACUUM
コマンドが実行されると削除されます。
チェンジデータフィードを使用して、テーブルの履歴全体を再生できますか?
チェンジデータフィードは、テーブルに対するすべての変更の永続的な記録として機能することを意図したものではありません。 チェンジデータフィードは、有効にした後に発生した変更のみを記録します。
チェンジデータフィードと Delta Lake を使用すると、ソース テーブルの完全なスナップショットを常に再構築できるため、チェンジデータフィードが有効になっているテーブルに対して新しいストリーミング読み取りを開始し、そのテーブルの現在のバージョンとその後に発生するすべての変更をキャプチャできます。
チェンジデータフィードのレコードは、一時的なものとして扱い、指定した保持期間のみアクセス可能にする必要があります。 Deltaトランザクションログは、テーブルバージョンとそれに対応するチェンジデータフィードバージョンを定期的に削除します。トランザクション・ログからバージョンを削除すると、そのバージョンのチェンジデータフィードは読み取れなくなります。
ユースケースでテーブルに対するすべての変更の永続的な履歴を保持する必要がある場合は、インクリメンタルロジックを使用してチェンジデータフィードから新しいテーブルにレコードを書き込む必要があります。 次のコード例は、構造化ストリーミングの増分処理を活用しながら、使用可能なデータをバッチワークロードとして処理する trigger.AvailableNow
の使用方法を示しています。 このワークロードをメインの処理パイプラインと非同期にスケジュールして、監査目的または完全なリプレイ性のためにチェンジデータフィードのバックアップを作成できます。
- Python
- Scala
(spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("target_table")
)
spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("target_table")
チェンジデータフィード Tables with column mapping enabled の制限事項
Deltaテーブルで列マッピングを有効にすると、既存のデータのデータファイルを書き換えることなく、テーブル内の列を削除または名前変更できます。列マッピングを有効にすると、列の名前変更や削除、データ型の変更、null値の許容の変更など、付加的でないスキーマの変更を実行した後、変更データフィードに制限されます。
- バッチセマンティクスを使用して、非加法スキーマ変更が発生したトランザクション、または範囲のチェンジデータフィードを読み取ることはできません。
- Databricks Runtime 12.2 LTS以下では、列マッピングが有効になっているテーブルで、非加法スキーマの変更が発生したものは、チェンジデータフィードでのストリーミング読み取りをサポートしていません。列マッピングとスキーマの変更によるストリーミングを参照してください。
- Databricks Runtime 11.3 LTS以下では、列マッピングが有効になっているテーブルで、列の名前変更や削除が発生したテーブルのチェンジデータフィードを読み取ることはできません。
Databricks Runtime 12.2 LTS 以降では、列マッピングが有効になっているテーブルで、非加法スキーマの変更が発生したテーブルのチェンジデータフィードでバッチ読み取りを実行できます。読み取り操作では、最新バージョンのテーブルのスキーマを使用する代わりに、クエリで指定されたテーブルの終了バージョンのスキーマを使用します。 指定されたバージョン範囲が非加法スキーマの変更にまたがっている場合でも、クエリは失敗します。