Pular para o conteúdo principal

criar_fluxo_automático_cdc

A função create_auto_cdc_flow() cria um fluxo que usa a funcionalidade de captura de dados de alterações (CDC) do pipeline declarativo LakeFlow (CDC) para processar dados de origem de um feed de dados alterados (CDF).

nota

Esta função substitui a função anterior apply_changes(). As duas funções têm a mesma assinatura. A Databricks recomenda atualizar para usar o novo nome.

important

Você deve declarar uma tabela de transmissão de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o esquema da tabela de destino create_auto_cdc_flow() , você deve incluir as colunas __START_AT e __END_AT com o mesmo tipo de dados dos campos sequence_by .

Para criar a tabela de destino necessária, você pode usar a função create_streaming_table() na interface Python do pipeline declarativo LakeFlow .

Sintaxe

Python
from pyspark import pipelines as dp

dp.create_auto_cdc_flow(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None,
name = None,
once = False
)

Para o processamento create_auto_cdc_flow , o comportamento default para eventos INSERT e UPDATE é inserir eventos CDC da origem: atualizar todas as linhas na tabela de destino que correspondem à(s) key(s) especificada(s) ou inserir uma nova linha quando um registro correspondente não existir na tabela de destino. O tratamento de eventos DELETE pode ser especificado com o parâmetro apply_as_deletes .

Para saber mais sobre o processamento CDC com um feed de alterações, consulte APIs AUTO CDC : Simplifique a captura de dados de alterações (CDC) com pipeline declarativo LakeFlow. Para obter um exemplo de uso da função create_auto_cdc_flow() , consulte Exemplo: processamento de SCD tipo 1 e SCD tipo 2 com dados de origem CDF.

Parâmetros

Parâmetro

Tipo

Descrição

target

str

Obrigatório. O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table() para criar a tabela de destino antes de executar a função create_auto_cdc_flow() .

source

str

Obrigatório. A fonte de dados contendo registros CDC .

keys

list

Obrigatório. A coluna ou combinação de colunas que identifica exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos do CDC se aplicam a registros específicos na tabela de destino. Você pode especificar:

  • Uma lista de strings: ["userId", "orderId"]
  • Uma lista de funções Spark SQL col() : [col("userId"), col("orderId")]. Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

sequence_by

str, col() ou struct()

Obrigatório. Os nomes das colunas especificam a ordem lógica dos eventos do CDC nos dados de origem. O pipeline declarativo LakeFlow usa esse sequenciamento para lidar com eventos de alteração que chegam fora de ordem. A coluna especificada deve ser um tipo de dado classificável. Você pode especificar:

  • Uma string: "sequenceNum"
  • Uma função Spark SQL col() : col("sequenceNum"). Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).
  • Um struct() combinando várias colunas para desempatar: struct("timestamp_col", "id_col"), ele ordenará primeiro pelo primeiro campo de estrutura, depois pelo segundo campo se houver empate, e assim por diante.

ignore_null_updates

bool

Permitir a ingestão de atualizações contendo um subconjunto da coluna de destino. Quando um evento CDC corresponde a uma linha existente e ignore_null_updates é True, as colunas com null retêm seus valores existentes no destino. Isso também se aplica a colunas aninhadas com um valor de null. Quando ignore_null_updates é False, os valores existentes são substituídos por valores null .

O padrão é False.

apply_as_deletes

str ou expr()

Especifica quando um evento CDC deve ser tratado como um DELETE em vez de um upsert. Você pode especificar:

  • Uma string: "Operation = 'DELETE'"
  • Uma função Spark SQL expr(): expr("Operation = 'DELETE'")

Para lidar com dados fora de ordem, a linha excluída é temporariamente retida como uma marca de exclusão na tabela Delta subjacente, e uma view é criada no metastore que filtra essas marcas de exclusão. O intervalo de retenção padrão é de dois dias e pode ser configurado com a propriedade da tabela pipelines.cdc.tombstoneGCThresholdInSeconds .

apply_as_truncates

str ou expr()

Especifica quando um evento de CDC deve ser tratado como uma tabela completa TRUNCATE. Você pode especificar:

  • Uma string: "Operation = 'TRUNCATE'"
  • Uma função Spark SQL expr(): expr("Operation = 'TRUNCATE'")

Como esta cláusula aciona um truncamento completo da tabela de destino, ela deve ser usada somente para casos de uso específicos que exigem essa funcionalidade. O parâmetro apply_as_truncates é suportado apenas para SCD tipo 1. O SCD tipo 2 não suporta operações de truncamento.

column_list ou except_column_list

list

Um subconjunto de colunas a incluir na tabela de destino. Use column_list para especificar a lista completa de colunas a incluir. Use except_column_list para especificar as colunas a serem excluídas. Você pode declarar o valor como uma lista de strings ou como funções Spark SQL col():

  • column_list = ["userId", "name", "city"]
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")

Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId). O default é incluir todas as colunas na tabela de destino quando nenhum argumento column_list ou except_column_list é passado para a função.

stored_as_scd_type

str ou int

Se os registros devem ser armazenados como SCD tipo 1 ou SCD tipo 2. Defina como 1 para SCD tipo 1 ou 2 para SCD tipo 2. O default é SCD tipo 1.

track_history_column_list ou track_history_except_column_list

list

Um subconjunto de colunas de saída a serem rastreadas para histórico na tabela de destino. Use track_history_column_list para especificar a lista completa de colunas a serem rastreadas. Use track_history_except_column_list para especificar as colunas a serem excluídas do acompanhamento. Você pode declarar qualquer valor como uma lista de strings ou como funções Spark SQL col() :

  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId). O default é incluir todas as colunas na tabela de destino quando nenhum argumento track_history_column_list ou track_history_except_column_list é passado para a função.

name

str

O nome do fluxo. Se não for fornecido, o padrão será o mesmo valor de target.

once

bool

Opcionalmente, defina o fluxo como um fluxo único, como um aterro. Usar once=True altera o fluxo de duas maneiras:

  • O valor de retorno. streaming-query. deve ser um lotes DataFrame neste caso, não um transmissão DataFrame.
  • O fluxo é executado uma vez por default. Se o pipeline for atualizado com uma refresh completa, o fluxo ONCE será executado novamente para recriar os dados.