AUTO CDC INTO (LakeFlow Declarative pipeline)
Use a instrução AUTO CDC ... INTO
para criar um fluxo que use a funcionalidade LakeFlow Declarative pipeline capture de dados de alterações (CDC) (CDC). Essa declaração lê as alterações de uma fonte CDC e as aplica a um destino de transmissão.
- Para saber mais CDC sobre, consulte O que é captura de dados de alterações (CDC)CDC()?
- Para obter mais detalhes sobre o uso do site
AUTO CDC
, consulte AUTO CDC APIs: Simplifique a captura de dados de alterações (CDC) com o pipeline declarativo LakeFlow. - Para obter mais detalhes sobre
CREATE FLOW
, consulte CREATE FLOW (LakeFlow Declarative pipeline).
Sintaxe
CREATE OR REFRESH STREAMING TABLE table_name;
CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
O senhor define restrições de qualidade de dados para o destino usando a mesma cláusula CONSTRAINT
que outras consultas do pipeline declarativo LakeFlow. Veja como gerenciar a qualidade dos dados com pipeline expectativas.
O comportamento padrão para eventos de INSERT
e UPDATE
é realizar upsert de eventos do CDC a partir da fonte: atualizar quaisquer linhas na tabela de destino que correspondam à(s) chave(s) especificada(s) ou inserir uma nova linha quando um registro correspondente não existir na tabela de destino.O tratamento de DELETE
eventos pode ser especificado com a APPLY AS DELETE WHEN
condição.
O senhor deve declarar uma tabela de transmissão de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Para tabelas SCD tipo 2, ao especificar o esquema da tabela de destino, o senhor também deve incluir as colunas __START_AT
e __END_AT
com o mesmo tipo de dados que o campo sequence_by
.
Parâmetros
-
flow_name
O nome do fluxo a ser criado.
-
source
A fonte dos dados. A fonte deve ser uma fonte de transmissão . Use a palavra-chave transmissão para usar a semântica de transmissão para ler a partir da fonte. Se a leitura encontrar uma alteração ou exclusão em um registro existente, um erro será gerado. É mais seguro ler de fontes estáticas ou somente anexadas. Para ingerir dados com commit de alteração, o senhor pode usar Python e a opção
SkipChangeCommits
para lidar com erros.Para obter mais informações sobre transmissão de dados, consulte transformação de dados com pipeline.
-
KEYS
A coluna ou combinação de colunas que identifica de forma exclusiva uma linha nos dados de origem. Os valores nessas colunas são usados para identificar quais eventos CDC se aplicam a registros específicos na tabela de destino.
Para definir uma combinação de colunas, use uma lista de colunas separadas por vírgula.
Essa cláusula é obrigatória.
-
IGNORE NULL UPDATES
Permite a ingestão de atualizações que contêm um subconjunto da coluna de destino. Quando um evento CDC corresponde a uma linha existente e IGNORE NULL UPDATES é especificado, as colunas com um valor
null
manterão seus valores existentes no destino. Isso também se aplica a colunas aninhadas com um valornull
.Esta cláusula é opcional.
O default é para substituir as colunas existentes pelos valores do
null
. -
APPLY AS DELETE WHEN
Especifica quando um evento CDC deve ser tratado como um
DELETE
em vez de um upsert.Para fontes SCD tipo 2, para lidar com dados fora de ordem, a linha excluída é temporariamente retida como uma lápide na tabela Delta subjacente, e uma view é criada no metastore para filtrar essas lápides. O intervalo de retenção pode ser configurado com a propriedade de tabela
pipelines.cdc.tombstoneGCThresholdInSeconds
.Esta cláusula é opcional.
-
APPLY AS TRUNCATE WHEN
Especifica quando um evento de CDC deve ser tratado como uma tabela completa
TRUNCATE
. Como esta cláusula aciona uma operação completa de truncamento da tabela de destino, ela deve ser usada apenas para casos de uso específicos que exijam essa funcionalidade.A cláusula
APPLY AS TRUNCATE WHEN
é compatível apenas com o SCD tipo 1. O SCD tipo 2 não é compatível com as operações de truncagem.Esta cláusula é opcional.
-
SEQUENCE BY
O nome da coluna que especifica a ordem lógica dos eventos CDC nos dados de origem. LakeFlow O pipeline declarativo usa esse sequenciamento para lidar com eventos de alteração que chegam fora de ordem.
Se várias colunas forem necessárias para o sequenciamento, use uma expressão
STRUCT
: ela será ordenada primeiro pelo primeiro campo de estrutura, depois pelo segundo campo se houver empate e assim por diante.As colunas especificadas devem ser tipos de dados classificáveis.
Essa cláusula é obrigatória.
-
COLUMNS
Especifica um subconjunto de colunas a serem incluídas na tabela de destino. Você também pode:
- Especifique a lista completa de colunas a serem incluídas:
COLUMNS (userId, name, city)
. - Especifique uma lista de colunas a serem excluídas:
COLUMNS * EXCEPT (operation, sequenceNum)
Esta cláusula é opcional.
O site default deve incluir todas as colunas da tabela de destino quando a cláusula
COLUMNS
não for especificada. - Especifique a lista completa de colunas a serem incluídas:
-
STORED AS
Se os registros devem ser armazenados como SCD tipo 1 ou SCD tipo 2.
Esta cláusula é opcional.
O padrão é SCD tipo 1.
-
TRACK HISTORY ON
Especifica um subconjunto de colunas de saída para gerar registros de histórico quando houver alguma alteração nas colunas especificadas. Você também pode:
- Especifique a lista completa de colunas a serem rastreadas:
COLUMNS (userId, name, city)
. - Especifique uma lista de colunas a serem excluídas do acompanhamento:
COLUMNS * EXCEPT (operation, sequenceNum)
Essa cláusula é opcional. O default é para rastrear a história de todas as colunas de saída quando houver alguma alteração, equivalente a
TRACK HISTORY ON *
. - Especifique a lista completa de colunas a serem rastreadas:
Exemplos
-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flow
AS AUTO CDC INTO
target
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);