CREATE FLOW (LakeFlow 宣言型パイプライン)
CREATE FLOW
ステートメントを使用して、LakeFlow 宣言型パイプライン テーブルのフローまたはバックフィルを作成します。
構文
CREATE FLOW flow_name [COMMENT comment] AS
{
AUTO CDC INTO target_table create_auto_cdc_flow_spec |
INSERT INTO [ONCE] target_table BY NAME query
}
パラメーター
-
flow_name
作成するフローの名前。
-
comment
フローの説明 (オプション)。
-
フローを定義する
AUTO CDC ... INTO
ステートメント (create_auto_cdc_flow_spec
.AUTO CDC ... INTO
ステートメントまたはINSERT INTO
ステートメントを含める必要があります。ソース クエリで変更データ セマンティクスを使用する場合は、AUTO CDC ... INTO
を使用します。詳細については、「 AUTO CDC INTO (LakeFlow 宣言型パイプライン)」を参照してください。
-
target_table
更新するテーブル。これはストリーミングテーブルである必要があります。
-
INSERT INTO
ターゲット・テーブルに挿入されるテーブル・クエリを定義します。
ONCE
オプションが指定されていない場合、クエリは ストリーミング クエリである必要があります。ストリーム キーワードを使用して、ストリーミング セマンティクスを使用してソースから読み取ります。 読み取りで既存のレコードに対する変更または削除が検出されると、エラーがスローされます。静的なソースまたは追加専用のソースから読み取るのが最も安全です。変更コミットのあるデータを取り込むには、Python とSkipChangeCommits
オプションを使用してエラーを処理できます。INSERT INTO
はAUTO CDC ... INTO
と相互に排他的です。ソース データにチェンジデータキャプチャ (CDC) 機能が含まれている場合は、AUTO CDC ... INTO
を使用します。ソースがそうでない場合は、INSERT INTO
を使用します。ストリーミング データの詳細については、「 パイプラインを使用したデータの変換」を参照してください。
-
ONCE
必要に応じて、フローを 1 回限りのフロー (バックフィルなど) として定義します。
ONCE
を使用すると、フローが次の 2 つの方法で変更されます。- ソース
query
またはcreate_auto_cdc_flow_spec
はストリーミングテーブルではありません。 - フローはデフォルトで 1 回実行されます。パイプラインが完全に更新されて更新されると、
ONCE
フローが再度実行され、データが再作成されます。
- ソース
例
-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;
-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users
SELECT * FROM stream(raw_data.users);
-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users
SELECT * FROM user_backfill_table;
-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;
-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;