Usar o feed de dados de alteração do Delta Lake no Databricks
Observação
Este artigo descreve como registrar e consultar informações de alteração no nível da linha para tabelas Delta usando o recurso de alimentação de dados de alteração. Para saber como atualizar tabelas em um Delta Live Tables pipeline com base em alterações nos dados de origem, consulte APPLY CHANGES API: Simplificar a captura de dados de alterações (CDC) em Delta Live Tables.
O feed de dados de alterações permite que o Databricks acompanhe as alterações no nível de linha entre as diferentes versões de uma tabela Delta.Quando ativado em uma tabela Delta, o tempo de execução registra eventos de alteração para todos os dados gravados na tabela. Isso inclui os dados da linha, juntamente com metadados que indicam se a linha especificada foi inserida, excluída ou atualizada.
Você pode ler os eventos de alteração em consultas em lote usando Spark SQL, Apache Spark DataFrames e transmissão estruturada.
Importante
O feed de dados de alteração funciona em conjunto com o histórico da tabela para fornecer informações de alteração. Devido à criação de um histórico separado ao clonar uma tabela Delta, o feed de dados de alterações em tabelas clonadas não coincide com o da tabela original.
Casos de uso
O feed de dados de alteração não está habilitado por padrão. Os seguintes casos de uso devem ser considerados ao decidir quando habilitar o feed de dados de alterações
Tabelas Silver e Gold: melhore o desempenho do Delta Lake processando somente as alterações no nível da linha após
MERGE
UPDATE
DELETE
as operações iniciais , ou para acelerar e simplificar as operações ETL e ELT.Visualizações materializadas: crie visualizações agregadas atualizadas de informações para uso em BI e análises sem precisar reprocessar as tabelas subjacentes completas, atualizando apenas onde ocorreram mudanças.
Transmitir alterações: envie um feed de dados de alterações para sistemas downstream, como Kafka ou RDBMS, que podem usá-lo para processamento incremental em estágios posteriores dos pipelines de dados.
Tabela de trilha de auditoria: capture o feed de dados de alterações como uma tabela Delta oferece armazenamento perpétuo e capacidade de consulta eficiente para ver todas as mudanças ao longo do tempo, incluindo quando ocorrem exclusões e quais atualizações foram feitas.
Habilitar o feed de dados de alteração
Você deve habilitar explicitamente a opção de alteração de feed de dados empregando um dos seguintes métodos:
Nova tabela: definir a propriedade
delta.enableChangeDataFeed = true
da tabela no comandoCREATE TABLE
.CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
Tabela existente: defina a propriedade da tabela
delta.enableChangeDataFeed = true
no comandoALTER TABLE
.ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
Todas as novas tabela:
set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
Importante
Apenas as alterações feitas após a ativação do feed de dados de alterações são registradas; as alterações anteriores em uma tabela não são registradas.
Alterar armazenamento de dados
O Databricks registra dados alterados para operações UPDATE
, DELETE
e MERGE
na pasta _change_data
no diretório da tabela. Algumas operações, como operações somente de inserção e exclusões de partições completas, não geram dados no diretório _change_data
porque o Databricks pode calcular com eficiência o feed de dados alterados diretamente do log de transações.
Os arquivos da pasta _change_data
seguem a política de retenção da tabela. Portanto, se você executar o comando VACUUM, os dados do feed de dados alterados também serão excluídos.
Ler alterações em consultas em lote
Você pode fornecer a versão ou o carimbo de data/hora do início e do fim. As versões inicial e final e carimbos de data/hora são incluídos nas consultas. Para ler as alterações de uma versão inicial específica para a versão mais recente da tabela, especifique apenas a versão inicial ou o carimbo de data/hora.
Você especifica uma versão como um número inteiro e um carimbo de data/hora como uma string no formato yyyy-MM-dd[ HH:mm:ss[.SSS]]
.
Se você fornecer uma versão mais baixo ou um carimbo de data/hora mais antigo do que aquele que registrou eventos de mudança, ou seja, quando o feed de dados de alterações foi habilitado, um erro será gerado indicando que o feed de dados de alterações não estava habilitado
-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)
-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)
-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')
-- path based tables
SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')
# version as ints or longs
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
# timestamps as formatted timestamp
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
# providing only the startingVersion/timestamp
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
# path based tables
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.load("pathToMyDeltaTable")
// version as ints or longs
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
// timestamps as formatted timestamp
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
// providing only the startingVersion/timestamp
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
// path based tables
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.load("pathToMyDeltaTable")
Leia as alterações nas consultas de transmissão
# providing a starting version
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
# providing a starting timestamp
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", "2021-04-21 05:35:43") \
.load("/pathToMyDeltaTable")
# not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.table("myDeltaTable")
// providing a starting version
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
// providing a starting timestamp
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", "2021-04-21 05:35:43")
.load("/pathToMyDeltaTable")
// not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.table("myDeltaTable")
Para obter os dados alterados durante a leitura da tabela, defina a opção readChangeFeed
como true
. O startingVersion
ou startingTimestamp
são opcionais e, se não forem fornecidos, o fluxo retornará o snapshot mais recente da tabela no momento do fluxo como um INSERT
e alterações futuras como dados alterados. Opções como limites de taxa (maxFilesPerTrigger
, maxBytesPerTrigger
) e excludeRegex
também são compatíveis na leitura de dados alterados.
Observação
A limitação de taxa pode ser atômica para versões diferentes da versão de captura instantânea inicial. Ou seja, toda a versão do commit será limitada por taxa ou todo o commit será retornado.
Por padrão, se um usuário passar uma versão ou carimbo de data/hora que exceda o último commit em uma tabela, será gerado o erro timestampGreaterThanLatestCommit
. No Databricks Runtime 11.3 LTSe acima, o feed de dados de alterações pode lidar com o caso de versão fora de alcance se o usuário definir a seguinte configuração para true
:
set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
Se você informar uma versão de início maior do que o último commit em uma tabela ou um carimbo de data/hora de início mais recente do que o último commit em uma tabela, então, quando a configuração anterior está ativada, um resultado de leitura vazio será retornado.
Se você fornecer uma versão final maior do que o último commit em uma tabela ou um carimbo de data/hora final mais recente do que o último commit em uma tabela, então, quando a configuração anterior estiver ativada no modo de leitura em lote, todas as alterações entre a versão de início e o último commit serão recuperadas.
Qual é o esquema para o feed de dados de alteração?
Quando você lê o feed de dados alterados de uma tabela, é utilizado o esquema da versão mais recente da tabela.
Observação
A maioria das operações de alteração e evolução de esquema são totalmente suportadas. A tabela com mapeamento de coluna ativado não oferece suporte para todos os casos de uso e demonstra comportamento diferente. Consulte Alterar limitações de alimentação de dados para tabelas com mapeamento de coluna ativado.
Além das colunas de dados do esquema da tabela Delta, o feed de dados de alterações contém colunas de metadados que indicam o tipo de evento de mudança:
Nome da coluna |
Tipo |
Valores |
---|---|---|
|
String |
|
|
Long |
O log Delta ou a versão da tabela que contém a alteração. |
|
Carimbo de data/hora |
O carimbo de data/hora associado quando a confirmação foi criada. |
(1) preimage
é o valor antes da atualização, postimage
é o valor após a atualização.
Observação
Você não pode ativar o feed de dados alterados em uma tabela se o esquema contiver colunas com os mesmos nomes dessas colunas adicionadas. Renomeie as colunas na tabela para resolver esse conflito antes de tentar ativar o feed de dados alterados.
Alterar as limitações do feed de dados para tabelas com mapeamento de coluna ativado
Com o mapeamento de colunas habilitado em uma tabela Delta, você pode excluir ou renomear colunas na tabela sem reescrever arquivos de dados para dados existentes.Com o mapeamento de colunas ativado, o feed de dados de alterações possui limitações após a realização de alterações de esquema não aditivas, como renomear ou remover uma coluna, alterar o tipo de dados ou mudanças na nulidade.
Importante
Não é possível ler o feed de dados de alteração para uma transação ou intervalo no qual ocorre uma alteração de esquema não aditiva usando semântica de lote.
Em Databricks Runtime 12.2 LTS e abaixo, as tabelas com mapeamento de coluna habilitado que sofreram alterações de esquema não aditivas não suportam leituras de transmissão no feed de dados de alteração. Veja a transmissão com mapeamento de colunas e alterações no esquema.
Em Databricks Runtime 11.3 LTS e abaixo, o senhor não pode ler o feed de dados de alteração para tabelas com mapeamento de coluna ativado que sofreram renomeação ou eliminação de coluna.
Em Databricks Runtime 12.2 LTS e acima, o senhor pode realizar leituras de lotes no feed de dados de alteração para tabelas com mapeamento de coluna ativado que sofreram alterações de esquema não aditivas. Em vez de usar o esquema da versão mais recente da tabela, as operações de leitura usam o esquema da versão final da tabela especificada na consulta. As consultas ainda falharão se o intervalo de versões especificado abranger uma alteração de esquema não aditiva.
Perguntas frequentes (FAQ)
Qual é o custo indireto de habilitar o feed de dados de alteração?
Não há impacto considerável. Os registros de dados de alterações são gerados na hora durante o processo de execução da consulta e, em geral, são muito menores do que o tamanho total dos arquivos reescritos.