Pular para o conteúdo principal

O que é captura de dados de alterações (CDC) (CDC)?

captura de dados de alterações (CDC) (CDC) é um padrão de integração de dados que captura alterações feitas nos dados em um sistema de origem, como inserções, atualizações e exclusões. Essas alterações, representadas como uma lista, são comumente chamadas de feed do CDC . Você pode processar seus dados muito mais rápido se operar em um feed CDC , em vez de ler todo o dataset de origem. Bancos de dados transacionais como SQL Server, MySQL e Oracle geram feeds CDC. As tabelas Delta geram seu próprio feed CDC, conhecido como feed de dados alterados (CDF).

O diagrama a seguir mostra que, quando uma linha em uma tabela de origem que contém dados de funcionários é atualizada, ela gera um novo conjunto de linhas em um feed do CDC que contém apenas as alterações. Cada linha do feed CDC normalmente contém metadados adicionais, incluindo operações como UPDATE e uma coluna que pode ser usada para ordenar deterministicamente cada linha no feed CDC para que você possa lidar com atualizações fora de ordem. Por exemplo, a coluna sequenceNum no diagrama a seguir determina a ordem das linhas no feed do CDC:

visão geral da captura de dados de alterações (CDC).

Processando um feed de dados de alterações: manter apenas os dados mais recentes vs. manter versões históricas dos dados

O processamento de um feed de dados alterado é conhecido como dimensões que mudam lentamente (SCD) (SCD) . Ao processar um feed do CDC, você tem uma escolha a fazer:

  • Você mantém apenas os dados mais recentes (ou seja, substitui os dados existentes)? Isso é conhecido como SCD Tipo 1 .
  • Ou você mantém um histórico de alterações nos dados? Isso é conhecido como SCD Tipo 2 .

O processamento SCD Tipo 1 envolve a substituição de dados antigos por novos sempre que ocorre uma alteração. Isso significa que nenhum histórico das alterações é mantido. Somente a versão mais recente dos dados está disponível. É uma abordagem direta e geralmente usada quando o histórico de alterações não é importante, como corrigir erros ou atualizar campos não críticos, como endereços de email de clientes.

captura de dados de alterações (CDC) Visão geral SCD Tipo 1.

O processamento SCD Tipo 2 mantém um registro histórico de alterações de dados criando registros adicionais para capturar diferentes versões dos dados ao longo do tempo. Cada versão dos dados é marcada com carimbo de data/hora ou tags com metadados que permitem aos usuários rastrear quando uma alteração ocorreu. Isso é útil quando é importante rastrear a evolução dos dados, como acompanhar mudanças de endereço do cliente ao longo do tempo para fins de análise.

captura de dados de alterações (CDC) Visão geral SCD Tipo 2.

Exemplos de processamento SCD Tipo 1 e Tipo 2 com pipeline declarativo LakeFlow

Os exemplos nesta seção mostram como usar o SCD Tipo 1 e Tipo 2.

o passo 1: Preparar dados de amostra

Neste exemplo, você gerará um feed de amostra do CDC. Primeiro, crie um Notebook e cole o seguinte código nele. Atualize as variáveis no início do bloco de código para um catálogo e esquema onde você tenha permissão para criar tabelas e visualizações.

Este código cria uma nova tabela Delta que contém vários registros de alteração. O esquema é o seguinte:

  • id - Inteiro, identificador único deste funcionário
  • name - strings, nome do funcionário
  • role - strings, função do funcionário
  • country - strings, código do país, onde o funcionário trabalha
  • operation - Alterar tipo (por exemplo, INSERT, UPDATE ou DELETE)
  • sequenceNum - Inteiro, identifica a ordem lógica dos eventos CDC nos dados de origem. O pipeline declarativo LakeFlow usa esse sequenciamento para lidar com eventos de alteração que chegam fora de ordem.
Python
# update these to the catalog and schema where you have permissions
# to create tables and views.

catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"

def write_employees_cdf_to_delta():
data = [
(1, "Alex", "chef", "FR", "INSERT", 1),
(2, "Jessica", "owner", "US", "INSERT", 2),
(3, "Mikhail", "security", "UK", "INSERT", 3),
(4, "Gary", "cleaner", "UK", "INSERT", 4),
(5, "Chris", "owner", "NL", "INSERT", 6),
# out of order update, this should be dropped from SCD Type 1
(5, "Chris", "manager", "NL", "UPDATE", 5),
(6, "Pat", "mechanic", "NL", "DELETE", 8),
(6, "Pat", "mechanic", "NL", "INSERT", 7)
]
columns = ["id", "name", "role", "country", "operation", "sequenceNum"]
df = spark.createDataFrame(data, columns)
df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.{employees_cdf_table}")

write_employees_cdf_to_delta()

Você pode visualizar este uso de dados com o seguinte comando SQL :

SQL
SELECT *
FROM mycatalog.myschema.employees_cdf

o passo 2: Use SCD Tipo 1 para manter apenas os dados mais recentes

Recomendamos usar a API AUTO CDC em um pipeline declarativo LakeFlow para processar um feed de dados de alteração em uma tabela SCD Tipo 1.

  1. Crie um novo Notebook.
  2. Cole o seguinte código nele.
  3. Crie e conecte-se a um pipeline.

A função employees_cdf lê a tabela que acabamos de criar acima como uma transmissão porque a API create_auto_cdc_flow , que você usará para processamento de captura de dados de alterações (CDC), espera uma transmissão de alterações como entrada. Você o envolve com um decorador @dp.temporary_view porque não quer materializar essa transmissão em uma tabela.

Em seguida, você usa dp.create_target_table para criar uma tabela de transmissão que contém o resultado do processamento desse feed de dados de alteração.

Por fim, você usa dp.create_auto_cdc_flow para processar o feed de dados de alteração. Vamos dar uma olhada em cada argumento:

  • target - A tabela de transmissão de destino, que você definiu anteriormente.
  • source - A view sobre a transmissão de registros de alterações, que você definiu anteriormente.
  • keys - Identifica linhas exclusivas no feed de alterações. Como você está usando id como um identificador exclusivo, forneça apenas id como a única coluna de identificação.
  • sequence_by - O nome da coluna que especifica a ordem lógica dos eventos CDC nos dados de origem. Você precisa desse sequenciamento para lidar com eventos de mudança que chegam fora de ordem. Forneça sequenceNum como coluna de sequenciamento.
  • apply_as_deletes - Como os dados de exemplo contêm operações de exclusão, você usa apply_as_deletes para indicar quando um evento CDC deve ser tratado como um DELETE em vez de um upsert.
  • except_column_list - Contém uma lista de colunas que você não deseja incluir na tabela de destino. Neste exemplo, você usará este argumento para excluir sequenceNum e operation.
  • stored_as_scd_type - Indica o tipo de SCD que você deseja usar.
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr, lit, when
from pyspark.sql.types import StringType, ArrayType

catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table_current = "employees_current"
employees_table_historical = "employees_historical"

@dp.temporary_view
def employees_cdf():
return spark.readStream.format("delta").table(f"{catalog}.{schema}.{employees_cdf_table}")

dp.create_target_table(f"{catalog}.{schema}.{employees_table_current}")

dp.create_auto_cdc_flow(
target=f"{catalog}.{schema}.{employees_table_current}",
source=employees_cdf_table,
keys=["id"],
sequence_by=col("sequenceNum"),
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)

execute este pipeline clicando em começar .

Em seguida, execute a seguinte consulta no editor SQL para verificar se os registros de alteração foram processados corretamente:

SQL
SELECT *
FROM mycatalog.myschema.employees_current
nota

A atualização fora de ordem para o funcionário Chris foi descartada corretamente, pois sua função ainda está definida como Proprietário em vez de Gerente.

exemplo de captura de dados de alterações (CDC) SCD Tipo 1.

o passo 3: Use SCD Tipo 2 para manter o histórico dos dados

Neste exemplo, você cria uma segunda tabela de destino, chamada employees_historical, que contém um histórico completo de alterações nos registros de funcionários.

Adicione este código ao seu pipeline. A única diferença aqui é que stored_as_scd_type é definido como 2 em vez de 1.

Python
dp.create_target_table(f"{catalog}.{schema}.{employees_table_historical}")

dp.create_auto_cdc_flow(
target=f"{catalog}.{schema}.{employees_table_historical}",
source=employees_cdf_table,
keys=["id"],
sequence_by=col("sequenceNum"),
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 2
)

execute este pipeline clicando em começar .

Em seguida, execute a seguinte consulta no editor SQL para verificar se os registros de alteração foram processados corretamente:

SQL
SELECT *
FROM mycatalog.myschema.employees_historical

Você verá todas as alterações nos funcionários, incluindo aqueles que foram excluídos, como Pat.

exemplo de captura de dados de alterações (CDC) SCD Tipo 2.

o passo 4: Limpar recurso

Quando terminar, limpe o recurso seguindo estes passos:

  1. Excluir o pipeline:
nota

Quando você exclui o pipeline, ele exclui automaticamente as tabelas employees e employees_historical .

  1. Clique em Trabalhos e pipeline e encontre o nome do pipeline a ser excluído.

  2. Clique no Ícone de estouro. na mesma linha do nome do pipeline e clique em Excluir .

  3. Exclua o Bloco de Notas.

  4. Exclua a tabela que contém o feed de dados de alteração:

    1. Clique em Novo > Consulta .
    2. Cole e execute o seguinte código SQL , ajustando o catálogo e o esquema conforme apropriado:
SQL
DROP TABLE mycatalog.myschema.employees_cdf

Desvantagens de usar MERGE INTO e foreachBatch para captura de dados de alterações (CDC)

O Databricks fornece um comando SQL MERGE INTO que você pode usar com a API foreachBatch para inserir linhas em uma tabela Delta. Esta seção explora como essa técnica pode ser usada para casos de uso simples, mas esse método se torna cada vez mais complexo e frágil quando aplicado a cenários do mundo real.

Neste exemplo, você usará o mesmo feed de dados de alteração de amostra usado nos exemplos anteriores.

Implementação ingênua com MERGE INTO e foreachBatch

Crie um Notebook e copie o seguinte código nele. Altere as variáveis catalog, schema e employees_table conforme apropriado. As variáveis catalog e schema devem ser definidas para locais no Unity Catalog onde você pode criar tabelas.

Quando você executa o Notebook, ele faz o seguinte:

  • Cria a tabela de destino em create_table. Ao contrário de create_auto_cdc_flow, que manipula esse passo automaticamente, você precisa especificar o esquema.
  • Lê o feed de dados alterados como uma transmissão. Cada microlote é processado usando o método upsertToDelta , que executa o comando MERGE INTO .
Python
catalog = "jobs"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table = "employees_merge"

def upsertToDelta(microBatchDF, batchId):
microBatchDF.createOrReplaceTempView("updates")
microBatchDF.sparkSession.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")

def create_table():
spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
(id INT, name STRING, age INT, country STRING)
""")

create_table()

cdcData = spark.readStream.table(f"{catalog}.{schema}.{employees_cdf_table}")

cdcData.writeStream \
.foreachBatch(upsertToDelta) \
.outputMode("append") \
.start()

Para ver os resultados, execute a seguinte consulta SQL :

SQL
SELECT *
FROM mycatalog.myschema.employees_merge

Infelizmente, os resultados estão incorretos, conforme mostrado a seguir:

exemplo de captura de dados de alterações (CDC) MERGE INTO .

Várias atualizações para a mesma key no mesmo microbatch

O primeiro problema é que o código não lida com múltiplas atualizações para a mesma key no mesmo microlote. Por exemplo, você usa INSERT para inserir o funcionário Chris e então atualiza sua função de Proprietário para Gerente. Isso deveria resultar em uma linha, mas em vez disso há duas linhas.

Qual alteração vence quando há várias atualizações na mesma key em um microlote?

captura de dados de alterações (CDC) múltiplas atualizações para a mesma key no mesmo exemplo de microlote.

A lógica se torna mais complexa. O exemplo de código a seguir recupera a linha mais recente por sequenceNum e mescla somente esses dados na tabela de destino da seguinte maneira:

  • Agrupa pela key primária , id.
  • Pega todas as colunas da linha que tem o máximo sequenceNum nos lotes para essa key.
  • Explode a fileira de volta.

Atualize o método upsertToDelta conforme mostrado a seguir e execute o código:

Python
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")

spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")

Ao consultar a tabela de destino, você vê que o funcionário chamado Chris tem a função correta, mas ainda há outros problemas a serem resolvidos porque você ainda tem registros excluídos aparecendo na tabela de destino.

captura de dados de alterações (CDC) múltiplas atualizações para a mesma key no mesmo resultado de exemplo de microlote.

Atualizações fora de ordem em microlotes

Esta seção explora o problema de atualizações fora de ordem em microlotes. O diagrama a seguir ilustra o problema: e se a linha de Chris tiver uma operação UPDATE no primeiro microbatch seguida por um INSERT em um microbatch subsequente? O código não lida com isso corretamente.

Qual alteração vence quando há atualizações fora de ordem na mesma key em vários microlotes?

exemplo de captura de dados de alterações (CDC) atualizações fora de ordem em microlotes.

Para corrigir isso, expanda o código para armazenar uma versão em cada linha da seguinte maneira:

  • Armazene o sequenceNum quando uma linha foi atualizada pela última vez.
  • Para cada nova linha, verifique se o registro de data e hora é maior que o armazenado e então aplique a seguinte lógica:
    • Se for maior, use os novos dados do destino.
    • Caso contrário, mantenha os dados na fonte.

Primeiro, atualize o método createTable para armazenar o sequenceNum , já que você o usará para versionar cada linha:

Python
def create_table():
spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
(id INT, name STRING, age INT, country STRING, sequenceNum INT)
""")

Em seguida, atualize upsertToDelta para manipular versões de linha. A cláusula UPDATE SET de MERGE INTO precisa manipular cada coluna separadamente.

Python
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")

spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET
name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
WHEN NOT MATCHED THEN INSERT *
""")

Manipulando exclusões

Infelizmente, o código ainda tem um problema. Ele não manipula operações DELETE , como evidenciado pelo fato de que o funcionário Pat ainda está na tabela de destino.

Vamos supor que as exclusões cheguem no mesmo microlote. Para lidar com eles, atualize o método upsertToDelta novamente para excluir a linha quando o registro de dados de alteração indicar exclusão, conforme mostrado a seguir:

Python
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")

spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED AND s.operation = 'DELETE' THEN DELETE
WHEN MATCHED THEN UPDATE SET
name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
WHEN NOT MATCHED THEN INSERT *
""")

Lidando com atualizações que chegam fora de ordem após exclusões

Infelizmente, o código acima ainda não está totalmente correto porque não lida com casos em que um DELETE é seguido por um UPDATE fora de ordem em microlotes.

exemplo de captura de dados de alterações (CDC) que trata atualizações que chegam fora de ordem após exclusões.

O algoritmo para lidar com esse caso precisa se lembrar de exclusões para que possa lidar com atualizações fora de ordem subsequentes. Para fazer isso:

  • Em vez de excluir linhas imediatamente, exclua-as suavemente com um registro de data e hora ou sequenceNum. Linhas excluídas temporariamente são marcadas como tombstone .
  • Redirecione todos os seus usuários para uma view que filtra as lápides.
  • Crie uma tarefa de limpeza que remova as lápides ao longo do tempo.

Use o seguinte código:

Python
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")

spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED AND s.operation = 'DELETE' THEN UPDATE SET DELETED_AT=now()
WHEN MATCHED THEN UPDATE SET
name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
WHEN NOT MATCHED THEN INSERT *
""")

Seus usuários não podem usar a tabela de destino diretamente, então crie uma view que eles possam consultar:

CREATE VIEW employees_v AS
SELECT * FROM employees_merge
WHERE DELETED_AT = NULL

Por fim, crie um trabalho de limpeza que remova periodicamente as linhas marcadas para exclusão:

DELETE FROM employees_merge
WHERE DELETED_AT < now() - INTERVAL 1 DAY