Pular para o conteúdo principal

O AUTO CDC APIs: Simplifique a captura de dados de alterações (CDC) com o pipeline declarativo LakeFlow

LakeFlow O pipeline declarativo simplifica a captura de dados de alterações (CDC) (CDC) com os sites AUTO CDC e AUTO CDC FROM SNAPSHOT APIs.

nota

As APIs AUTO CDC eram chamadas anteriormente de APPLY CHANGES e tinham a mesma sintaxe.

A interface que você usa depende da fonte dos dados de alteração:

  • Use AUTO CDC para processar alterações de um feed de dados de alterações (CDF).
  • Use o site AUTO CDC FROM SNAPSHOT (Public Preview e disponível apenas para Python) para processar alterações no Snapshot do banco de dados.

Anteriormente, a instrução MERGE INTO era comumente usada para processar registros CDC em Databricks. No entanto, MERGE INTO pode produzir resultados incorretos devido a registros fora de sequência ou requer uma lógica complexa para reordenar os registros.

O AUTO CDC API é compatível com as interfaces LakeFlow Declarative pipeline SQL e Python. O AUTO CDC FROM SNAPSHOT API é compatível com a interface LakeFlow Declarative pipeline Python.

Tanto AUTO CDC quanto AUTO CDC FROM SNAPSHOT suportam a atualização de tabelas usando SCD tipo 1 e tipo 2:

  • Use o SCD tipo 1 para atualizar os registros diretamente. O histórico não é mantido para registros atualizados.
  • Use o SCD tipo 2 para manter um histórico de registros, em todas as atualizações ou em atualizações de um conjunto específico de colunas.

Para conhecer a sintaxe e outras referências, consulte AUTO CDC para LakeFlow Declarative pipeline SQL, AUTO CDC para LakeFlow Declarative pipeline Python e AUTO CDC FROM Snapshot para LakeFlow Declarative pipeline Python.

nota

Este artigo descreve como atualizar tabelas em seu pipeline declarativo LakeFlow com base em alterações nos dados de origem. Para saber como registrar e consultar informações de alteração em nível de linha para tabelas Delta, consulte Use Delta Lake change data feed on Databricks.

Requisitos

Para usar o CDC APIs, seu pipeline deve ser configurado para usar o pipeline Declarativo serverlessLakeflow ou LakeFlow Pro as Advanced edições do pipeline Declarativo ou.

Como o CDC é implementado com a API AUTO CDC?

Ao tratar automaticamente os registros fora de sequência, o pipeline AUTO CDC API in LakeFlow Declarative garante o processamento correto dos registros CDC e elimina a necessidade de desenvolver uma lógica complexa para tratar os registros fora de sequência. O usuário deve especificar uma coluna nos dados de origem para sequenciar os registros, que o pipeline declarativo LakeFlow interpreta como uma representação monotonicamente crescente da ordenação adequada dos dados de origem. LakeFlow O pipeline declarativo trata automaticamente os dados que chegam fora de ordem. Para alterações SCD tipo 2, o pipeline LakeFlow Declarative propaga os valores de sequenciamento apropriados para as colunas __START_AT e __END_AT da tabela de destino. Deve haver uma atualização distinta por key em cada valor de sequenciamento, e não há suporte para valores de sequenciamento NULL.

Para executar o processamento em CDC com AUTO CDC, o senhor primeiro cria uma tabela de transmissão e, em seguida, usa a instrução AUTO CDC ... INTO em SQL ou a função create_auto_cdc_flow() em Python para especificar a fonte, a chave e o sequenciamento do feed de modificação. Para criar a tabela de transmissão de destino, use a instrução CREATE OR REFRESH STREAMING TABLE em SQL ou a função create_streaming_table() em Python. Veja os exemplos de processamento de SCD tipo 1 e tipo 2.

Para obter detalhes sobre a sintaxe, consulte LakeFlow Declarative pipeline SQL reference ou Python reference.

Como o CDC é implementado com a AUTO CDC FROM SNAPSHOT API? API?

info

Visualização

A API AUTO CDC FROM SNAPSHOT está em pré-visualização pública.

AUTO CDC FROM SNAPSHOT é um API declarativo que determina com eficiência as alterações nos dados de origem comparando uma série de Snapshot em ordem e, em seguida, executa o processamento necessário para CDC o processamento dos registros no Snapshot. AUTO CDC FROM SNAPSHOT é compatível apenas com a interface LakeFlow Declarative pipeline Python.

AUTO CDC FROM SNAPSHOT suporta a ingestão de Snapshot de vários tipos de fontes:

  • Use a ingestão periódica de Snapshot para ingerir Snapshot de uma tabela existente ou view. AUTO CDC FROM SNAPSHOT tem uma interface simples e otimizada para suportar a ingestão periódica de Snapshot a partir de um objeto de banco de dados existente. Um novo Snapshot é ingerido a cada atualização do site pipeline, e o tempo de ingestão é usado como a versão do Snapshot. Quando um pipeline é executado no modo contínuo, vários instantâneos são ingeridos a cada atualização do pipeline em um período determinado pela configuração do intervalo de acionamento do fluxo que contém o processamento do AUTO CDC FROM SNAPSHOT.
  • Use a ingestão de Snapshot histórico para processar arquivos que contenham Snapshot de banco de dados, como Snapshot gerado a partir de um banco de dados Oracle ou MySQL ou um data warehouse.

Para executar o processamento de CDC a partir de qualquer tipo de origem com AUTO CDC FROM SNAPSHOT, o senhor primeiro cria uma tabela de transmissão e, em seguida, usa a função create_auto_cdc_from_snapshot_flow() em Python para especificar o Snapshot, a chave e outros argumentos necessários para implementar o processamento. Veja os exemplos de ingestão periódica de Snapshot e ingestão histórica de Snapshot.

O Snapshot passado para API deve estar em ordem crescente por versão. Se o pipeline LakeFlow Declarative detectar um Snapshot fora de ordem, será gerado um erro.

Para obter detalhes sobre a sintaxe, consulte a referência LakeFlow Declarative pipeline.Python

Use várias colunas para sequenciamento

Você pode sequenciar por várias colunas (por exemplo, um carimbo de data/hora e um ID para desempatar), usar um STRUCT para combiná-las: ele ordena primeiro pelo primeiro campo do STRUCT e, no caso de empate, considera o segundo campo e assim por diante.

Exemplo em SQL:

SQL
SEQUENCE BY STRUCT(timestamp_col, id_col)

Exemplo em Python:

Python
sequence_by = struct("timestamp_col", "id_col")

Limitações

A coluna usada para sequenciamento deve ser um tipo de dados classificável.

Exemplo: Processamento de SCD tipo 1 e SCD tipo 2 com dados de origem CDF

As seções a seguir fornecem exemplos de LakeFlow Declarative pipeline SCD consultas do tipo 1 e do tipo 2 que atualizam tabelas de destino com base em eventos de origem de um feed de dados de alteração que:

  1. Cria novos registros de usuário.
  2. Exclui um registro de usuário.
  3. Atualiza os registros do usuário. No exemplo do SCD tipo 1, as últimas UPDATE operações chegam atrasadas e são descartadas da tabela de destino, demonstrando o tratamento de eventos fora de ordem.

Os exemplos a seguir pressupõem familiaridade com a configuração e a atualização do pipeline LakeFlow Declarative. Veja o tutorial: Crie um ETL pipeline usando a captura de dados de alterações (CDC) com o pipeline declarativo LakeFlow.

Para executar esses exemplos, o senhor deve começar criando um exemplo em dataset. Consulte Gerar dados de teste.

A seguir encontram-se registros de entrada para esses exemplos:

userId

name

city

operation

sequenceNum

124

Raul

Oaxaca

INSERT

1

123

Isabel

Monterrey

INSERT

1

125

Mercedes

Tijuana

INSERT

2

126

Lily

Cancun

INSERT

2

123

null

null

DELETE

6

125

Mercedes

Guadalajara

UPDATE

6

125

Mercedes

Mexicali

UPDATE

5

123

Isabel

Chihuahua

UPDATE

5

Se você cancelar o comentário da linha final nos dados de exemplo, ele inserirá o seguinte registro que especifica onde os registros devem ser truncados:

userId

name

city

operation

sequenceNum

null

null

null

TRUNCATE

3

nota

Todos os exemplos a seguir incluem opções para especificar operações DELETE e TRUNCATE, mas cada uma delas é opcional.

Processar atualizações do SCD tipo 1

O exemplo a seguir demonstra o processamento de atualizações SCD tipo 1:

Python
import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)

Após executar o exemplo do SCD tipo 1, a tabela de destino contém os seguintes registros:

userId

name

city

124

Raul

Oaxaca

125

Mercedes

Guadalajara

126

Lily

Cancun

Depois de executar o exemplo do SCD tipo 1 com o registro TRUNCATE adicional, os registros 124 e 126 são truncados devido à operação TRUNCATE em sequenceNum=3, e a tabela de destino contém o seguinte registro:

userId

name

city

125

Mercedes

Guadalajara

Processar atualizações do SCD tipo 2

O exemplo a seguir demonstra o processamento de atualizações SCD tipo 2:

Python
import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)

Após executar o exemplo do SCD tipo 2, a tabela de destino contém os seguintes registros:

userId

name

city

__START_AT

__END_AT

123

Isabel

Monterrey

1

5

123

Isabel

Chihuahua

5

6

124

Raul

Oaxaca

1

null

125

Mercedes

Tijuana

2

5

125

Mercedes

Mexicali

5

6

125

Mercedes

Guadalajara

6

null

126

Lily

Cancun

2

null

Uma consulta SCD tipo 2 também pode especificar um subconjunto de colunas de saída a ser rastreado para histórico na tabela de destino. As alterações em outras colunas são atualizadas no local, em vez de gerar novos registros de histórico. O exemplo a seguir demonstra a exclusão da coluna city do acompanhamento:

O exemplo a seguir demonstra o uso do histórico de rastreamento com o SCD tipo 2:

Python
import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)

Depois de executar esse exemplo sem o registro TRUNCATE adicional, a tabela de destino contém os seguintes registros:

userId

name

city

__START_AT

__END_AT

123

Isabel

Chihuahua

1

6

124

Raul

Oaxaca

1

null

125

Mercedes

Guadalajara

2

null

126

Lily

Cancun

2

null

Gere dados de teste

O código abaixo é fornecido para gerar um exemplo dataset para uso nas consultas de exemplo presentes neste tutorial. Supondo que o senhor tenha as credenciais adequadas para criar um novo esquema e uma nova tabela, é possível executar essas instruções com um Notebook ou com o site Databricks SQL. O código a seguir não se destina a ser executado como parte do pipeline declarativo do LakeFlow:

SQL
CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);

Exemplo: Processamento periódico de instantâneos

O exemplo a seguir demonstra o processamento SCD tipo 2 que ingere o Snapshot de uma tabela armazenada em mycatalog.myschema.mytable. Os resultados do processamento são gravados em uma tabela chamada target.

mycatalog.myschema.mytable registros no timestamp 2024-01-01 00:00:00

Chave

Valor

1

a1

2

a2

mycatalog.myschema.mytable registros no timestamp 2024-01-01 12:00:00

Chave

Valor

2

b2

3

a3

Python
import dlt

@dlt.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.create_auto_cdc_from_snapshot_flow(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)

Após o processamento do Snapshot, a tabela de destino contém os seguintes registros:

Chave

Valor

__START_AT

__END_AT

1

a1

2024-01-01 00:00:00

2024-01-01 12:00:00

2

a2

2024-01-01 00:00:00

2024-01-01 12:00:00

2

b2

2024-01-01 12:00:00

null

3

a3

2024-01-01 12:00:00

null

Exemplo: Processamento de instantâneos históricos

O exemplo a seguir demonstra o processamento SCD tipo 2 que atualiza uma tabela de destino com base em eventos de origem de dois Snapshots armazenados em um sistema de armazenamento em nuvem:

Snapshot em timestamp, armazenado em /<PATH>/filename1.csv

Chave

Coluna de rastreamento

Coluna sem rastreamento

1

a1

b1

2

a2

b2

4

a4

b4

Snapshot em timestamp + 5, armazenado em /<PATH>/filename2.csv

Chave

Coluna de rastreamento

Coluna sem rastreamento

2

a2_new

b2

3

a3

b3

4

a4

b4_novo

O exemplo de código a seguir demonstra o processamento de atualizações do tipo 2 do SCD com esse Snapshot:

Python
import dlt

def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None

dlt.create_streaming_live_table("target")

dlt.create_auto_cdc_from_snapshot_flow(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)

Após o processamento do Snapshot, a tabela de destino contém os seguintes registros:

Chave

Coluna de rastreamento

Coluna sem rastreamento

__START_AT

__END_AT

1

a1

b1

1

2

2

a2

b2

1

2

2

a2_new

b2

2

null

3

a3

b3

2

null

4

a4

b4_novo

1

null

Adicionar, alterar ou excluir dados em uma tabela de transmissão de destino

Se o seu pipeline publicar tabelas no Unity Catalog, o senhor poderá usar instruções da linguagem de manipulação de dados (DML), incluindo instruções de inserção, atualização, exclusão e merge, para modificar as tabelas de transmissão de destino criadas pelas instruções AUTO CDC ... INTO.

nota
  • As declarações DML que modificam o esquema de tabela de uma tabela de streaming não são suportadas. Certifique-se de que suas instruções DML não tentem evoluir o esquema da tabela.
  • As instruções DML que atualizam uma tabela de transmissão só podem ser executadas em um clustering Unity Catalog compartilhado ou em um SQL warehouse usando Databricks Runtime 13.3 LTS e acima.
  • Como a transmissão exige fontes de dados somente anexadas, se o seu processamento exigir a transmissão de uma tabela de transmissão de origem com alterações (por exemplo, por instruções DML), defina o sinalizador skipChangeCommits ao ler a tabela de transmissão de origem. Quando skipChangeCommits é definido, as transações que excluem ou modificam registros na tabela de origem são ignoradas. Se o processamento não exigir uma tabela de transmissão, o senhor poderá usar uma tabela materializada view (que não tem a restrição de append-only) como tabela de destino.

Como o pipeline LakeFlow Declarative usa uma coluna SEQUENCE BY especificada e propaga os valores de sequenciamento apropriados para as colunas __START_AT e __END_AT da tabela de destino (para SCD tipo 2), é preciso garantir que as instruções DML usem valores válidos para essas colunas a fim de manter a ordem correta dos registros. Consulte Como o CDC é implementado com a API AUTO CDC?

Para obter mais informações sobre o uso de instruções DML com tabelas de transmissão, consulte Adicionar, alterar ou excluir dados em uma tabela de transmissão.

O exemplo seguinte insere um registro ativo com uma sequência inicial de 5:

SQL
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

Ler um feed de dados de alteração de uma tabela de destino do AUTO CDC

Em Databricks Runtime 15.2 e acima, o senhor pode ler um feed de dados de alteração de uma tabela de transmissão que seja o alvo de consultas AUTO CDC ou AUTO CDC FROM SNAPSHOT da mesma forma que lê um feed de dados de alteração de outras tabelas Delta. Os seguintes itens são necessários para ler o feed de dados de alteração de uma tabela de transmissão de destino:

  • A tabela de transmissão de destino deve ser publicada em Unity Catalog. Consulte Usar Unity Catalog com o pipeline declarativo LakeFlow.
  • Para ler o feed de dados de alteração da tabela de transmissão de destino, o senhor deve usar o site Databricks Runtime 15.2 ou o acima. Para ler o feed de dados de alteração em um pipeline diferente, o pipeline deve ser configurado para usar o Databricks Runtime 15.2 ou o acima.

O senhor lê o feed de dados de alteração de uma tabela de transmissão de destino que foi criada no pipeline declarativo LakeFlow da mesma forma que lê um feed de dados de alteração de outras tabelas Delta. Para saber mais sobre como usar a funcionalidade de feed de dados de alterações do Delta, incluindo exemplos em Python e SQL, consulte Usar o feed de dados de alterações do Delta Lake na Databricks.

nota

O registro do feed de dados de alteração inclui metadados que identificam o tipo de evento de alteração. Quando um registro é atualizado em uma tabela, os metadados dos registros de alteração associados normalmente incluem valores _change_type definidos como eventos update_preimage e update_postimage.

No entanto, os valores _change_type serão diferentes se forem feitas atualizações na tabela de transmissão de destino que incluam a alteração dos valores primários key. Quando as alterações incluem atualizações na chave primária, os campos de metadados _change_type são definidos como eventos insert e delete. As alterações na chave primária podem ocorrer quando são feitas atualizações manuais em um dos campos key com uma instrução UPDATE ou MERGE ou, no caso de tabelas SCD tipo 2, quando o campo __start_at é alterado para refletir um valor de sequência anterior.

A consulta AUTO CDC determina os valores primários de key, que diferem para o processamento de SCD tipo 1 e SCD tipo 2:

  • Para o processamento SCD tipo 1 e a interface LakeFlow Declarative pipeline Python, o principal key é o valor do parâmetro keys na função create_auto_cdc_flow(). Para a interface LakeFlow Declarative pipeline SQL, a principal key são as colunas definidas pela cláusula KEYS na declaração AUTO CDC ... INTO.
  • Para SCD tipo 2, o key primário é o parâmetro keys ou a cláusula KEYS mais o valor de retorno das coalesce(__START_AT, __END_AT) operações, em que __START_AT e __END_AT são as colunas correspondentes da tabela de transmissão de destino.

Obtenha dados sobre registros processados por um pipeline declarativo LakeFlow CDC query

nota

As métricas a seguir são capturadas apenas por consultas AUTO CDC e não por consultas AUTO CDC FROM SNAPSHOT.

As seguintes métricas são capturadas por consultas AUTO CDC:

  • num_upserted_rows : O número de linhas de saída inseridas no site dataset durante uma atualização.
  • num_deleted_rows : O número de linhas de saída existentes excluídas do site dataset durante uma atualização.

A métrica num_output_rows, saída para fluxos não-CDC, não é capturada para consultas AUTO CDC.

Quais objetos de dados são usados para o processamento do LakeFlow Declarative pipeline CDC?

nota
  • Essas estruturas de dados se aplicam somente ao processamento AUTO CDC, não ao processamento AUTO CDC FROM SNAPSHOT.
  • Essas estruturas de dados se aplicam somente quando a tabela de destino é publicada no site Hive metastore. Se um pipeline for publicado no Unity Catalog, as tabelas de apoio internas ficarão inacessíveis aos usuários.

Quando você declara a tabela de destino no Hive metastore, são criadas duas estruturas de dados:

  • Uma visualização usando o nome atribuído à tabela de destino.
  • Uma tabela de apoio interna usada pelo pipeline declarativo LakeFlow para gerenciar o processamento CDC. Essa tabela é nomeada adicionando __apply_changes_storage_ ao nome da tabela de destino.

Por exemplo, se o senhor declarar uma tabela de destino chamada dlt_cdc_target, verá uma view chamada dlt_cdc_target e uma tabela chamada __apply_changes_storage_dlt_cdc_target no metastore. A criação de um view permite que o pipeline LakeFlow Declarative filtre as informações extras (por exemplo, lápides e versões) necessárias para lidar com dados fora de ordem. Para view os dados processados, consulte o destino view. Como o esquema da tabela __apply_changes_storage_ pode ser alterado para dar suporte a recursos ou aprimoramentos futuros, o senhor não deve consultar a tabela para uso em produção. Se você adicionar dados manualmente à tabela, presume-se que os registros venham antes de outras alterações porque as colunas da versão estão ausentes.

Recurso adicional