Pular para o conteúdo principal

Use o feed de dados de alterações do Delta Lake no Databricks

O feed de dados de alterações permite que o Databricks acompanhe as alterações no nível de linha entre as diferentes versões de uma tabela Delta.Quando ativado em uma tabela Delta, o tempo de execução registra eventos de alteração para todos os dados gravados na tabela. Isso inclui os dados da linha, juntamente com metadados que indicam se a linha especificada foi inserida, excluída ou atualizada.

important

O feed de dados de alteração funciona em conjunto com o histórico da tabela para fornecer informações de alteração. Devido à criação de um histórico separado ao clonar uma tabela Delta, o feed de dados de alterações em tabelas clonadas não coincide com o da tabela original.

Processe dados de alteração de forma incremental

Databricks recomenda o uso do change data feed em combinação com a transmissão estruturada para processar de forma incremental as alterações das tabelas do site Delta. O senhor deve usar a transmissão estruturada para que o site Databricks rastreie automaticamente as versões do feed de dados de alterações da sua tabela.

nota

A DLT oferece funcionalidade para fácil propagação de dados de alteração e armazenamento de resultados como SCD (dimensões que mudam lentamente (SCD)) tipo 1 ou tabelas tipo 2. Consulte o site APPLY CHANGES APIs: Simplificar a captura de dados de alterações (CDC) com DLT.

Para ler o feed de dados de alteração de uma tabela, você deve ativar o feed de dados de alteração nessa tabela. Consulte Habilitar o feed de dados de alteração.

Defina a opção readChangeFeed como true ao configurar uma transmissão em relação a uma tabela para ler o feed de dados de alteração, conforme mostrado no exemplo de sintaxe a seguir:

Python
(spark.readStream
.option("readChangeFeed", "true")
.table("myDeltaTable")
)

Em default, a transmissão retorna o Snapshot mais recente da tabela quando a transmissão começou como INSERT e as alterações futuras como dados de alteração.

Os dados de alteração são confirmados como parte da transação Delta Lake e ficam disponíveis ao mesmo tempo em que os novos dados são confirmados na tabela.

Opcionalmente, você pode especificar uma versão inicial. Consulte Devo especificar uma versão inicial?.

O feed de dados de alterações também suporta a execução de lotes, o que requer a especificação de uma versão inicial. Veja Ler alterações nas consultas de lotes.

Opções como limites de taxa (maxFilesPerTrigger, maxBytesPerTrigger) e excludeRegex também são suportadas ao ler dados de alterações.

A limitação de taxa pode ser atômica para versões diferentes da versão de captura instantânea inicial. Ou seja, toda a versão do commit será limitada por taxa ou todo o commit será retornado.

Devo especificar uma versão inicial?

Opcionalmente, você pode especificar uma versão inicial se quiser ignorar as alterações que ocorreram antes de uma versão específica. O senhor pode especificar uma versão usando um carimbo de data/hora ou o número de ID da versão registrado no log de transações Delta.

nota

Uma versão inicial é necessária para leituras de lotes, e muitos padrões de lotes podem se beneficiar da definição de uma versão final opcional.

Quando o senhor estiver configurando cargas de trabalho de transmissão estruturada que envolvam alimentação de dados alterados, é importante entender como a especificação de uma versão inicial afeta o processamento.

Muitas cargas de trabalho de transmissão, especialmente o novo pipeline de processamento de dados, se beneficiam do comportamento do default. Com o comportamento default, o primeiro lote é processado quando a transmissão registra primeiro todos os registros existentes na tabela como INSERT operações no feed de dados de alteração.

Se sua tabela de destino já contiver todos os registros com as alterações apropriadas até certo ponto, especifique uma versão inicial para evitar o processamento do estado da tabela de origem como eventos INSERT.

O exemplo a seguir mostra a sintaxe da recuperação de uma falha de transmissão em que o ponto de verificação foi corrompido. Neste exemplo, suponha as seguintes condições:

  1. O feed de dados de alteração foi ativado na tabela de origem na criação da tabela.
  2. A tabela downstream de destino processou todas as alterações até a versão 75, inclusive.
  3. O histórico de versões da tabela de origem está disponível para as versões 70 e superiores.
Python
(spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
)

Neste exemplo, você também deve especificar um novo local de ponto de verificação.

important

Se o senhor especificar uma versão inicial, a transmissão falhará ao começar de um novo ponto de verificação se a versão inicial não estiver mais presente no histórico da tabela. O Delta Lake limpa automaticamente as versões históricas, o que significa que todas as versões iniciais especificadas acabam sendo excluídas.

Consulte Posso usar o feed de dados de alteração para reproduzir todo o histórico de uma tabela?

Ler alterações nas consultas de lotes

O senhor pode usar a sintaxe de consulta de lotes para ler todas as alterações a partir de uma determinada versão ou para ler as alterações dentro de um intervalo especificado de versões.

Você especifica uma versão como um número inteiro e um timestamp como uma string no formato yyyy-MM-dd[ HH:mm:ss[.SSS]].

As versões começar e terminar são inclusivas nas consultas. Para ler as alterações de uma determinada versão do começar para a versão mais recente da tabela, especifique apenas a versão inicial.

Se você fornecer uma versão inferior ou um carimbo de data/hora anterior a um que registrou eventos de alteração, ou seja, quando o feed de dados alterados foi ativado, será gerado um erro indicando que o feed de dados alterados não foi ativado.

Os exemplos de sintaxe a seguir demonstram o uso das opções de versão inicial e final com leituras de lotes:

SQL
-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')
nota

Por padrão, se um usuário passar uma versão ou carimbo de data/hora que exceda o último commit em uma tabela, será gerado o erro timestampGreaterThanLatestCommit. No Databricks Runtime 11.3 LTSe acima, o feed de dados de alterações pode lidar com o caso de versão fora de alcance se o usuário definir a seguinte configuração para true:

SQL
set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Se você apresentar uma versão inicial maior que a última confirmação em uma tabela ou um carimbo de data/hora inicial mais recente que a última confirmação em uma tabela, quando a configuração anterior estiver habilitada, será retornado um resultado de leitura vazio.

Se você fornecer uma versão final maior que o último commit em uma tabela ou um carimbo de data/hora final mais recente que o último commit em uma tabela, quando a configuração anterior estiver habilitada no modo de leitura em lote, todas as alterações entre a versão inicial e o último commit serão para ser devolvido.

Qual é o esquema para o feed de dados de alterações?

Quando você lê o feed de dados alterados de uma tabela, é utilizado o esquema da versão mais recente da tabela.

nota

A maioria das operações de alteração e evolução de esquemas é totalmente suportada. A tabela com mapeamento de colunas ativado não oferece suporte a todos os casos de uso e demonstra um comportamento diferente. Consulte Alterar as limitações do feed de dados para tabelas com o mapeamento de colunas ativado.

Além das colunas de dados do esquema da tabela Delta, o feed de dados de alteração contém colunas de metadados que identificam o tipo de evento de alteração:

Nome da coluna

Tipo

Valores

_change_type

String

insert, update_preimage , update_postimage, delete (1)

_commit_version

Long

O log Delta ou a versão da tabela que contém a alteração.

_commit_timestamp

Carimbo de data/hora

O timestamp associado quando a confirmação foi criada.

(1) preimage é o valor antes da atualização, postimage é o valor após a atualização.

nota

Você não pode ativar o feed de dados alterados em uma tabela se o esquema contiver colunas com os mesmos nomes dessas colunas adicionadas. Renomeie as colunas na tabela para resolver esse conflito antes de tentar ativar o feed de dados alterados.

Ativar alteração do feed de dados

Você só pode ler o feed de dados de alteração das tabelas habilitadas. Você deve habilitar explicitamente a opção de alteração do feed de dados usando um dos seguintes métodos:

  • Nova tabela : defina a propriedade da tabela delta.enableChangeDataFeed = true no comando CREATE TABLE.

    SQL
    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
  • Tabela existente : defina a propriedade da tabela delta.enableChangeDataFeed = true no comando ALTER TABLE.

    SQL
    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
  • Todas as novas tabelas :

    SQL
    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
important

Somente as alterações feitas após a ativação do feed de dados de alterações são registradas. As alterações anteriores em uma tabela não são capturadas.

Alterar o armazenamento de dados

Habilitar o feed de dados de alteração causa um pequeno aumento nos custos de armazenamento de uma tabela. Os registros de dados de alteração são gerados durante a execução da consulta e, em geral, são muito menores do que o tamanho total dos arquivos regravados.

O Databricks registra os dados de alteração das operações UPDATE, DELETE e MERGE na pasta _change_data no diretório da tabela. Algumas operações, como as operações somente de inserção e as exclusões de partições completas, não geram dados no diretório _change_data porque o site Databricks pode eficientemente compute o feed de dados de alteração diretamente da transação log.

Todas as leituras de arquivos de dados na pasta _change_data devem passar por APIs Delta Lake compatíveis.

Os arquivos na pasta _change_data seguem a política de retenção da tabela. Os dados de alteração do feed de dados são excluídos quando o senhor executa o comando VACUUM.

Posso usar o feed de dados de alteração para reproduzir todo o histórico de uma tabela?

O feed de dados de alterações não se destina a servir como um registro permanente de todas as alterações em uma tabela. O feed de alteração de dados registra somente as alterações que ocorrem após sua ativação.

O feed de dados de alterações e o site Delta Lake permitem que o senhor sempre reconstrua um Snapshot completo de uma tabela de origem, o que significa que é possível começar uma nova transmissão lida em uma tabela com o feed de dados de alterações ativado e capturar a versão atual dessa tabela e todas as alterações que ocorrerem depois.

Você deve tratar os registros no feed de dados de alteração como transitórios e acessíveis somente em uma janela de retenção especificada. O log de transações Delta remove versões de tabela e suas versões correspondentes de feed de dados de alteração em intervalos regulares. Quando uma versão é removida do log de transações, o senhor não pode mais ler o feed de dados de alteração dessa versão.

Se o seu caso de uso exigir a manutenção de um histórico permanente de todas as alterações em uma tabela, o senhor deve usar a lógica incremental para gravar registros do feed de dados de alteração em uma nova tabela. O exemplo de código a seguir demonstra o uso do site trigger.AvailableNow, que aproveita o processamento incremental da transmissão estruturada, mas processa os dados disponíveis como uma carga de trabalho de lotes. O senhor pode programar essa carga de trabalho de forma assíncrona com seu pipeline de processamento principal para criar um backup do feed de dados de alterações para fins de auditoria ou reprodução total.

Python
(spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("target_table")
)

Alterar as limitações do feed de dados para tabelas com o mapeamento de colunas ativado

Com o mapeamento de colunas habilitado em uma tabela Delta, você pode excluir ou renomear colunas na tabela sem reescrever arquivos de dados para dados existentes.Com o mapeamento de colunas ativado, o feed de dados de alterações possui limitações após a realização de alterações de esquema não aditivas, como renomear ou remover uma coluna, alterar o tipo de dados ou mudanças na nulidade.

important
  • Não é possível ler o feed de dados de alteração para uma transação ou intervalo no qual ocorre uma alteração de esquema não aditiva usando semântica de lote.
  • Em Databricks Runtime 12.2 LTS e abaixo, as tabelas com mapeamento de coluna ativado que sofreram alterações de esquema não aditivas não suportam leituras de transmissão no feed de dados de alteração. Veja a transmissão com mapeamento de colunas e alterações no esquema.
  • Em Databricks Runtime 11.3 LTS e abaixo, o senhor não pode ler o feed de dados de alteração para tabelas com mapeamento de coluna ativado que sofreram renomeação ou eliminação de coluna.

Em Databricks Runtime 12.2 LTS e acima, o senhor pode realizar leituras de lotes no feed de dados de alteração para tabelas com mapeamento de coluna ativado que sofreram alterações de esquema não aditivas. Em vez de usar o esquema da versão mais recente da tabela, as operações de leitura usam o esquema da versão final da tabela especificada na consulta. As consultas ainda falharão se o intervalo de versões especificado abranger uma alteração de esquema não aditiva.