O que é captura de dados de alterações (CDC) (CDC)?
A captura de dados de alterações (CDC) (CDC) é um padrão de integração de dados que captura as alterações feitas nos dados em um sistema de origem, como inserções, atualizações e exclusões. Essas alterações, representadas como uma lista, são comumente chamadas de feed do CDC. O senhor pode processar seus dados muito mais rapidamente se operar em um feed CDC, em vez de ler a fonte inteira dataset. Os bancos de dados transacionais, como SQL Server, MySQL e Oracle, geram feeds CDC. Delta As tabelas geram seu próprio feed CDC, conhecido como change data feed (CDF).
O diagrama a seguir mostra que, quando uma linha em uma tabela de origem que contém dados de funcionários é atualizada, ela gera um novo conjunto de linhas em um feed CDC que contém apenas as alterações. Cada linha do feed CDC normalmente contém metadados adicionais, incluindo as operações como UPDATE
e uma coluna que pode ser usada para ordenar deterministicamente cada linha no feed CDC para que o senhor possa lidar com atualizações fora de ordem. Por exemplo, a coluna sequenceNum
no diagrama a seguir determina a ordem das linhas no feed do CDC:
Processando um feed de dados de alterações: mantenha apenas os dados mais recentes versus mantenha as versões históricas dos dados
O processamento de um feed de dados alterado é conhecido como dimensões que mudam lentamente (SCD) (SCD). Quando o senhor processa um feed do CDC, tem uma opção a fazer:
Você mantém somente os dados mais recentes (ou seja, sobrescreve os dados existentes)? Isso é conhecido como SCD Tipo 1.
Ou o senhor mantém um histórico das alterações nos dados? Isso é conhecido como SCD Tipo 2.
O processamento do SCD Tipo 1 envolve a substituição de dados antigos por novos dados sempre que ocorre uma alteração. Isso significa que nenhum histórico das alterações é mantido. Somente a versão mais recente dos dados está disponível. É uma abordagem simples e geralmente é usada quando o histórico de alterações não é importante, como a correção de erros ou a atualização de campos não críticos, como os endereços email dos clientes.
O processamento SCD Tipo 2 mantém um registro histórico das alterações de dados, criando registros adicionais para capturar diferentes versões dos dados ao longo do tempo. Cada versão dos dados é marcada com data e hora ou marcada com metadados que permitem aos usuários rastrear quando ocorreu uma alteração. Isso é útil quando é importante rastrear a evolução dos dados, como acompanhar as mudanças de endereço do cliente ao longo do tempo para fins de análise.
Exemplos de processamento de SCD Tipo 1 e Tipo 2 com Delta Live Tables
Os exemplos desta seção mostram como usar o SCD Tipo 1 e Tipo 2.
Etapa 1: Preparar dados de amostra
Neste exemplo, o senhor gerará um feed de CDC de amostra. Primeiro, crie um Notebook e cole o seguinte código nele. Atualize as variáveis no início do bloco de código para um catálogo e um esquema em que o senhor tenha permissão para criar tabelas e visualizações.
Esse código cria uma nova tabela Delta que contém vários registros de alteração. O esquema é o seguinte:
id
- Número inteiro, identificador exclusivo desse funcionárioname
- strings, nome do funcionárioage
- Número inteiro, idade do funcionáriooperation
- Tipo de alteração (por exemplo,INSERT
,UPDATE
ouDELETE
)sequenceNum
- Inteiro, identifica a ordem lógica dos eventos CDC nos dados de origem. O Delta Live Tables usa esse sequenciamento para lidar com eventos de alteração que chegam fora de ordem.
# update these to the catalog and schema where you have permissions
# to create tables and views.
catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"
def write_employees_cdf_to_delta():
data = [
(1, "Alex", "chef", "FR", "INSERT", 1),
(2, "Jessica", "owner", "US", "INSERT", 2),
(3, "Mikhail", "security", "UK", "INSERT", 3),
(4, "Gary", "cleaner", "UK", "INSERT", 4),
(5, "Chris", "owner", "NL", "INSERT", 6),
# out of order update, this should be dropped from SCD Type 1
(5, "Chris", "manager", "NL", "UPDATE", 5)
(6, "Pat", "mechanic", "NL", "DELETE", 8),
(6, "Pat", "mechanic", "NL", "INSERT", 7)
]
columns = ["id", "name", "role", "country", "operation", "sequenceNum"]
df = spark.createDataFrame(data, columns)
df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.{employees_cdf_table}")
write_employees_cdf_to_delta()
O senhor pode visualizar esse uso de dados no seguinte SQL comando:
SELECT *
FROM mycatalog.myschema.employees_cdf
Etapa 2: Use o SCD Type 1 para manter apenas os dados mais recentes
Recomendamos usar a API APPLY CHANGES
em um pipeline do Delta Live Tables para processar um feed de dados de alteração em uma tabela SCD Tipo 1.
Criar um novo Notebook.
Cole o seguinte código nele.
A função employees_cdf
lê a tabela que acabamos de criar acima como uma transmissão porque a função APPLY CHANGES
API que o senhor usará para o processamento de captura de dados de alterações (CDC), espera uma transmissão de alterações como entrada. O senhor o envolve com um decorator @dlt.view
porque não quer materializar essa transmissão em uma tabela.
Em seguida, o senhor usa dlt.create_target_table
para criar uma tabela de transmissão que contém o resultado do processamento desse feed de dados de modificação.
Por fim, você usa dlt.apply_changes
para processar o feed de dados de alteração. Vamos dar uma olhada em cada argumento:
target
- A tabela de transmissão de destino, que o senhor definiu anteriormente.source
- O site view sobre a transmissão de registros de alterações, que o senhor definiu anteriormente.keys
- Identifica linhas exclusivas no feed de alterações. Como você está usandoid
como identificador exclusivo, basta fornecerid
como a única coluna de identificação.sequence_by
- O nome da coluna que especifica a ordem lógica dos eventos CDC nos dados de origem. Você precisa desse sequenciamento para lidar com eventos de mudança que chegam fora de ordem. ForneçasequenceNum
como coluna de sequenciamento.apply_as_deletes
- Como os dados do exemplo contêm operações de exclusão, o senhor usaapply_as_deletes
para indicar quando um evento CDC deve ser tratado comoDELETE
em vez de upsert.except_column_list
- Contém uma lista de colunas que você não deseja incluir na tabela de destino. Neste exemplo, você usará esse argumento para excluirsequenceNum
eoperation
.stored_as_scd_type
- Indica o tipo de SCD que o senhor deseja usar.
import dlt
from pyspark.sql.functions import col, expr, lit, when
from pyspark.sql.types import StringType, ArrayType
catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table_current = "employees_current"
employees_table_historical = "employees_historical"
@dlt.view
def employees_cdf():
return spark.readStream.format("delta").table(f"{catalog}.{schema}.{employees_cdf_table}")
dlt.create_target_table(f"{catalog}.{schema}.{employees_table_current}")
dlt.apply_changes(
target=f"{catalog}.{schema}.{employees_table_current}",
source=employees_cdf_table,
keys=["id"],
sequence_by=col("sequenceNum"),
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
O senhor pode executar este pipeline clicando em começar.
Em seguida, execute a seguinte consulta no editor SQL para verificar se os registros de alteração foram processados corretamente:
SELECT *
FROM mycatalog.myschema.employees_current
Observação
A atualização fora de ordem para o funcionário Chris foi descartada corretamente, pois sua função ainda está definida como Proprietário em vez de Gerente.
Etapa 3: Use o site SCD Type 2 para manter os dados históricos
Neste exemplo, o senhor cria uma segunda tabela de destino, chamada employees_historical
, que contém um histórico completo das alterações nos registros dos funcionários.
Adicione esse código ao seu pipeline. A única diferença aqui é que stored_as_scd_type
está definido como 2 em vez de 1.
dlt.create_target_table(f"{catalog}.{schema}.{employees_table_historical}")
dlt.apply_changes(
target=f"{catalog}.{schema}.{employees_table_historical}",
source=employees_cdf_table,
keys=["id"],
sequence_by=col("sequenceNum"),
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 2
)
O senhor pode executar este pipeline clicando em começar.
Em seguida, execute a seguinte consulta no editor SQL para verificar se os registros de alteração foram processados corretamente:
SELECT *
FROM mycatalog.myschema.employees_historical
Você verá todas as alterações nos funcionários, incluindo aqueles que foram excluídos, como Pat.
Etapa 4: Limpar o recurso
Quando o senhor terminar, limpe o recurso seguindo estas etapas:
Excluir o pipeline: ... nota:: Quando o senhor exclui esse pipeline, ele exclui automaticamente as tabelas
employees
eemployees_historical
. #. Clique em pipeline. #. Clique no menu kebab e, em seguida, clique em Excluir.Excluir o Notebook.
Exclua a tabela que contém o feed de dados de alteração:
Clique em Nova consulta >.
Cole e execute o seguinte código SQL, ajustando o catálogo e o esquema conforme apropriado:
DROP TABLE mycatalog.myschema.employees_cdf
Desvantagens do uso de MERGE INTO
e foreachBatch
para a captura de dados de alterações (CDC)
A Databricks fornece um comando SQL MERGE INTO
que o senhor pode usar com a API foreachBatch
API para inserir linhas em uma tabela Delta. Esta seção explora como essa técnica pode ser usada para casos de uso simples, mas esse método se torna cada vez mais complexo e frágil quando aplicado a cenários do mundo real.
Neste exemplo, você usará o mesmo feed de dados de alteração de amostra usado nos exemplos anteriores.
Implementação ingênua com MERGE INTO
e foreachBatch
Crie um Notebook e copie o seguinte código para ele. Altere as variáveis catalog
, schema
e employees_table
conforme apropriado. As variáveis catalog
e schema
devem ser definidas como locais no Unity Catalog onde o senhor pode criar tabelas.
Quando o senhor executa o Notebook, ele faz o seguinte:
Cria a tabela de destino no
create_table
. Ao contrário doapply_changes
, que processa essa etapa automaticamente, você precisa especificar o esquema.Lê o feed de dados de modificação como uma transmissão. Cada microbatch é processado usando o método
upsertToDelta
, que executa um comandoMERGE INTO
.
catalog = "jobs"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table = "employees_merge"
def upsertToDelta(microBatchDF, batchId):
microBatchDF.createOrReplaceTempView("updates")
microBatchDF.sparkSession.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
def create_table():
spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
(id INT, name STRING, age INT, country STRING)
""")
create_table()
cdcData = spark.readStream.table(f"{catalog}.{schema}.{employees_cdf_table}")
cdcData.writeStream \
.foreachBatch(upsertToDelta) \
.outputMode("append") \
.start()
Para ver os resultados, execute a seguinte consulta SQL:
SELECT *
FROM mycatalog.myschema.employees_merge
Infelizmente, os resultados estão incorretos, conforme mostrado a seguir:
Várias atualizações da mesma chave no mesmo microlote
O primeiro problema é que o código não lida com várias atualizações para o mesmo key no mesmo microbatch. Por exemplo, você usa INSERT
para inserir o funcionário Chris e, em seguida, atualizou sua função de Proprietário para Gerente. Isso deve resultar em uma linha, mas, em vez disso, há duas linhas.
Qual alteração vence quando há várias atualizações para o mesmo key em um microbatch?
A lógica se torna mais complexa. O exemplo de código a seguir recupera a última linha por sequenceNum
e mescla apenas esses dados na tabela de destino da seguinte forma:
Agrupa pelo primário key,
id
.Obtém todas as colunas da linha que tem o máximo
sequenceNum
nos lotes para esse key.Explode a fileira de volta para fora.
Atualize o método upsertToDelta
conforme mostrado a seguir e, em seguida, execute o código:
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
Ao consultar a tabela de destino, você vê que o funcionário chamado Chris tem a função correta, mas ainda há outros problemas a serem resolvidos porque você ainda excluiu registros que aparecem na tabela de destino.
Atualizações fora de ordem em microbatches
Esta seção explora o problema de atualizações fora de ordem em microbatches. O diagrama a seguir ilustra o problema: e se a linha do Chris tiver um UPDATE operações no primeiro microbatch seguido de um INSERT em um microbatch subsequente? O código não trata isso corretamente.
Qual alteração vence quando há atualizações fora de ordem no mesmo site key em vários microbatches?
Para corrigir isso, expanda o código para armazenar uma versão em cada linha da seguinte forma:
Armazene o
sequenceNum
quando uma linha foi atualizada pela última vez.Para cada nova linha, verifique se o carimbo de data/hora é maior do que o armazenado e, em seguida, aplique a seguinte lógica:
Se for maior, use os novos dados do alvo.
Caso contrário, mantenha os dados na fonte.
Primeiro, atualize o método createTable
para armazenar o sequenceNum
, pois você o usará para criar a versão de cada linha:
def create_table():
spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
(id INT, name STRING, age INT, country STRING, sequenceNum INT)
""")
Em seguida, atualize upsertToDelta
para lidar com versões de linha. A cláusula UPDATE SET
de MERGE INTO
precisa lidar com cada coluna separadamente.
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET
name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
WHEN NOT MATCHED THEN INSERT *
""")
Tratamento de exclusões
Infelizmente, o código ainda tem um problema. Ele não lida com operações DELETE
, como evidenciado pelo fato de que o funcionário Pat ainda está na tabela de destino.
Vamos supor que as exclusões cheguem no mesmo microlote. Para lidar com eles, atualize o método upsertToDelta
novamente para excluir a linha quando o registro de dados de alteração indicar exclusão, conforme mostrado a seguir:
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED AND s.operation = 'DELETE' THEN DELETE
WHEN MATCHED THEN UPDATE SET
name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
WHEN NOT MATCHED THEN INSERT *
""")
Tratamento de atualizações que chegam fora de ordem após as exclusões
Infelizmente, o código acima ainda não está totalmente correto porque não trata dos casos em que um DELETE
é seguido por um UPDATE
fora de ordem nos microbatches.
O algoritmo para lidar com esse caso precisa lembrar as exclusões para que possa lidar com as atualizações subsequentes fora de ordem. Para fazer isso:
Em vez de excluir linhas imediatamente, exclua-as automaticamente com um carimbo de data/hora ou
sequenceNum
. As linhas excluídas de forma reversível são lápides.Redirecione todos os seus usuários para um site view que filtra os túmulos.
Crie um trabalho de limpeza que remova as lápides ao longo do tempo.
Use o código a seguir:
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED AND s.operation = 'DELETE' THEN UPDATE SET DELETED_AT=now()
WHEN MATCHED THEN UPDATE SET
name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
WHEN NOT MATCHED THEN INSERT *
""")
Seus usuários não podem usar a tabela de destino diretamente, portanto, crie um view que eles possam consultar:
CREATE VIEW employees_v AS
SELECT * FROM employees_merge
WHERE DELETED_AT = NULL
Por fim, crie um trabalho de limpeza que remova periodicamente as linhas com túmulos:
DELETE FROM employees_merge
WHERE DELETED_AT < now() - INTERVAL 1 DAY