Pular para o conteúdo principal

criar_fluxo_automático_cdc

A função create_auto_cdc_flow() cria um fluxo que usa a funcionalidade de captura de dados de alterações (CDC) do pipeline declarativo LakeFlow Spark para processar dados de origem de um feed de dados alterados (CDF).

nota

Esta função substitui a função anterior apply_changes(). As duas funções têm a mesma assinatura. A Databricks recomenda atualizar para usar o novo nome.

importante

Você deve declarar uma tabela de transmissão de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o esquema da tabela de destino create_auto_cdc_flow() , você deve incluir as colunas __START_AT e __END_AT com o mesmo tipo de dados dos campos sequence_by .

Para criar a tabela de destino necessária, você pode usar a função create_streaming_table() na interface Python do pipeline.

Sintaxe

Python
from pyspark import pipelines as dp

dp.create_auto_cdc_flow(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = <bool>,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None,
name = None,
once = <bool>
)

Para o processamento create_auto_cdc_flow , o comportamento default para eventos INSERT e UPDATE é inserir eventos CDC da origem: atualizar todas as linhas na tabela de destino que correspondem à(s) key(s) especificada(s) ou inserir uma nova linha quando um registro correspondente não existir na tabela de destino. O tratamento de eventos DELETE pode ser especificado com o parâmetro apply_as_deletes .

Para saber mais sobre o processamento CDC com um feed de alterações, consulte As APIs AUTO CDC : Simplifique a captura de dados de alterações (CDC) com pipeline. Para um exemplo de utilização da função create_auto_cdc_flow() , consulte os exemplos AUTO CDC.

Parâmetros

Parâmetro

Tipo

Descrição

target

str

Obrigatório. O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table() para criar a tabela de destino antes de executar a função create_auto_cdc_flow() .

source

str

Obrigatório. A fonte de dados contendo registros CDC .

keys

list

Obrigatório. A coluna ou combinação de colunas que identifica exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos do CDC se aplicam a registros específicos na tabela de destino. Você pode especificar:

  • Uma lista de strings: ["userId", "orderId"]
  • Uma lista de funções Spark SQL col() : [col("userId"), col("orderId")]. Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

sequence_by

str, col() ou struct()

Obrigatório. Os nomes das colunas especificam a ordem lógica dos eventos do CDC nos dados de origem. O pipeline declarativo LakeFlow Spark usa essa sequência para lidar com eventos de alteração que chegam fora de ordem. A coluna especificada deve ser de um tipo de dados classificável. Você pode especificar uma das seguintes opções:

  • Uma string: "sequenceNum"
  • Uma função Spark SQL col() : col("sequenceNum"). Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).
  • Um struct() combinando várias colunas para desempatar: struct("timestamp_col", "id_col"), ele ordenará primeiro pelo primeiro campo de estrutura, depois pelo segundo campo se houver empate, e assim por diante.

ignore_null_updates

bool

Controla como os valores null nas atualizações de entrada do CDC são tratados. Quando ignore_null_updates é True, as colunas null em uma atualização de entrada são ignoradas — o valor existente na linha de destino é preservado. Isso também se aplica a colunas aninhadas com valores null . Quando ignore_null_updates é False, as colunas null em uma atualização de entrada sobrescrevem os valores existentes no destino.

Defina como True quando os eventos de origem incluem apenas as colunas que foram alteradas, para que as colunas inalteradas não sejam sobrescritas com null.

O padrão é False.

apply_as_deletes

str ou expr()

Especifica quando um evento CDC deve ser tratado como um DELETE em vez de um upsert. Você pode especificar:

  • Uma string: "Operation = 'DELETE'"
  • Uma função Spark SQL expr(): expr("Operation = 'DELETE'")

Para lidar com dados fora de ordem, a linha excluída é temporariamente retida como uma marca de exclusão na tabela Delta subjacente, e uma view é criada no metastore que filtra essas marcas de exclusão. O intervalo de retenção padrão é de dois dias e pode ser configurado com a propriedade da tabela pipelines.cdc.tombstoneGCThresholdInSeconds .

Se você usar o Auto Loader como fonte para seu pipeline CDC, o Auto Loader não garante a ordem de processamento dos arquivos. Para obter detalhes, consulte Lidar com dados fora de ordem. Defina pipelines.cdc.tombstoneGCThresholdInSeconds com um valor que exceda o atraso máximo esperado entre a chegada do evento e a execução do pipeline. Isso garante que os registros de exclusão sejam mantidos por tempo suficiente para lidar corretamente com eventos de exclusão que chegam atrasados ou fora de ordem.

apply_as_truncates

str ou expr()

Especifica quando um evento de CDC deve ser tratado como uma tabela completa TRUNCATE. Você pode especificar:

  • Uma string: "Operation = 'TRUNCATE'"
  • Uma função Spark SQL expr(): expr("Operation = 'TRUNCATE'")

Como esta cláusula aciona um truncamento completo da tabela de destino, ela deve ser usada somente para casos de uso específicos que exigem essa funcionalidade. O parâmetro apply_as_truncates é suportado apenas para SCD tipo 1. O SCD tipo 2 não suporta operações de truncamento.

column_list ou except_column_list

list

Um subconjunto de colunas a incluir na tabela de destino. Use column_list para especificar a lista completa de colunas a incluir. Use except_column_list para especificar as colunas a serem excluídas. Você pode declarar o valor como uma lista de strings ou como funções Spark SQL col():

  • column_list = ["userId", "name", "city"]
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")

Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId). O default é incluir todas as colunas na tabela de destino quando nenhum argumento column_list ou except_column_list é passado para a função.

stored_as_scd_type

str ou int

Se os registros devem ser armazenados como SCD tipo 1 ou SCD tipo 2. Defina como 1 para SCD tipo 1 ou 2 para SCD tipo 2. O default é SCD tipo 1.

track_history_column_list ou track_history_except_column_list

list

Um subconjunto de colunas de saída a serem rastreadas para histórico na tabela de destino. Use track_history_column_list para especificar a lista completa de colunas a serem rastreadas. Use track_history_except_column_list para especificar as colunas a serem excluídas do acompanhamento. Você pode declarar qualquer valor como uma lista de strings ou como funções Spark SQL col() :

  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId). O default é incluir todas as colunas na tabela de destino quando nenhum argumento track_history_column_list ou track_history_except_column_list é passado para a função.

name

str

O nome do fluxo. Se não for fornecido, o padrão será o mesmo valor de target.

once

bool

Opcionalmente, defina o fluxo como um fluxo único, como um aterro. Usar once=True altera o fluxo de duas maneiras:

  • O valor de retorno. streaming-query. deve ser um lotes DataFrame neste caso, não um transmissão DataFrame.
  • O fluxo é executado uma vez por default. Se o pipeline for atualizado com uma refresh completa, o fluxo ONCE será executado novamente para recriar os dados.
Nesta página