criar_auto_cdc_from_snapshot_flow
Visualização
Essa funcionalidade está na Pré-visualização Pública.
A função create_auto_cdc_from_snapshot_flow
cria um fluxo que usa a funcionalidade LakeFlow Declarative pipeline capture de dados de alterações (CDC) (CDC) para processar os dados de origem do banco de dados Snapshot. Consulte Como o CDC é implementado com a API AUTO CDC FROM SNAPSHOT
?
Essa função substitui a função anterior apply_changes_from_snapshot()
. As duas funções têm a mesma assinatura. A Databricks recomenda a atualização para usar o novo nome.
O senhor deve ter uma tabela de transmissão de destino para essas operações.
Para criar a tabela de destino necessária, você pode usar a função create_streaming_table ().
Sintaxe
import dlt
dlt.create_auto_cdc_from_snapshot_flow(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Para o processamento de AUTO CDC FROM SNAPSHOT
, o comportamento de default é inserir uma nova linha quando um registro correspondente com o mesmo key(s) não existir no destino. Se existir um registro correspondente, ele será atualizado somente se algum dos valores na linha tiver sido alterado. As linhas com chave presentes no destino, mas não mais presentes na origem, são excluídas.
Para saber mais sobre o processamento do CDC com o Snapshot, consulte O AUTO CDC APIs: Simplifique a captura de dados de alterações (CDC) com o pipeline declarativo LakeFlow. Para obter exemplos de uso da função create_auto_cdc_from_snapshot_flow()
, consulte os exemplos de ingestão periódica de Snapshot e ingestão de Snapshot histórico.
Parâmetros
Parâmetro | Tipo | Descrição |
---|---|---|
|
| Obrigatório. O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table () para criar a tabela de destino antes de executar a função |
|
| Obrigatório. O nome de uma tabela ou view para o Snapshot periodicamente ou uma função lambda Python que retorna o Snapshot DataFrame a ser processado e a versão do Snapshot. Consulte Implementar o argumento |
|
| Obrigatório. A coluna ou combinação de colunas que identifica de forma exclusiva uma linha nos dados de origem. Isso é usado para identificar quais eventos CDC se aplicam a registros específicos na tabela de destino. Você pode especificar uma das seguintes opções:
Argumentos para funções |
|
| Se os registros devem ser armazenados como SCD tipo 1 ou SCD tipo 2. Defina como |
|
| Um subconjunto de colunas de saída a ser rastreado para o histórico na tabela de destino. Use
Os argumentos das funções |
Implemente o argumento source
A função create_auto_cdc_from_snapshot_flow()
inclui o argumento source
. Para processar o Snapshot histórico, espera-se que o argumento source
seja uma função lambda Python que retorne dois valores para a função create_auto_cdc_from_snapshot_flow()
: um Python DataFrame contendo os dados do Snapshot a serem processados e uma versão do Snapshot.
A seguir está a assinatura da função lambda:
lambda Any => Optional[(DataFrame, Any)]
- O argumento para a função lambda é a versão do Snapshot processada mais recentemente.
- O valor de retorno da função lambda é
None
ou uma tupla de dois valores: O primeiro valor da tupla é um DataFrame que contém o Snapshot a ser processado. O segundo valor da tupla é a versão do Snapshot que representa a ordem lógica do Snapshot.
Um exemplo que implementa e chama a função lambda:
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
create_auto_cdc_from_snapshot_flow(
# ...
source = next_snapshot_and_version,
# ...
)
O tempo de execução do pipeline declarativo LakeFlow executa as seguintes etapas sempre que o pipeline que contém a função create_auto_cdc_from_snapshot_flow()
é acionado:
- executar a função
next_snapshot_and_version
para carregar o próximo Snapshot DataFrame e a versão correspondente do Snapshot. - Se nenhum DataFrame retornar, a execução será encerrada e a atualização do pipeline será marcada como concluída.
- Detecta as alterações no novo Snapshot e as aplica de forma incremental à tabela de destino.
- Retorna à etapa 1 para carregar o próximo Snapshot e sua versão.