apply_changes_from_snapshot
Visualização
Essa funcionalidade está na Pré-visualização Pública.
A função apply_changes_from_snapshot
usa a funcionalidade DLT captura de dados de alterações (CDC) (CDC) para processar os dados de origem do banco de dados Snapshot. Consulte Como o CDC é implementado com a API APPLY CHANGES FROM SNAPSHOT
?
O senhor deve ter uma tabela de transmissão de destino para essas operações. Opcionalmente, você pode especificar as colunas e seus tipos para sua tabela de destino. Ao especificar as colunas e seus tipos para a tabela de destino apply_changes_from_snapshot()
, você também deve incluir as colunas __START_AT
e __END_AT
com o mesmo tipo de dados do campo sequence_by
.
Para criar a tabela de destino necessária, você pode usar a função create_streaming_table ().
Sintaxe
import dlt
dlt.apply_changes_from_snapshot(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Para o processamento de APPLY CHANGES FROM SNAPSHOT
, o comportamento de default é inserir uma nova linha quando um registro correspondente com o mesmo key(s) não existir no destino. Se existir um registro correspondente, ele será atualizado somente se algum dos valores na linha tiver sido alterado. As linhas com chave presentes no destino, mas não mais presentes na origem, são excluídas.
Para saber mais sobre o processamento do CDC com o Snapshot, consulte APLICAR ALTERAÇÕES APIs: Simplifique a captura de dados de alterações (CDC) com DLT. Para obter exemplos de uso da função apply_changes_from_snapshot()
, consulte os exemplos de ingestão periódica de Snapshot e ingestão de Snapshot histórico.
Parâmetros
Parâmetro | Tipo | Descrição |
---|---|---|
|
| Obrigatório. O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table () para criar a tabela de destino antes de executar a função |
|
| Obrigatório. O nome de uma tabela ou view para o Snapshot periodicamente ou uma função lambda Python que retorna o Snapshot DataFrame a ser processado e a versão do Snapshot. Consulte Implementar o argumento |
|
| Obrigatório. A coluna ou combinação de colunas que identifica de forma exclusiva uma linha nos dados de origem. Isso é usado para identificar quais eventos CDC se aplicam a registros específicos na tabela de destino. Você pode especificar uma das seguintes opções:
Argumentos para funções |
|
| Se os registros devem ser armazenados como SCD tipo 1 ou SCD tipo 2. Defina como |
|
| Um subconjunto de colunas de saída a ser rastreado para o histórico na tabela de destino. Use
Os argumentos das funções |
Implemente o argumento source
A função apply_changes_from_snapshot()
inclui o argumento source
. Para processar o Snapshot histórico, espera-se que o argumento source
seja uma função lambda Python que retorne dois valores para a função apply_changes_from_snapshot()
: um Python DataFrame contendo os dados do Snapshot a serem processados e uma versão do Snapshot.
A seguir está a assinatura da função lambda:
lambda Any => Optional[(DataFrame, Any)]
- O argumento para a função lambda é a versão do Snapshot processada mais recentemente.
- O valor de retorno da função lambda é
None
ou uma tupla de dois valores: O primeiro valor da tupla é um DataFrame que contém o Snapshot a ser processado. O segundo valor da tupla é a versão do Snapshot que representa a ordem lógica do Snapshot.
Um exemplo que implementa e chama a função lambda:
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
apply_changes_from_snapshot(
# ...
source = next_snapshot_and_version,
# ...
)
O tempo de execução do DLT executa as seguintes etapas sempre que o pipeline que contém a função apply_changes_from_snapshot()
é acionado:
- executar a função
next_snapshot_and_version
para carregar o próximo Snapshot DataFrame e a versão correspondente do Snapshot. - Se nenhum DataFrame retornar, a execução será encerrada e a atualização do pipeline será marcada como concluída.
- Detecta as alterações no novo Snapshot e as aplica de forma incremental à tabela de destino.
- Retorna à etapa 1 para carregar o próximo Snapshot e sua versão.