CREATE FLOW (LakeFlow Declarative pipeline)
Use a instrução CREATE FLOW
para criar fluxos ou backfills para suas tabelas de pipeline declarativo LakeFlow.
Sintaxe
CREATE FLOW flow_name [COMMENT comment] AS
{
AUTO CDC INTO target_table create_auto_cdc_flow_spec |
INSERT INTO [ONCE] target_table BY NAME query
}
Parâmetros
-
nome_do_fluxo
O nome do fluxo a ser criado.
-
comentário
Uma descrição opcional para o fluxo.
-
Uma declaração
AUTO CDC ... INTO
que define o fluxo, comcreate_auto_cdc_flow_spec
. Você deve incluir uma declaraçãoAUTO CDC ... INTO
ou uma declaraçãoINSERT INTO
. UseAUTO CDC ... INTO
quando a consulta de origem usar a semântica de alteração dos dados.Para obter mais informações, consulte AUTO CDC INTO (LakeFlow Declarative pipeline).
-
tabela_alvo
A tabela a ser atualizada. Essa deve ser uma tabela de transmissão.
-
INSERT INTO
Define uma consulta de tabela que é inserida na tabela de destino. Se a opção
ONCE
não for fornecida, a consulta deverá ser uma consulta de transmissão . Use a palavra-chave transmissão para usar a semântica de transmissão para ler a partir da fonte. Se a leitura encontrar uma alteração ou exclusão em um registro existente, um erro será gerado. É mais seguro ler de fontes estáticas ou somente anexadas. Para ingerir dados com commit de alteração, o senhor pode usar Python e a opçãoSkipChangeCommits
para lidar com erros.INSERT INTO
é mutuamente exclusivo comAUTO CDC ... INTO
. UseAUTO CDC ... INTO
quando os dados de origem incluírem a funcionalidade de captura de dados de alterações (CDC) (CDC). UseINSERT INTO
quando a fonte não o fizer.Para obter mais informações sobre transmissão de dados, consulte transformação de dados com pipeline.
-
UMA VEZ
Opcionalmente, defina o fluxo como um fluxo único, como um preenchimento. Usar
ONCE
altera o fluxo de duas maneiras:- A fonte
query
oucreate_auto_cdc_flow_spec
não é uma tabela de transmissão. - O fluxo é executado uma vez pelo site default. Se o pipeline for atualizado com um refresh completo, o fluxo
ONCE
será executado novamente para recriar os dados.
- A fonte
Exemplos
-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;
-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users
SELECT * FROM stream(raw_data.users);
-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users
SELECT * FROM user_backfill_table;
-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;
-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;