criar_auto_cdc_a_partir_do_fluxo_de_instantâneo
Visualização
Esta funcionalidade está em Visualização Pública.
A função create_auto_cdc_from_snapshot_flow
cria um fluxo que usa a funcionalidade de captura de dados de alterações (CDC) do pipeline declarativo LakeFlow para processar dados de origem do instantâneo do banco de dados. Veja como o CDC é implementado com a API AUTO CDC FROM SNAPSHOT
?.
Esta função substitui a função anterior apply_changes_from_snapshot()
. As duas funções têm a mesma assinatura. A Databricks recomenda atualizar para usar o novo nome.
Você deve ter uma tabela de transmissão alvo para estas operações.
Para criar a tabela de destino necessária, você pode usar a função create_streaming_table() .
Sintaxe
from pyspark import pipelines as dp
dp.create_auto_cdc_from_snapshot_flow(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Para o processamento AUTO CDC FROM SNAPSHOT
, o comportamento default é inserir uma nova linha quando um registro correspondente com a(s) mesma(s) key(s) não existe(m) no destino. Se existir um registro correspondente, ele será atualizado somente se algum dos valores na linha tiver sido alterado. Linhas com chave presente no destino, mas não mais presente na origem, são excluídas.
Para saber mais sobre o processamento CDC com Snapshot, consulte APIs AUTO CDC : Simplifique a captura de dados de alterações (CDC) com pipeline declarativo LakeFlow. Para obter exemplos de uso da função create_auto_cdc_from_snapshot_flow()
, consulte os exemplos de ingestão periódica de Snapshot e de ingestão histórica de Snapshot .
Parâmetros
Parâmetro | Tipo | Descrição |
---|---|---|
|
| Obrigatório. O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table() para criar a tabela de destino antes de executar a função |
|
| Obrigatório. O nome de uma tabela ou view para Snapshot periodicamente ou uma função lambda Python que retorna o Snapshot DataFrame a ser processado e a versão do Snapshot. Veja Implementar o argumento |
|
| Obrigatório. A coluna ou combinação de colunas que identifica exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos do CDC se aplicam a registros específicos na tabela de destino. Você pode especificar:
Argumentos para funções |
|
| Se os registros devem ser armazenados como SCD tipo 1 ou SCD tipo 2. Defina como |
|
| Um subconjunto de colunas de saída a serem rastreadas para histórico na tabela de destino. Use
Argumentos para funções |
Implementar o argumento source
A função create_auto_cdc_from_snapshot_flow()
inclui o argumento source
. Para processar o Snapshot histórico, espera-se que o argumento source
seja uma função lambda Python que retorna dois valores para a função create_auto_cdc_from_snapshot_flow()
: um DataFrame Python contendo os dados do Snapshot a serem processados e uma versão do Snapshot.
A seguir está a assinatura da função lambda:
lambda Any => Optional[(DataFrame, Any)]
- O argumento para a função lambda é a versão mais recentemente processada do Snapshot.
- O valor de retorno da função lambda é
None
ou uma tupla de dois valores: O primeiro valor da tupla é um DataFrame contendo o Snapshot a ser processado. O segundo valor da tupla é a versão do Snapshot que representa a ordem lógica do Snapshot.
Um exemplo que implementa e chama a função lambda:
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
create_auto_cdc_from_snapshot_flow(
# ...
source = next_snapshot_and_version,
# ...
)
O tempo de execução do pipeline declarativo LakeFlow executa os seguintes passos sempre que o pipeline que contém a função create_auto_cdc_from_snapshot_flow()
é acionado:
- execute a função
next_snapshot_and_version
para carregar o próximo Snapshot DataFrame e a versão correspondente do Snapshot. - Se nenhum DataFrame retornar, a execução será encerrada e a atualização do pipeline será marcada como concluída.
- Detecta as alterações no novo Snapshot e as aplica incrementalmente à tabela de destino.
- Retorna ao passo #1 para carregar o próximo Snapshot e sua versão.