AUTO CDC API: Lakeflow 宣言型 パイプラインでチェンジデータキャプチャをシンプルに
Lakeflow宣言型パイプラインは、AUTO CDC
とAUTO CDC FROM SNAPSHOT
API を使用してチェンジデータキャプチャ (CDC) を簡素化します。
AUTO CDC
APIs APPLY CHANGES
APIsに代わるもので、同じ構文を持ちます。 APPLY CHANGES
APIs引き続き使用できますが、 Databricksでは代わりにAUTO CDC
APIsを使用することをお勧めします。
使用するインタフェースは、変更データのソースによって異なります。
- データ変更フィード (CDF) からの変更を処理するには、
AUTO CDC
を使用します。 - データベース スナップショットの変更を処理するには、
AUTO CDC FROM SNAPSHOT
(パブリック プレビュー、Python でのみ使用可能) を使用します。
以前は、 MERGE INTO
ステートメントは Databricks 上の CDC レコードの処理によく使用されていました。ただし、 MERGE INTO
では、レコードの順序が正しくないために誤った結果が生成されたり、レコードの順序を変更するために複雑なロジックが必要になったりする可能性があります。
この AUTO CDC
API は、 Lakeflow 宣言型 パイプラインの SQL インターフェイスと Python インターフェイスでサポートされています。 この AUTO CDC FROM SNAPSHOT
API は、 Lakeflow 宣言型 パイプライン Python インターフェイスでサポートされています。
AUTO CDC
とAUTO CDC FROM SNAPSHOT
はどちらも、SCD タイプ 1 とタイプ 2 を使用したテーブルの更新をサポートしています。
- レコードを直接更新するには、SCD タイプ 1 を使用します。更新されたレコードの履歴は保持されません。
- SCD type 2を使用して、すべての更新または指定された列の更新に対してレコードの履歴を保持します。
構文およびその他のリファレンスについては、Lakeflow宣言型パイプラインSQLのAUTO CDC、Lakeflow宣言型パイプラインPython の AUTO CDC、およびLakeflow宣言型パイプラインPythonのAUTO CDC FROM SNAPSHOT を参照してください。
この記事では、ソース データの変更に基づいて Lakeflow 宣言型パイプラインのテーブルを更新する方法について説明します。 Deltaテーブルの行レベルの変更情報を記録してクエリする方法については、「DatabricksでのDelta Lake チェンジデータフィードの使用 」を参照してください。
要件
CDC APIを使用するには、 サーバレスLakeflow 宣言型 パイプライン または Lakeflow 宣言型 パイプライン Pro
または Advanced
エディションを使用するようにパイプラインを構成する必要があります。
AUTO CDC API では CDC はどのように実装されますか?
Lakeflow宣言型パイプラインの AUTO CDC APIは、順序がずれたレコードを自動的に処理することで、CDCレコードの正しい処理を保証し、順序が間違っているレコードを処理するための複雑なロジックを開発する必要がなくなります。ソース データには、レコードを順序付けする列を指定する必要があります。これにより、宣言型パイプライン Lakeflow ソース データの適切な順序付けが単調に増加する表現として解釈されます。 Lakeflow 宣言型パイプラインは、順不同で到着したデータを自動的に処理します。 SCDタイプ 2 の変更においては、Lakeflow宣言型パイプラインは適切な順序付け値をターゲットテーブルの __START_AT
列と __END_AT
列に伝達します。 各シーケンス値でキーごとに 1 つの異なる更新を行う必要があり、NULL シーケンス値はサポートされていません。
AUTO CDC
でCDC処理を実行するには、まずストリーミング テーブルを作成し、次にSQLのAUTO CDC ... INTO
ステートメントまたはPythonのcreate_auto_cdc_flow()
関数を使用して、変更フィードのソース、キー、シーケンスを指定します。 ターゲットのストリーミング テーブルを作成するには、 SQLのCREATE OR REFRESH STREAMING TABLE
ステートメントまたはPythonのcreate_streaming_table()
関数を使用します。 SCD タイプ 1 およびタイプ 2 の処理例を参照してください。
構文の詳細については、Lakeflow 宣言型 パイプライン SQL リファレンスまたは Python リファレンスを参照してください。
AUTO CDC FROM SNAPSHOT
API では CDC はどのように実装されますか?
プレビュー
AUTO CDC FROM SNAPSHOT
API はパブリック プレビュー段階です。
AUTO CDC FROM SNAPSHOT
は、一連の順番のスナップショットを比較することでソース データの変更を効率的に判断し、スナップショット内のレコードの CDC 処理に必要な処理を実行する宣言型 API です。AUTO CDC FROM SNAPSHOT
は、 Lakeflow 宣言型パイプライン Python インターフェイスでのみサポートされています。
AUTO CDC FROM SNAPSHOT
複数のソース タイプからのスナップショットの取り込みをサポートします。
- 定期的なスナップショットの取り込みを使用して、既存のテーブルまたはビューからスナップショットを取り込みます。
AUTO CDC FROM SNAPSHOT
には、既存のデータベース オブジェクトからスナップショットを定期的に取り込むことをサポートするシンプルで合理化されたインターフェースがあります。パイプラインの更新ごとに新しいスナップショットが取り込まれ、取り込み時刻がスナップショットのバージョンとして使用されます。パイプラインが連続モードで実行されると、AUTO CDC FROM SNAPSHOT
処理を含むフローのトリガー間隔設定によって決定される期間で、パイプラインの更新ごとに複数のスナップショットが取り込まれます。 - 履歴スナップショットの取り込みを使用して、Oracle データベース、 MySQLデータベース、またはデータウェアハウスから生成されたスナップショットなどのデータベース スナップショットを含むファイルを処理します。
AUTO CDC FROM SNAPSHOT
を使用して任意のソース タイプからCDC処理を実行するには、まずストリーミング テーブルを作成し、次にPythonのcreate_auto_cdc_from_snapshot_flow()
関数を使用して、処理の実装に必要なスナップショット、キー、その他の引数を指定します。 定期的なスナップショットの取り込みと履歴スナップショットの取り込みの例を参照してください。
API に渡されるスナップショットは、バージョンごとに昇順である必要があります。Lakeflow宣言型パイプライン が順不同のスナップショットを検出すると、エラーがスローされます。
構文の詳細については、Lakeflow 宣言型 パイプライン Python リファレンスを参照してください。
シーケンスには複数の列を使用する
複数の列で順序付けることができ (たとえば、同点の場合はタイムスタンプと ID)、STRUCT を使用してそれらを組み合わせることができます。つまり、最初に STRUCT の最初のフィールドで順序付けし、同点の場合は 2 番目のフィールドを考慮します。
SQL の例:
SEQUENCE BY STRUCT(timestamp_col, id_col)
Python の例:
sequence_by = struct("timestamp_col", "id_col")
制限事項
シーケンスに使用する列は、並べ替え可能なデータ型である必要があります。
例: CDFソースデータを使用したSCDタイプ1およびSCDタイプ2の処理
次のセクションでは Lakeflow チェンジデータフィードからのソース イベントに基づいてターゲット テーブルを更新する 宣言型 パイプライン SCD タイプ 1 およびタイプ 2 クエリの例を示します。
- 新しいユーザー レコードを作成します。
- ユーザー レコードを削除します。
- ユーザーレコードを更新します。SCD タイプ 1 の例では、最後の
UPDATE
操作が遅れて到着し、ターゲット テーブルから削除され、順序どおりでないイベントの処理が示されています。
次の例は、宣言型パイプラインの構成と更新に精通していることを前提としています Lakeflow 。 チュートリアル: Lakeflow 宣言型 パイプラインのチェンジデータキャプチャを用いたETL パイプラインの構築を参照してください。
これらの例を実行するには、まずサンプル データセットを作成する必要があります。 テスト データの生成を参照してください。
以下はこれらの例の入力記録です:
userId | name | city | operation | sequenceNum |
---|---|---|---|---|
124 | Raul | Oaxaca | INSERT | 1 |
123 | Isabel | Monterrey | INSERT | 1 |
125 | Mercedes | Tijuana | INSERT | 2 |
126 | Lily | Cancun | INSERT | 2 |
123 | null | null | DELETE | 6 |
125 | Mercedes | Guadalajara | UPDATE | 6 |
125 | Mercedes | Mexicali | UPDATE | 5 |
123 | Isabel | Chihuahua | UPDATE | 5 |
例題データの最後の行のコメントを外すと、レコードを切り捨てる場所を指定する次のレコードが挿入されます:
userId | name | city | operation | sequenceNum |
---|---|---|---|---|
null | null | null | TRUNCATE | 3 |
以下のすべての例には、 DELETE
とTRUNCATE
両方の操作を指定するオプションが含まれていますが、それぞれはオプションです。
SCDタイプ1の更新を処理する
次の例は、SCD タイプ 1 の更新の処理を示しています。
- Python
- SQL
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("cdc_data.users")
dp.create_streaming_table("target")
dp.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flowname AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
SCDタイプ1の例を実行すると、ターゲットテーブルには次のレコードが含まれます:
userId | name | city |
---|---|---|
124 | Raul | Oaxaca |
125 | Mercedes | Guadalajara |
126 | Lily | Cancun |
追加のTRUNCATE
レコードを使用してSCDタイプ1の例を実行すると、sequenceNum=3
でのTRUNCATE
操作によりレコード124
と126
が切り捨てられ、ターゲットテーブルには次のレコードが含まれます:
userId | name | city |
---|---|---|
125 | Mercedes | Guadalajara |
SCDタイプ2の更新を処理する
次の例は、SCD タイプ 2 の更新の処理を示しています。
- Python
- SQL
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("cdc_data.users")
dp.create_streaming_table("target")
dp.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW target_flow
AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
SCDタイプ2の例を実行した後、ターゲットテーブルには以下のレコードが含まれます:
userId | name | city | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Monterrey | 1 | 5 |
123 | Isabel | Chihuahua | 5 | 6 |
124 | Raul | Oaxaca | 1 | null |
125 | Mercedes | Tijuana | 2 | 5 |
125 | Mercedes | Mexicali | 5 | 6 |
125 | Mercedes | Guadalajara | 6 | null |
126 | Lily | Cancun | 2 | null |
SCD タイプ 2 クエリでは、ターゲット テーブルの履歴を追跡する出力列のサブセットも指定できます。他の列への変更は、新しい履歴レコードを生成するのではなく、その場で更新されます。次の例は、 city
列を追跡から除外する方法を示しています。
次の例は、SCDタイプ2でトラックヒストリーを使用する例です:
- Python
- SQL
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("cdc_data.users")
dp.create_streaming_table("target")
dp.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW target_flow
AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
追加のTRUNCATE
レコードなしでこの例を実行すると、ターゲット テーブルには次のレコードが含まれます。
userId | name | city | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Chihuahua | 1 | 6 |
124 | Raul | Oaxaca | 1 | null |
125 | Mercedes | Guadalajara | 2 | null |
126 | Lily | Cancun | 2 | null |
テストデータを生成する
次のコードは、このチュートリアルで示すクエリ例で使用するデータセットの例を生成するために提供されています。新しいスキーマを作成し、新しいテーブルを作成するための適切な資格情報があると仮定すると、これらのステートメントはノートブックまたは Databricks SQL で実行できます。次のコードは、Lakeflow宣言型パイプラインの一部として実行するための ものではありません 。
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
例: 定期的なスナップショット処理
次の例は、 mycatalog.myschema.mytable
に保存されているテーブルのスナップショットを取り込む SCD タイプ 2 処理を示しています。処理の結果はtarget
という名前のテーブルに書き込まれます。
mycatalog.myschema.mytable
タイムスタンプ2024-01-01 00:00:00の記録
Key | Value |
---|---|
1 | a1 |
2 | a2 |
mycatalog.myschema.mytable
タイムスタンプ2024-01-01 12:00:00の記録
Key | Value |
---|---|
2 | b2 |
3 | a3 |
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")
dp.create_streaming_table("target")
dp.create_auto_cdc_from_snapshot_flow(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)
スナップショットを処理すると、ターゲット テーブルに次のレコードが含まれます。
Key | Value | __START_AT | __END_AT |
---|---|---|---|
1 | a1 | 2024年1月1日 00:00:00 | 2024年1月1日 12:00:00 |
2 | a2 | 2024年1月1日 00:00:00 | 2024年1月1日 12:00:00 |
2 | b2 | 2024年1月1日 12:00:00 | null |
3 | a3 | 2024年1月1日 12:00:00 | null |
例: 履歴スナップショット処理
次の例は、クラウド ストレージ システムに保存されている 2 つのスナップショットからのソース イベントに基づいてターゲット テーブルを更新する SCD タイプ 2 処理を示しています。
timestamp
のスナップショット、保存場所 /<PATH>/filename1.csv
Key | TrackingColumn | NonTrackingColumn |
---|---|---|
1 | a1 | b1 |
2 | a2 | b2 |
4 | a4 | b4 |
timestamp + 5
のスナップショット、保存場所 /<PATH>/filename2.csv
Key | TrackingColumn | NonTrackingColumn |
---|---|---|
2 | a2_new | b2 |
3 | a3 | b3 |
4 | a4 | b4_new |
次のコード例は、これらのスナップショットを使用して SCD タイプ 2 の更新を処理する方法を示しています。
from pyspark import pipelines as dp
def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise
# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None
dp.create_streaming_live_table("target")
dp.create_auto_cdc_from_snapshot_flow(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)
スナップショットを処理すると、ターゲット テーブルに次のレコードが含まれます。
Key | TrackingColumn | NonTrackingColumn | __START_AT | __END_AT |
---|---|---|---|---|
1 | a1 | b1 | 1 | 2 |
2 | a2 | b2 | 1 | 2 |
2 | a2_new | b2 | 2 | null |
3 | a3 | b3 | 2 | null |
4 | a4 | b4_new | 1 | null |
ターゲットのストリーミングテーブルのデータを追加、変更、または削除します。
パイプラインがテーブルをUnity Catalogに公開する場合、insert、update、delete、merge ステートメントを含むデータ操作言語(DML) ステートメントを使用して、 AUTO CDC ... INTO
ステートメントによって作成されたターゲット ストリーミング テーブルを変更できます。
- ストリーミングテーブルのテーブルスキーマを変更する DML ステートメントはサポートされていません。DML ステートメントによってテーブルスキーマの進化が発生しないようにしてください。
- ストリーミングテーブルを更新する DML ステートメントは、Databricks Runtime 13.3 LTS 以上を使用する共有Unity Catalog クラスターまたはSQL ウェアハウスでのみ実行できます。
- ストリーミングには追加専用のデータ ソースが必要なため、処理で (DML ステートメントなどによる) 変更を伴うソース ストリーミング テーブルからのストリーミングが必要な場合は、ソース ストリーミング テーブルの読み取り時にSkipChangeCommits フラグを設定します。
skipChangeCommits
が設定されている場合、ソース テーブルのレコードを削除または変更するトランザクションは無視されます。処理にストリーミング テーブルが必要ない場合は、ターゲット テーブルとしてマテリアライズドビュー (追加のみの制限がない) を使用できます。
Lakeflow宣言型パイプラインは指定された SEQUENCE BY
列を使用し、ターゲットテーブルの __START_AT
列と __END_AT
列 ( SCD タイプ 2) に適切な順序付け値を伝達するため、レコードの適切な順序を維持するためには、DML ステートメントでこれらの列に有効な値を使用する必要があります。 AUTO CDC API を使用した CDC の実装方法を参照してください。
ストリーミングテーブルでの DML ステートメントの使用の詳細については、 ストリーミングテーブルのデータを追加、変更、または削除するを参照してください。
次の例では、開始シーケンスを 5 としたアクティブレコードを挿入しています。
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
AUTO CDCターゲットテーブルから変更データフィードを読み取ります
Databricks Runtime 15.2 以降では、他の Delta テーブルから変更データフィードを読み取るのと同じ方法で、 AUTO CDC
またはAUTO CDC FROM SNAPSHOT
ターゲットであるDeltaテーブルから変更データフィードを読み取ることができます。 ターゲットのストリーミング テーブルから変更データフィードを読み取るには、次のものが必要です。
- ターゲット ストリーミングテーブルは Unity Catalogにパブリッシュする必要があります。 「Lakeflow宣言型パイプラインでの Unity Catalogの使用 」を参照してください。
- ターゲット ストリーミング テーブルから変更データフィードを読み取るには、 Databricks Runtime 15.2 以降を使用する必要があります。 別のパイプラインで変更データフィードを読み取るには、 Databricks Runtime 15.2 以降を使用するようにパイプラインを構成する必要があります。
Lakeflow宣言型 パイプラインで作成されたターゲット ストリーミングテーブルからチェンジデータフィードを読み取るには、他の Delta テーブルからチェンジデータフィードを読み取るのと同じ方法で読み取ります。 Deltaチェンジデータフィード機能の使用(Python やSQL の例など)について詳しくは、DatabricksでのDelta Lake チェンジデータフィードの使用 を参照してください。
変更データフィード レコードには、変更イベントのタイプを識別するメタデータが含まれています。 テーブル内のレコードが更新されると、関連付けられた変更レコードのメタデータには通常、 update_preimage
およびupdate_postimage
イベントに設定された_change_type
値が含まれます。
ただし、主キー値の変更を含む更新がターゲット ストリーミング テーブルに行われた場合、 _change_type
値は異なります。 変更に主キーの更新が含まれる場合、 _change_type
メタデータ フィールドはinsert
およびdelete
イベントに設定されます。主キーの変更は、 UPDATE
またはMERGE
ステートメントを使用してキー フィールドの 1 つに手動で更新が行われたとき、または SCD タイプ 2 テーブルの場合は、 __start_at
フィールドが以前の開始シーケンス値を反映するように変更されたときに発生する可能性があります。
AUTO CDC
クエリは、SCD タイプ 1 と SCD タイプ 2 の処理で異なる主キー値を決定します。
- SCDタイプ 1 処理と Lakeflow 宣言型 パイプライン Python インターフェースの場合、主キーは
create_auto_cdc_flow()
関数のkeys
パラメーターの値です。Lakeflow 宣言型パイプライン SQL インターフェイスの場合、主キーはAUTO CDC ... INTO
ステートメントのKEYS
句で定義された列です。 - SCDタイプ 2 の場合、主キーは、
keys
問題またはKEYS
句に、coalesce(__START_AT, __END_AT)
操作からの戻り値を加えたものです。ここで、__START_AT
と__END_AT
、ターゲット ストリーミング テーブルの対応する列です。
Lakeflow 宣言型 パイプライン の CDC クエリによって処理されたレコードに関するデータを取得する
次のメトリクスは、 AUTO CDC
クエリによってのみキャプチャされ、 AUTO CDC FROM SNAPSHOT
クエリによってはキャプチャされません。
次のメトリクスは、 AUTO CDC
クエリによってキャプチャされます。
num_upserted_rows
: 更新中にデータセットにアップサートされた出力行の数。num_deleted_rows
: 更新中にデータセットから削除された既存の出力行の数。
非 CDC フローの出力であるnum_output_rows
メトリクスは、 AUTO CDC
クエリではキャプチャされません。
Lakeflow宣言型パイプラインのCDC処理にはどのようなデータオブジェクトが使用されますか?
- これらのデータ構造は
AUTO CDC
処理にのみ適用され、AUTO CDC FROM SNAPSHOT
処理には適用されません。 - これらのデータ構造は、ターゲット テーブルがHive metastoreに公開される場合にのみ適用されます。 パイプラインが Unity Catalog に公開される場合、ユーザーは内部のバッキング テーブルにアクセスできません。
Hive metastore でターゲットテーブルを宣言すると、2 つのデータ構造が作成されます。
- ターゲットテーブルに割り当てられた名前を使用するビュー。
- 宣言型パイプラインが 処理を管理するために使用する内部バッキング テーブルLakeflowCDC 。このテーブルの名前は、ターゲット テーブル名の先頭に
__apply_changes_storage_
を付加して付けられます。
たとえば、 dp_cdc_target
という名前のターゲット テーブルを宣言すると、メタストアに dp_cdc_target
という名前のビューと __apply_changes_storage_dp_cdc_target
という名前のテーブルが表示されます。ビューを作成すると Lakeflow 宣言型パイプラインで、順不同のデータを処理するために必要な追加情報 (廃棄標識やバージョンなど) を除外できます。 処理されたデータを表示するには、ターゲット ビューをクエリします。__apply_changes_storage_
テーブルのスキーマは、将来の機能や拡張機能をサポートするために変更される可能性があるため、本番運用で使用するためにテーブルをクエリしないでください。テーブルにデータを手動で追加すると、バージョン列が欠落しているため、レコードは他の変更よりも前に来ると見なされます。