Pular para o conteúdo principal

AUTO CDC INTO (pipeline declarativo LakeFlow )

Use a instrução AUTO CDC ... INTO para criar um fluxo que usa a funcionalidade de captura de dados de alterações (CDC) do pipeline declarativo LakeFlow . Esta declaração lê alterações de uma fonte CDC e as aplica a um destino de transmissão.

Sintaxe

CREATE OR REFRESH STREAMING TABLE table_name;

CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Você define restrições de qualidade de dados para o destino usando a mesma cláusula CONSTRAINT de outras consultas de pipeline declarativas LakeFlow . Veja gerenciar a qualidade dos dados com expectativas pipeline.

O comportamento padrão para eventos de INSERT e UPDATE é realizar upsert de eventos do CDC a partir da fonte: atualizar quaisquer linhas na tabela de destino que correspondam à(s) chave(s) especificada(s) ou inserir uma nova linha quando um registro correspondente não existir na tabela de destino.O tratamento de DELETE eventos pode ser especificado com a APPLY AS DELETE WHEN condição.

important

Você deve declarar uma tabela de transmissão de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Para tabelas SCD tipo 2, ao especificar o esquema da tabela de destino, você também deve incluir as colunas __START_AT e __END_AT com o mesmo tipo de dados do campo sequence_by .

Consulte APIs AUTO CDC : Simplifique a captura de dados de alterações (CDC) com pipeline declarativo LakeFlow.

Parâmetros

  • flow_name

    O nome do fluxo a ser criado.

  • source

    A fonte dos dados. A fonte deve ser uma fonte de transmissão . Use a palavra-chave transmissão para usar a semântica de transmissão para ler a fonte. Se a leitura encontrar uma alteração ou exclusão em um registro existente, um erro será gerado. É mais seguro ler de fontes estáticas ou somente de acréscimos. Para ingerir dados que tenham confirmação de alteração, você pode usar Python e a opção SkipChangeCommits para lidar com erros.

    Para mais informações sobre transmissão de dados, veja transformação de dados com pipeline.

  • KEYS

    A coluna ou combinação de colunas que identifica exclusivamente uma linha nos dados de origem. Os valores nessas colunas são usados para identificar quais eventos do CDC se aplicam a registros específicos na tabela de destino.

    Para definir uma combinação de colunas, use uma lista de colunas separadas por vírgulas.

    Esta cláusula é obrigatória.

  • IGNORE NULL UPDATES

    Permite ingerir atualizações contendo um subconjunto da coluna de destino. Quando um evento CDC corresponde a uma linha existente e IGNORE NULL UPDATES é especificado, as colunas com um valor null manterão seus valores existentes no destino. Isso também se aplica a colunas aninhadas com um valor null .

    Esta cláusula é opcional.

    O default é substituir as colunas existentes por valores null .

  • APPLY AS DELETE WHEN

    Especifica quando um evento CDC deve ser tratado como um DELETE em vez de um upsert.

    Para fontes SCD tipo 2, para lidar com dados fora de ordem, a linha excluída é temporariamente retida como uma marca de exclusão na tabela Delta subjacente, e uma view é criada no metastore que filtra essas marcas de exclusão. O intervalo de retenção pode ser configurado com a propriedade da tabela pipelines.cdc.tombstoneGCThresholdInSeconds .

    Esta cláusula é opcional.

  • APPLY AS TRUNCATE WHEN

    Especifica quando um evento de CDC deve ser tratado como uma tabela completa TRUNCATE. Como esta cláusula aciona uma operação completa de truncamento da tabela de destino, ela deve ser usada apenas para casos de uso específicos que exijam essa funcionalidade.

    A cláusula APPLY AS TRUNCATE WHEN é suportada apenas para SCD tipo 1. SCD tipo 2 não suporta as operações truncate.

    Esta cláusula é opcional.

  • SEQUENCE BY

    O nome da coluna que especifica a ordem lógica dos eventos do CDC nos dados de origem. O pipeline declarativo LakeFlow usa esse sequenciamento para lidar com eventos de alteração que chegam fora de ordem.

    Se várias colunas forem necessárias para sequenciamento, use uma expressão STRUCT : ela ordenará primeiro pelo primeiro campo struct, depois pelo segundo campo se houver empate, e assim por diante.

    As colunas especificadas devem ser tipos de dados classificáveis.

    Esta cláusula é obrigatória.

  • COLUMNS

    Especifica um subconjunto de colunas a serem incluídas na tabela de destino. Você pode:

    • Especifique a lista completa de colunas a serem incluídas: COLUMNS (userId, name, city).
    • Especifique uma lista de colunas a serem excluídas: COLUMNS * EXCEPT (operation, sequenceNum)

    Esta cláusula é opcional.

    O default é incluir todas as colunas na tabela de destino quando a cláusula COLUMNS não é especificada.

  • STORED AS

    Se os registros devem ser armazenados como SCD tipo 1 ou SCD tipo 2.

    Esta cláusula é opcional.

    O padrão é SCD tipo 1.

  • TRACK HISTORY ON

    Especifica um subconjunto de colunas de saída para gerar registros históricos quando houver alguma alteração nessas colunas especificadas. Você pode:

    • Especifique a lista completa de colunas a serem rastreadas: COLUMNS (userId, name, city).
    • Especifique uma lista de colunas a serem excluídas do acompanhamento: COLUMNS * EXCEPT (operation, sequenceNum)

    Esta cláusula é opcional. O default é rastrear o histórico de todas as colunas de saída quando houver alguma alteração, equivalente a TRACK HISTORY ON *.

Exemplos

SQL
-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW flow
AS AUTO CDC INTO
target
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);