Pular para o conteúdo principal

criar_auto_cdc_a_partir_do_fluxo_de_instantâneo

info

Visualização

Esta funcionalidade está em Visualização Pública.

A função create_auto_cdc_from_snapshot_flow cria um fluxo que usa a funcionalidade de captura de dados de alterações (CDC) do pipeline declarativo LakeFlow para processar dados de origem do instantâneo do banco de dados. Veja como o CDC é implementado com a API AUTO CDC FROM SNAPSHOT ?.

nota

Esta função substitui a função anterior apply_changes_from_snapshot(). As duas funções têm a mesma assinatura. A Databricks recomenda atualizar para usar o novo nome.

important

Você deve ter uma tabela de transmissão alvo para estas operações.

Para criar a tabela de destino necessária, você pode usar a função create_streaming_table() .

Sintaxe

Python
from pyspark import pipelines as dp

dp.create_auto_cdc_from_snapshot_flow(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
nota

Para o processamento AUTO CDC FROM SNAPSHOT , o comportamento default é inserir uma nova linha quando um registro correspondente com a(s) mesma(s) key(s) não existe(m) no destino. Se existir um registro correspondente, ele será atualizado somente se algum dos valores na linha tiver sido alterado. Linhas com chave presente no destino, mas não mais presente na origem, são excluídas.

Para saber mais sobre o processamento CDC com Snapshot, consulte APIs AUTO CDC : Simplifique a captura de dados de alterações (CDC) com pipeline declarativo LakeFlow. Para obter exemplos de uso da função create_auto_cdc_from_snapshot_flow() , consulte os exemplos de ingestão periódica de Snapshot e de ingestão histórica de Snapshot .

Parâmetros

Parâmetro

Tipo

Descrição

target

str

Obrigatório. O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table() para criar a tabela de destino antes de executar a função create_auto_cdc_from_snapshot_flow() .

source

str ou lambda function

Obrigatório. O nome de uma tabela ou view para Snapshot periodicamente ou uma função lambda Python que retorna o Snapshot DataFrame a ser processado e a versão do Snapshot. Veja Implementar o argumento source.

keys

list

Obrigatório. A coluna ou combinação de colunas que identifica exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos do CDC se aplicam a registros específicos na tabela de destino. Você pode especificar:

  • Uma lista de strings: ["userId", "orderId"]

  • Uma lista de funções Spark SQL col() : [col("userId"), col("orderId"].

Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

stored_as_scd_type

str ou int

Se os registros devem ser armazenados como SCD tipo 1 ou SCD tipo 2. Defina como 1 para SCD tipo 1 ou 2 para SCD tipo 2. O default é SCD tipo 1.

track_history_column_list ou track_history_except_column_list

list

Um subconjunto de colunas de saída a serem rastreadas para histórico na tabela de destino. Use track_history_column_list para especificar a lista completa de colunas a serem rastreadas. Use track_history_except_column_list para especificar as colunas a serem excluídas do acompanhamento. Você pode declarar qualquer valor como uma lista de strings ou como funções Spark SQL col() :

  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId). O default é incluir todas as colunas na tabela de destino quando nenhum argumento track_history_column_list ou track_history_except_column_list é passado para a função.

Implementar o argumento source

A função create_auto_cdc_from_snapshot_flow() inclui o argumento source . Para processar o Snapshot histórico, espera-se que o argumento source seja uma função lambda Python que retorna dois valores para a função create_auto_cdc_from_snapshot_flow() : um DataFrame Python contendo os dados do Snapshot a serem processados e uma versão do Snapshot.

A seguir está a assinatura da função lambda:

Python
lambda Any => Optional[(DataFrame, Any)]
  • O argumento para a função lambda é a versão mais recentemente processada do Snapshot.
  • O valor de retorno da função lambda é None ou uma tupla de dois valores: O primeiro valor da tupla é um DataFrame contendo o Snapshot a ser processado. O segundo valor da tupla é a versão do Snapshot que representa a ordem lógica do Snapshot.

Um exemplo que implementa e chama a função lambda:

Python
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None

create_auto_cdc_from_snapshot_flow(
# ...
source = next_snapshot_and_version,
# ...
)

O tempo de execução do pipeline declarativo LakeFlow executa os seguintes passos sempre que o pipeline que contém a função create_auto_cdc_from_snapshot_flow() é acionado:

  1. execute a função next_snapshot_and_version para carregar o próximo Snapshot DataFrame e a versão correspondente do Snapshot.
  2. Se nenhum DataFrame retornar, a execução será encerrada e a atualização do pipeline será marcada como concluída.
  3. Detecta as alterações no novo Snapshot e as aplica incrementalmente à tabela de destino.
  4. Retorna ao passo #1 para carregar o próximo Snapshot e sua versão.