Pular para o conteúdo principal

aplicar_alterações

A função apply_changes() usa a funcionalidade DLT captura de dados de alterações (CDC) (CDC) para processar dados de origem de um feed de dados de alterações (CDF).

important

O senhor deve declarar uma tabela de transmissão de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o esquema da tabela de destino apply_changes(), você deve incluir as colunas __START_AT e __END_AT com o mesmo tipo de dados dos campos sequence_by.

Para criar a tabela de destino necessária, o senhor pode usar a função create_streaming_table() na interface DLT Python.

Sintaxe

Python
import dlt

dlt.apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)

Para o processamento de apply_changes, o comportamento de default para os eventos INSERT e UPDATE é a inserção de eventos CDC da origem: atualize todas as linhas da tabela de destino que correspondam aos key(s) especificados ou insira uma nova linha quando não houver um registro correspondente na tabela de destino. O tratamento de eventos DELETE pode ser especificado com o parâmetro apply_as_deletes.

Para saber mais sobre o processamento do CDC com um feed de alterações, consulte APLICAR ALTERAÇÕES APIs: Simplifique a captura de dados de alterações (CDC) com DLT. Para obter um exemplo de uso da função apply_changes(), consulte Exemplo: Processamento de SCD tipo 1 e SCD tipo 2 com dados de origem CDF.

Parâmetros

Parâmetro

Tipo

Descrição

target

str

Obrigatório. O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table () para criar a tabela de destino antes de executar a função apply_changes().

source

str

Obrigatório. A fonte de dados que contém os registros CDC.

keys

list

Obrigatório. A coluna ou combinação de colunas que identifica de forma exclusiva uma linha nos dados de origem. Isso é usado para identificar quais eventos CDC se aplicam a registros específicos na tabela de destino. Você pode especificar uma das seguintes opções:

  • Uma lista de strings: ["userId", "orderId"]
  • Uma lista de funções Spark SQL col(): [col("userId"), col("orderId"]. Os argumentos das funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

sequence_by

str, col() ou struct()

Obrigatório. Os nomes das colunas que especificam a ordem lógica dos eventos CDC nos dados de origem. A DLT usa esse sequenciamento para lidar com eventos de mudança que chegam fora de ordem. A coluna especificada deve ser um tipo de dados classificável. Você pode especificar uma das seguintes opções:

  • Uma string: "sequenceNum"
  • Uma função Spark SQL col(): col("sequenceNum"). Os argumentos das funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).
  • Um struct() combinando várias colunas para desempatar: struct("timestamp_col", "id_col"), ele será ordenado primeiro pelo primeiro campo de estrutura, depois pelo segundo campo se houver empate e assim por diante.

ignore_null_updates

bool

Permitir a ingestão de atualizações que contenham um subconjunto da coluna de destino. Quando um evento CDC corresponde a uma linha existente e ignore_null_updates é True, as colunas com um null mantêm seus valores existentes no destino. Isso também se aplica a colunas aninhadas com um valor de null. Quando ignore_null_updates é False, os valores existentes são substituídos pelos valores null.

O padrão é False.

apply_as_deletes

str ou expr()

Especifica quando um evento CDC deve ser tratado como um DELETE em vez de um upsert. Você pode especificar:

  • Uma string: "Operation = 'DELETE'"
  • Uma função Spark SQL expr(): expr("Operation = 'DELETE'")

Para lidar com dados fora de ordem, a linha excluída é temporariamente retida como uma lápide na tabela subjacente Delta e é criado um view no metastore que filtra essas lápides. O intervalo de retenção pode ser configurado com a propriedade de tabela pipelines.cdc.tombstoneGCThresholdInSeconds.

apply_as_truncates

str ou expr()

Especifica quando um evento de CDC deve ser tratado como uma tabela completa TRUNCATE. Você pode especificar:

  • Uma string: "Operation = 'TRUNCATE'"
  • Uma função Spark SQL expr(): expr("Operation = 'TRUNCATE'")

Como essa cláusula aciona um truncamento completo da tabela de destino, ela deve ser usada somente para casos de uso específicos que exijam essa funcionalidade. O parâmetro apply_as_truncates é compatível apenas com o SCD tipo 1. O SCD tipo 2 não oferece suporte a operações de truncamento.

column_list ou except_column_list

list

Um subconjunto de colunas a incluir na tabela de destino. Use column_list para especificar a lista completa de colunas a incluir. Use except_column_list para especificar as colunas a serem excluídas. Você pode declarar o valor como uma lista de strings ou como funções Spark SQL col():

  • column_list = ["userId", "name", "city"]
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")

Os argumentos das funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId). O default é para incluir todas as colunas na tabela de destino quando nenhum argumento column_list ou except_column_list for passado para a função.

stored_as_scd_type

str ou int

Se os registros devem ser armazenados como SCD tipo 1 ou SCD tipo 2. Defina como 1 para SCD tipo 1 ou 2 para SCD tipo 2. O default é SCD tipo 1.

track_history_column_list ou track_history_except_column_list

list

Um subconjunto de colunas de saída a ser rastreado para o histórico na tabela de destino. Use track_history_column_list para especificar a lista completa de colunas a serem rastreadas. Use track_history_except_column_list para especificar as colunas a serem excluídas do acompanhamento. O senhor pode declarar qualquer um dos valores como uma lista de strings ou como funções Spark SQL col():

  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Os argumentos das funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId). O default é para incluir todas as colunas na tabela de destino quando nenhum argumento track_history_column_list ou track_history_except_column_list for passado para a função.