メインコンテンツまでスキップ

CREATE FLOW (Lakeflow 宣言型パイプライン)

CREATE FLOW ステートメントを使用して、Lakeflow 宣言型パイプライン テーブルのフローまたはバックフィルを作成します。

構文

CREATE FLOW flow_name [COMMENT comment] AS
{
AUTO CDC INTO target_table create_auto_cdc_flow_spec |
INSERT INTO [ONCE] target_table BY NAME query
}

問題

  • フロー名

    作成するフローの名前。

  • comment

    フローのオプションの説明。

  • AUTO CDC INTO

    create_auto_cdc_flow_specを使用してフローを定義するAUTO CDC ... INTOステートメント。AUTO CDC ... INTOステートメントまたはINSERT INTOステートメントのいずれかを含める必要があります。ソース クエリが変更データ セマンティクスを使用する場合は、 AUTO CDC ... INTO使用します。

    詳細については、「 AUTO CDC INTO (Lakeflow 宣言型パイプライン)」を参照してください。

  • ターゲットテーブル

    更新するテーブル。これはストリーミング テーブルである必要があります。

  • INSERT INTO

    ターゲット テーブルに挿入されるテーブル クエリを定義します。ONCEオプションが指定されていない場合、クエリは ストリーミング クエリである必要があります。ストリーム キーワードを使用して、ストリーミング セマンティクスを使用してソースから読み取ります。 読み取り中に既存のレコードの変更または削除が検出されると、エラーがスローされます。静的ソースまたは追加専用のソースから読み取るのが最も安全です。変更コミットを含むデータを取り込むには、Python とSkipChangeCommitsオプションを使用してエラーを処理できます。

    INSERT INTO AUTO CDC ... INTOと相互に排他的です。ソース データにチェンジデータ キャプチャ ( CDC ) 機能が含まれる場合は、 AUTO CDC ... INTO使用します。 ソースが使用していない場合はINSERT INTOを使用します。

    ストリーミング データの詳細については、 「パイプラインを使用したデータの変換」を参照してください。

  • ONCE

    オプションで、フローをバックフィルなどの 1 回限りのフローとして定義します。ONCEを使用すると、フローは次の 2 つの方法で変化します。

    • ソースqueryまたはcreate_auto_cdc_flow_specはストリーミング テーブルではありません。
    • デフォルトでは、フローは 1 回実行されます。パイプラインが完全リフレッシュで更新されると、 ONCEフローが再度実行され、データが再作成されます。

SQL
-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;

-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users
SELECT * FROM stream(raw_data.users);

-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users
SELECT * FROM user_backfill_table;

-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;

-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;