AUTO CDC API: LakeFlow 宣言型 パイプラインでチェンジデータキャプチャをシンプルに
LakeFlow宣言型パイプラインは、AUTO CDC
とAUTO CDC FROM SNAPSHOT
API を使用してチェンジデータキャプチャ (CDC) を簡素化します。
AUTO CDC
API は以前は APPLY CHANGES
と呼ばれ、構文も同じでした。
使用するインタフェースは、変更データのソースによって異なります。
AUTO CDC
を使用して、チェンジデータフィード (CDF) からの変更を処理します。AUTO CDC FROM SNAPSHOT
(パブリック プレビュー、Python でのみ使用可能) を使用して、データベース スナップショットの変更を処理します。
以前は、 MERGE INTO
ステートメントは、Databricks で CDC レコードを処理するために一般的に使用されていました。 ただし、 MERGE INTO
では、レコードの順序が正しくないために正しくない結果が生成されたり、レコードを並べ替えるために複雑なロジックが必要になったりする可能性があります。
この AUTO CDC
API は、 LakeFlow 宣言型 パイプラインの SQL インターフェイスと Python インターフェイスでサポートされています。 この AUTO CDC FROM SNAPSHOT
API は、 LakeFlow 宣言型 パイプライン Python インターフェイスでサポートされています。
AUTO CDC
と AUTO CDC FROM SNAPSHOT
はどちらも、SCD タイプ 1 およびタイプ 2 を使用したテーブルの更新をサポートしています。
- レコードを直接更新するには、SCD タイプ 1 を使用します。 更新されたレコードの履歴は保持されません。
- SCD type 2を使用して、すべての更新または指定された列の更新に対してレコードの履歴を保持します。
構文およびその他のリファレンスについては、Lakeflow宣言型パイプラインSQLのAUTO CDC、Lakeflow宣言型パイプラインPython の AUTO CDC、およびLakeflow宣言型パイプラインPythonのAUTO CDC FROM SNAPSHOT を参照してください。
この記事では、ソース データの変更に基づいて LakeFlow 宣言型パイプラインのテーブルを更新する方法について説明します。 Deltaテーブルの行レベルの変更情報を記録してクエリする方法については、「DatabricksでのDelta Lake チェンジデータフィードの使用 」を参照してください。
必要条件
CDC APIを使用するには、 サーバレスLakeflow 宣言型 パイプライン または LakeFlow 宣言型 パイプライン Pro
または Advanced
エディションを使用するようにパイプラインを構成する必要があります。
CDC は AUTO CDC API でどのように実装されますか?
Lakeflow宣言型パイプラインの AUTO CDC APIは、順序がずれたレコードを自動的に処理することで、CDCレコードの正しい処理を保証し、順序が間違っているレコードを処理するための複雑なロジックを開発する必要がなくなります。ソース データには、レコードを順序付けする列を指定する必要があります。これにより、宣言型パイプライン Lakeflow ソース データの適切な順序付けが単調に増加する表現として解釈されます。 Lakeflow 宣言型パイプラインは、順不同で到着したデータを自動的に処理します。 SCDタイプ 2 の変更においては、Lakeflow宣言型パイプラインは適切な順序付け値をターゲットテーブルの __START_AT
列と __END_AT
列に伝達します。 各シーケンス値でキーごとに 1 つの異なる更新を行う必要があり、NULL シーケンス値はサポートされていません。
AUTO CDC
を使用して CDC 処理を実行するには、まずストリーミングテーブルを作成し、次に SQL の AUTO CDC ... INTO
ステートメントまたは Python の create_auto_cdc_flow()
関数を使用して、変更フィードのソース、キー、および順序を指定します。ターゲット ストリーミングテーブルを作成するには、 SQL の CREATE OR REFRESH STREAMING TABLE
ステートメントまたは Pythonの create_streaming_table()
関数を使用します。 SCD Type 1 および Type 2 の処理例を参照してください。
構文の詳細については、LakeFlow 宣言型 パイプライン SQL リファレンスまたは Python リファレンスを参照してください。
CDC は AUTO CDC FROM SNAPSHOT
API を使用してどのように実装されますか?
プレビュー
AUTO CDC FROM SNAPSHOT
API はパブリック プレビュー段階です。
AUTO CDC FROM SNAPSHOT
は、一連の順番のスナップショットを比較することでソース データの変更を効率的に判断し、スナップショット内のレコードの CDC 処理に必要な処理を実行する宣言型 API です。AUTO CDC FROM SNAPSHOT
は、 LakeFlow 宣言型パイプライン Python インターフェイスでのみサポートされています。
AUTO CDC FROM SNAPSHOT
複数のソースタイプからのスナップショットの取り込みをサポートします。
- 定期的なスナップショット インジェストを使用して、既存のテーブルまたはビューからスナップショットをインジェストします。
AUTO CDC FROM SNAPSHOT
には、既存のデータベース オブジェクトからのスナップショットの定期的な取り込みをサポートするためのシンプルで合理化されたインターフェイスがあります。パイプラインが更新されるたびに新しいスナップショットが取り込まれ、取り込み時間がスナップショット バージョンとして使用されます。パイプラインを連続モードで実行すると、AUTO CDC FROM SNAPSHOT
処理を含むフローのトリガー間隔設定によって決定された期間に、パイプラインが更新されるたびに複数のスナップショットが取り込まれます。 - 履歴スナップショットの取り込みを使用して、Oracle データベースや MySQL データベース、データウェアハウスから生成されたスナップショットなど、データベース スナップショットを含むファイルを処理します。
AUTO CDC FROM SNAPSHOT
を使用して任意のソースタイプから CDC 処理を実行するには、まずストリーミングテーブルを作成し、次に Python の create_auto_cdc_from_snapshot_flow()
関数を使用して、処理の実装に必要なスナップショット、キー、およびその他の引数を指定します。定期的なスナップショットの取り込みと履歴スナップショットの取り込みの例を参照してください。
API に渡されるスナップショットは、バージョンごとに昇順である必要があります。LakeFlow宣言型パイプライン が順不同のスナップショットを検出すると、エラーがスローされます。
構文の詳細については、LakeFlow 宣言型 パイプライン Python リファレンスを参照してください。
シーケンシングに複数のカラムを使用
複数の列 (たとえば、タイムスタンプと同順位を破るための ID) で順序付けしたり、STRUCT を使用してそれらを組み合わせたりすることができます: STRUCT の最初のフィールドを最初に順序付け、同順位の場合は 2 番目のフィールドを考慮します。
SQLの例:
SEQUENCE BY STRUCT(timestamp_col, id_col)
Pythonの例:
sequence_by = struct("timestamp_col", "id_col")
制限
シーケンス処理に使用する列は、ソート可能なデータ型である必要があります。
例: CDF ソース・データを使用した SCD タイプ 1 および SCD タイプ 2 の処理
次のセクションでは LakeFlow チェンジデータフィードからのソース イベントに基づいてターゲット テーブルを更新する 宣言型 パイプライン SCD タイプ 1 およびタイプ 2 クエリの例を示します。
- 新しいユーザー・レコードを作成します。
- ユーザー・レコードを削除します。
- ユーザーレコードを更新します。 SCD タイプ 1 の例では、最後の
UPDATE
操作が遅れて到着し、ターゲットテーブルから削除され、順不同のイベントの処理を示しています。
次の例は、宣言型パイプラインの構成と更新に精通していることを前提としています LakeFlow 。 チュートリアル: LakeFlow 宣言型 パイプラインのチェンジデータキャプチャを用いたETL パイプラインの構築を参照してください。
これらの例を実行するには、まずサンプル データセットを作成する必要があります。 テスト データの生成を参照してください。
以下はこれらの例の入力記録です:
userId | name | city | operation | sequenceNum |
---|---|---|---|---|
124 | Raul | Oaxaca | INSERT | 1 |
123 | Isabel | Monterrey | INSERT | 1 |
125 | Mercedes | Tijuana | INSERT | 2 |
126 | Lily | Cancun | INSERT | 2 |
123 | null | null | DELETE | 6 |
125 | Mercedes | Guadalajara | UPDATE | 6 |
125 | Mercedes | Mexicali | UPDATE | 5 |
123 | Isabel | Chihuahua | UPDATE | 5 |
例題データの最後の行のコメントを外すと、レコードを切り捨てる場所を指定する次のレコードが挿入されます:
userId | name | city | operation | sequenceNum |
---|---|---|---|---|
null | null | null | TRUNCATE | 3 |
次のすべての例には、 DELETE
操作と TRUNCATE
操作の両方を指定するオプションが含まれていますが、それぞれ省略可能です。
SCD タイプ 1 更新の処理
次の例は、SCD タイプ 1 更新の処理を示しています。
- Python
- SQL
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flowname AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
SCDタイプ1の例を実行すると、ターゲットテーブルには次のレコードが含まれます:
userId | name | city |
---|---|---|
124 | Raul | Oaxaca |
125 | Mercedes | Guadalajara |
126 | Lily | Cancun |
追加のTRUNCATE
レコードを使用してSCDタイプ1の例を実行すると、sequenceNum=3
でのTRUNCATE
操作によりレコード124
と126
が切り捨てられ、ターゲットテーブルには次のレコードが含まれます:
userId | name | city |
---|---|---|
125 | Mercedes | Guadalajara |
SCD タイプ 2 更新の処理
次の例は、SCD タイプ 2 更新の処理を示しています。
- Python
- SQL
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW target_flow
AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
SCDタイプ2の例を実行した後、ターゲットテーブルには以下のレコードが含まれます:
userId | name | city | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Monterrey | 1 | 5 |
123 | Isabel | Chihuahua | 5 | 6 |
124 | Raul | Oaxaca | 1 | null |
125 | Mercedes | Tijuana | 2 | 5 |
125 | Mercedes | Mexicali | 5 | 6 |
125 | Mercedes | Guadalajara | 6 | null |
126 | Lily | Cancun | 2 | null |
SCD タイプ 2 クエリでは、ターゲット・テーブル内のヒストリーについて追跡する出力カラムのサブセットを指定することもできます。 他の列への変更は、新しい履歴レコードを生成するのではなく、その場で更新されます。 次の例は、 city
列をトラッキングから除外する方法を示しています。
次の例は、SCDタイプ2でトラックヒストリーを使用する例です:
- Python
- SQL
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW target_flow
AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
この例を追加の TRUNCATE
レコードなしで実行すると、ターゲットテーブルには次のレコードが含まれます。
userId | name | city | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Chihuahua | 1 | 6 |
124 | Raul | Oaxaca | 1 | null |
125 | Mercedes | Guadalajara | 2 | null |
126 | Lily | Cancun | 2 | null |
テストデータの生成
次のコードは、このチュートリアルで示すクエリ例で使用するデータセットの例を生成するために提供されています。新しいスキーマを作成し、新しいテーブルを作成するための適切な資格情報があると仮定すると、これらのステートメントはノートブックまたは Databricks SQL で実行できます。次のコードは、LakeFlow宣言型パイプラインの一部として実行するための ものではありません 。
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
例: 定期的なスナップショット処理
次の例は、 mycatalog.myschema.mytable
に格納されたテーブルのスナップショットを取り込む SCD タイプ 2 処理を示しています。 処理の結果は、 target
という名前のテーブルに書き込まれます。
mycatalog.myschema.mytable
タイムスタンプ 2024-01-01 00:00:00 のレコード
Key | Value |
---|---|
1 | a1 |
2 | a2 |
mycatalog.myschema.mytable
タイムスタンプ 2024-01-01 12:00:00 のレコード
Key | Value |
---|---|
2 | b2 |
3 | a3 |
import dlt
@dlt.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")
dlt.create_streaming_table("target")
dlt.create_auto_cdc_from_snapshot_flow(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)
スナップショットの処理後、ターゲットテーブルには次のレコードが含まれます。
Key | Value | __START_AT | __END_AT |
---|---|---|---|
1 | a1 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | a2 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | b2 | 2024-01-01 12:00:00 | null |
3 | a3 | 2024-01-01 12:00:00 | null |
例: 履歴スナップショット処理
次の例は、クラウドストレージシステムに保存されている 2 つのスナップショットのソースイベントに基づいてターゲットテーブルを更新する SCD タイプ 2 処理を示しています。
timestamp
のスナップショット、に格納 /<PATH>/filename1.csv
Key | TrackingColumn | NonTrackingColumn |
---|---|---|
1 | a1 | b1 |
2 | a2 | b2 |
4 | a4 | b4 |
timestamp + 5
のスナップショット、に格納 /<PATH>/filename2.csv
Key | TrackingColumn | NonTrackingColumn |
---|---|---|
2 | a2_new | b2 |
3 | a3 | B3の |
4 | a4 | b4_new |
次のコード例は、これらのスナップショットを使用した SCD タイプ 2 更新の処理を示しています。
import dlt
def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise
# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None
dlt.create_streaming_live_table("target")
dlt.create_auto_cdc_from_snapshot_flow(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)
スナップショットの処理後、ターゲットテーブルには次のレコードが含まれます。
Key | TrackingColumn | NonTrackingColumn | __START_AT | __END_AT |
---|---|---|---|---|
1 | a1 | b1 | 1 | 2 |
2 | a2 | b2 | 1 | 2 |
2 | a2_new | b2 | 2 | null |
3 | a3 | B3の | 2 | null |
4 | a4 | b4_new | 1 | null |
ターゲットストリーミングテーブル内のデータを追加、変更、または削除する
パイプラインがテーブルを Unity Catalogにパブリッシュする場合は、 データ操作言語 (DML) ステートメント (insert、update、delete、merge ステートメントなど) を使用して、AUTO CDC ... INTO
ステートメントによって作成されたターゲット ストリーミングテーブルを変更できます。
- ストリーミングテーブルのテーブルスキーマを変更する DML ステートメントはサポートされていません。DML ステートメントによってテーブルスキーマの進化が発生しないようにしてください。
- ストリーミングテーブルを更新する DML ステートメントは、Databricks Runtime 13.3 LTS 以上を使用する共有Unity Catalog クラスターまたはSQL ウェアハウスでのみ実行できます。
- ストリーミングには追加専用のデータソースが必要なため、変更を伴うソースストリーミングテーブルからのストリーミングが処理に必要な場合 (DML ステートメントなど)、ソースストリーミングテーブルを読み取るときに skipChangeCommits フラグ を設定します。
skipChangeCommits
を設定すると、ソース テーブルのレコードを削除または変更するトランザクションは無視されます。処理にストリーミングテーブルが必要ない場合は、マテリアライズドビュー (追加専用の制限がない) をターゲットテーブルとして使用できます。
LakeFlow宣言型パイプラインは指定された SEQUENCE BY
列を使用し、ターゲットテーブルの __START_AT
列と __END_AT
列 ( SCD タイプ 2) に適切な順序付け値を伝達するため、レコードの適切な順序を維持するためには、DML ステートメントでこれらの列に有効な値を使用する必要があります。 AUTO CDC API を使用した CDC の実装方法を参照してください。
ストリーミングテーブルでの DML ステートメントの使用の詳細については、 ストリーミングテーブルのデータを追加、変更、または削除するを参照してください。
次の例では、開始シーケンスを 5 としたアクティブレコードを挿入しています。
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
AUTO CDC 対象テーブルからのチェンジデータフィードの読み込み
Databricks Runtime 15.2 以降では、他の Delta テーブルからチェンジデータフィードを読み取るのと同じ方法で、AUTO CDC
クエリまたは AUTO CDC FROM SNAPSHOT
クエリのターゲットであるストリーミングテーブルからチェンジデータフィードを読み取ることができます。ターゲットストリーミングテーブルからチェンジデータフィードを読み取るには、次のものが必要です。
- ターゲット ストリーミングテーブルは Unity Catalogにパブリッシュする必要があります。 「LakeFlow宣言型 パイプラインでの Unity Catalogの使用 」を参照してください。
- ターゲットストリーミングテーブルからチェンジデータフィードを読み取るには、 Databricks Runtime 15.2以降を使用する必要があります。 チェンジデータフィードを別のパイプラインで読み取るには、 Databricks Runtime 15.2 以降を使用するようにパイプラインを構成する必要があります。
LakeFlow宣言型 パイプラインで作成されたターゲット ストリーミングテーブルからチェンジデータフィードを読み取るには、他の Delta テーブルからチェンジデータフィードを読み取るのと同じ方法で読み取ります。 Deltaチェンジデータフィード機能の使用(Python やSQL の例など)について詳しくは、DatabricksでのDelta Lake チェンジデータフィードの使用 を参照してください。
チェンジデータフィードレコードには、変更イベントのタイプを識別する メタデータ が含まれています。 テーブル内のレコードが更新されると、関連付けられた変更レコードのメタデータには、通常、update_preimage
イベントと update_postimage
イベントに設定された_change_type
値が含まれます。
ただし、プライマリ・キー値の変更を含む更新がターゲット・ストリーミングテーブルに対して行われた場合、 _change_type
値は異なります。 変更にプライマリ・キーの更新が含まれる場合、 _change_type
メタデータ・フィールドは insert
イベントと delete
イベントに設定されます。 プライマリ・キーの変更は、 UPDATE
ステートメントまたは MERGE
ステートメントを使用してキー・フィールドの 1 つが手動で更新された場合、または SCD タイプ 2 テーブルの場合、 __start_at
・フィールドが以前の開始シーケンス値を反映するように変更された場合に発生する可能性があります。
AUTO CDC
クエリは、SCD タイプ 1 と SCD タイプ 2 の処理で異なるプライマリ・キー値を判別します。
- SCDタイプ 1 処理と LakeFlow 宣言型 パイプライン Python インターフェースの場合、主キーは
create_auto_cdc_flow()
関数のkeys
パラメーターの値です。LakeFlow 宣言型パイプライン SQL インターフェイスの場合、主キーはAUTO CDC ... INTO
ステートメントのKEYS
句で定義された列です。 - SCDタイプ 2 の場合、プライマリ・キーは
keys
パラメーターまたはKEYS
句にcoalesce(__START_AT, __END_AT)
操作からの戻り値を加えたものです。ここで、__START_AT
と__END_AT
はターゲット ストリーミングテーブルの対応するカラムです。
LakeFlow 宣言型 パイプライン の CDC クエリによって処理されたレコードに関するデータを取得する
次のメトリクスは、 AUTO CDC
クエリによってのみキャプチャされ、 AUTO CDC FROM SNAPSHOT
クエリではキャプチャされません。
次のメトリクスは、 AUTO CDC
クエリによってキャプチャされます。
num_upserted_rows
: 更新中にデータセットにアップサートされた出力行の数。num_deleted_rows
: 更新中にデータセットから削除された既存の出力行の数。
CDC 以外のフローの出力である num_output_rows
メトリクスは、 AUTO CDC
クエリではキャプチャされません。
LakeFlow宣言型パイプラインのCDC処理にはどのようなデータオブジェクトが使用されますか?
- これらのデータ構造は、
AUTO CDC
処理にのみ適用され、AUTO CDC FROM SNAPSHOT
処理には適用されません。 - これらのデータ構造は、ターゲット表が Hive metastoreにパブリッシュされる場合にのみ適用されます。 パイプラインが Unity Catalog に発行される場合、ユーザーは内部バッキング テーブルにアクセスできません。
Hive metastore でターゲットテーブルを宣言すると、2 つのデータ構造が作成されます。
- ターゲットテーブルに割り当てられた名前を使用するビュー。
- 宣言型パイプラインが 処理を管理するために使用する内部バッキング テーブルLakeFlowCDC 。このテーブルの名前は、ターゲット テーブル名の先頭に
__apply_changes_storage_
を付加して付けられます。
たとえば、 dlt_cdc_target
という名前のターゲット テーブルを宣言すると、メタストアに dlt_cdc_target
という名前のビューと __apply_changes_storage_dlt_cdc_target
という名前のテーブルが表示されます。ビューを作成すると LakeFlow 宣言型パイプラインで、順不同のデータを処理するために必要な追加情報 (廃棄標識やバージョンなど) を除外できます。 処理されたデータを表示するには、ターゲット ビューをクエリします。__apply_changes_storage_
テーブルのスキーマは、将来の機能や拡張機能をサポートするために変更される可能性があるため、本番運用で使用するためにテーブルをクエリしないでください。テーブルにデータを手動で追加すると、バージョン列が欠落しているため、レコードは他の変更よりも前に来ると見なされます。