Pular para o conteúdo principal

apply_changes_from_snapshot

info

Visualização

Essa funcionalidade está na Pré-visualização Pública.

A função apply_changes_from_snapshot usa a funcionalidade DLT captura de dados de alterações (CDC) (CDC) para processar os dados de origem do banco de dados Snapshot. Consulte Como o CDC é implementado com a API APPLY CHANGES FROM SNAPSHOT?

important

O senhor deve ter uma tabela de transmissão de destino para essas operações. Opcionalmente, você pode especificar as colunas e seus tipos para sua tabela de destino. Ao especificar as colunas e seus tipos para a tabela de destino apply_changes_from_snapshot(), você também deve incluir as colunas __START_AT e __END_AT com o mesmo tipo de dados do campo sequence_by.

Para criar a tabela de destino necessária, você pode usar a função create_streaming_table ().

Sintaxe

Python
import dlt

dlt.apply_changes_from_snapshot(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
nota

Para o processamento de APPLY CHANGES FROM SNAPSHOT, o comportamento de default é inserir uma nova linha quando um registro correspondente com o mesmo key(s) não existir no destino. Se existir um registro correspondente, ele será atualizado somente se algum dos valores na linha tiver sido alterado. As linhas com chave presentes no destino, mas não mais presentes na origem, são excluídas.

Para saber mais sobre o processamento do CDC com o Snapshot, consulte APLICAR ALTERAÇÕES APIs: Simplifique a captura de dados de alterações (CDC) com DLT. Para obter exemplos de uso da função apply_changes_from_snapshot(), consulte os exemplos de ingestão periódica de Snapshot e ingestão de Snapshot histórico.

Parâmetros

Parâmetro

Tipo

Descrição

target

str

Obrigatório. O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table () para criar a tabela de destino antes de executar a função apply_changes().

source

str ou lambda function

Obrigatório. O nome de uma tabela ou view para o Snapshot periodicamente ou uma função lambda Python que retorna o Snapshot DataFrame a ser processado e a versão do Snapshot. Consulte Implementar o argumento source.

keys

list

Obrigatório. A coluna ou combinação de colunas que identifica de forma exclusiva uma linha nos dados de origem. Isso é usado para identificar quais eventos CDC se aplicam a registros específicos na tabela de destino. Você pode especificar uma das seguintes opções:

  • Uma lista de strings: ["userId", "orderId"]

  • Uma lista de funções Spark SQL col(): [col("userId"), col("orderId"].

Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

stored_as_scd_type

str ou int

Se os registros devem ser armazenados como SCD tipo 1 ou SCD tipo 2. Defina como 1 para SCD tipo 1 ou 2 para SCD tipo 2. O default é SCD tipo 1.

track_history_column_list ou track_history_except_column_list

list

Um subconjunto de colunas de saída a ser rastreado para o histórico na tabela de destino. Use track_history_column_list para especificar a lista completa de colunas a serem rastreadas. Use track_history_except_column_list para especificar as colunas a serem excluídas do acompanhamento. O senhor pode declarar qualquer um dos valores como uma lista de strings ou como funções Spark SQL col():

  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Os argumentos das funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId). O default é para incluir todas as colunas na tabela de destino quando nenhum argumento track_history_column_list ou track_history_except_column_list for passado para a função.

Implemente o argumento source

A função apply_changes_from_snapshot() inclui o argumento source. Para processar o Snapshot histórico, espera-se que o argumento source seja uma função lambda Python que retorne dois valores para a função apply_changes_from_snapshot(): um Python DataFrame contendo os dados do Snapshot a serem processados e uma versão do Snapshot.

A seguir está a assinatura da função lambda:

Python
lambda Any => Optional[(DataFrame, Any)]
  • O argumento para a função lambda é a versão do Snapshot processada mais recentemente.
  • O valor de retorno da função lambda é None ou uma tupla de dois valores: O primeiro valor da tupla é um DataFrame que contém o Snapshot a ser processado. O segundo valor da tupla é a versão do Snapshot que representa a ordem lógica do Snapshot.

Um exemplo que implementa e chama a função lambda:

Python
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None

apply_changes_from_snapshot(
# ...
source = next_snapshot_and_version,
# ...
)

O tempo de execução do DLT executa as seguintes etapas sempre que o pipeline que contém a função apply_changes_from_snapshot() é acionado:

  1. executar a função next_snapshot_and_version para carregar o próximo Snapshot DataFrame e a versão correspondente do Snapshot.
  2. Se nenhum DataFrame retornar, a execução será encerrada e a atualização do pipeline será marcada como concluída.
  3. Detecta as alterações no novo Snapshot e as aplica de forma incremental à tabela de destino.
  4. Retorna à etapa 1 para carregar o próximo Snapshot e sua versão.