Pular para o conteúdo principal

Trabalhar com o histórico da tabela

Para tabelas Apache Iceberg e Delta Lake, cada operação que modifica uma tabela cria uma nova versão da tabela. Use a história da informação para auditar operações, reverter uma tabela ou consultar uma tabela em um ponto específico no tempo usando a viagem do tempo.

nota

O Databricks não recomenda o uso do histórico de tabelas como uma solução de backup de longo prazo para arquivamento de dados. Use apenas os últimos sete dias para operações de viagem do tempo, a menos que você tenha definido as configurações de retenção de dados e de log para um valor maior.

Recuperar história de uma tabela

Você pode recuperar informações, incluindo as operações, o usuário e o carimbo de data/hora de cada gravação em uma tabela, por meio da execução do comando DESCRIBE HISTORY. As operações são retornadas em ordem cronológica inversa.

A retenção do histórico da tabela é determinada pela configuração da tabela logRetentionDuration, que é de 30 dias por padrão.

nota

A viagem do tempo e o histórico da tabela são controlados por diferentes limites de retenção. See viagem do tempo.

SQL
DESCRIBE HISTORY table_name       -- get the full history of the table

DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only

Para obter detalhes de sintaxe do Spark SQL, consulte DESCRIBE HISTORY.

Para detalhes de sintaxe de Scala, Java e Python, consulte a documentação da API do Delta Lake.

Catalog Explorer mostra a história da tabela visualmente na tab **History**.

Esquema de história

A saída da operação history tem as seguintes colunas.

Coluna

Tipo

Descrição

version

long

A versão da tabela gerada pela operação.

carimbo de data/hora

timestamp

Quando essa versão foi confirmada.

userId

string

A ID do usuário que executou a operação.

userName

string

O nome do usuário que executou a operação.

operation

string

O nome da operação.

operationParameters

map

Os parâmetros da operação (por exemplo, predicados.)

Job

struct

Os detalhes do LakeFlow Job que executou a operação. É populado apenas para commits gerados por um LakeFlow Job. Caso contrário, null.

notebook

struct

Os detalhes do Notebook Databricks a partir do qual a execução da operação foi realizada. É preenchido apenas para commits escritos de um Notebook Databricks. Caso contrário, null.

clusterId

string

O ID do cluster no qual a operação foi executada.

readVersion

long

A versão da tabela que foi lida para realizar a operação de escrita.

isolationLevel

string

O nível de isolamento usado para esta operação.

isBlindAppend

boolean

Se essa operação anexou dados.

operationMetrics

map

As métricas da operação (por exemplo, número de linhas e arquivos modificados.)

userMetadata

string

Os metadados de commit definidos pelo usuário, se foram especificados.

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
nota

Compreendendo partitionBy em parâmetros de operação

O campo partitionBy na história da tabela só é significativo para operações CREATE e OVERWRITE que definem ou alteram o esquema de partição de uma tabela.

Para operações de acréscimo a tabelas existentes (APPEND, INSERT, UPDATE, DELETE, MERGE), este campo pode mostrar um array vazio [] ou colunas de partição, dependendo do método de gravação utilizado (.save() vs .saveAsTable()).

Essa inconsistência é um comportamento esperado e não afeta como os dados são gravados nas partições. Não se deve usá-lo para validar operações de anexação.

Exemplo

Considere uma tabela particionada pela coluna date. Ao criar a tabela, partitionBy é preenchido:

Python
df.write.format("delta") \
.partitionBy("date") \
.saveAsTable("sales_data")

A operações CREATE no história mostra:

operationParameters: {
"mode": "ErrorIfExists",
"partitionBy": "[\"date\"]"
}

Quando você anexa dados a esta tabela, partitionBy mostra um array vazio:

Python
new_df.write.format("delta") \
.mode("append") \
.saveAsTable("sales_data")

A operação ANEXAR mostra:

operationParameters: {
"mode": "Append",
"partitionBy": "[]"
}

O valor partitionBy vazio é esperado. Os dados ainda são gravados nas partições corretas com base no esquema de partição existente da tabela. Observe que .save() para um caminho pode mostrar colunas de partição neste campo, mas essa diferença é um detalhe de implementação e não afeta o comportamento de gravação.

Métricas de operações

A operação history retorna uma coleção de métricas de operações no mapa de colunas operationMetrics .

As tabelas a seguir listam as principais definições do mapa por operação.

WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO

As seguintes métricas estão disponíveis para estas operações:

Nome da métrica

Descrição

numFiles

O número de arquivos gravados.

numOutputBytes

O tamanho em bytes do conteúdo gravado.

numOutputRows

O número de linhas gravadas.

STREAMING UPDATE

As seguintes métricas estão disponíveis para esta operação:

Nome da métrica

Descrição

numAddedFiles

O número de arquivos adicionados.

numRemovedFiles

Número de arquivos removidos.

numOutputRows

O número de linhas gravadas.

numOutputBytes

Tamanho da gravação em bytes.

DELETE

As seguintes métricas estão disponíveis para esta operação:

Nome da métrica

Descrição

numAddedFiles

Número de arquivos adicionados. Não fornecido quando as partições da tabela são excluídas.

numRemovedFiles

Número de arquivos removidos.

numDeletedRows

Número de linhas removidas. Não fornecido quando as partições da tabela são excluídas.

numCopiedRows

Número de linhas copiadas no processo de exclusão de arquivos.

executionTimeMs

Tempo gasto para executar toda a operação.

scanTimeMs

O tempo gasto para verificar os arquivos em busca de correspondências.

rewriteTimeMs

Tempo gasto para regravar os arquivos correspondentes.

TRUNCATE

As seguintes métricas estão disponíveis para esta operação:

Nome da métrica

Descrição

numRemovedFiles

Número de arquivos removidos.

executionTimeMs

Tempo gasto para executar toda a operação.

MERGE

As seguintes métricas estão disponíveis para esta operação:

Nome da métrica

Descrição

numSourceRows

O número de linhas no DataFrame de origem.

numTargetRowsInserted

O número de linhas inseridas na tabela de destino.

numTargetRowsUpdated

Número de linhas atualizadas na tabela de destino.

numTargetRowsDeleted

O número de linhas excluídas na tabela de destino.

numTargetRowsCopied

O número de linhas de destino copiadas.

numOutputRows

Número total de linhas gravadas.

numTargetFilesAdded

O número de arquivos adicionados ao coletor (destino).

numTargetFilesRemoved

O número de arquivos removidos do coletor (destino).

executionTimeMs

Tempo gasto para executar toda a operação.

scanTimeMs

O tempo gasto para verificar os arquivos em busca de correspondências.

rewriteTimeMs

Tempo gasto para regravar os arquivos correspondentes.

UPDATE

As seguintes métricas estão disponíveis para esta operação:

Nome da métrica

Descrição

numAddedFiles

O número de arquivos adicionados.

numRemovedFiles

Número de arquivos removidos.

numUpdatedRows

O número de linhas atualizadas.

numCopiedRows

O número de linhas que acabaram de ser copiadas no processo de atualização de arquivos.

executionTimeMs

Tempo gasto para executar toda a operação.

scanTimeMs

O tempo gasto para verificar os arquivos em busca de correspondências.

rewriteTimeMs

Tempo gasto para regravar os arquivos correspondentes.

FSCK

As seguintes métricas estão disponíveis para esta operação:

Nome da métrica

Descrição

numRemovedFiles

Número de arquivos removidos.

CONVERT

As seguintes métricas estão disponíveis para esta operação:

Nome da métrica

Descrição

numConvertedFiles

O número de arquivos Parquet que foram convertidos.

OPTIMIZE

As seguintes métricas estão disponíveis para esta operação:

Nome da métrica

Descrição

numAddedFiles

O número de arquivos adicionados.

numRemovedFiles

O número de arquivos otimizados.

numAddedBytes

O número de bytes adicionados depois que a tabela foi otimizada.

numRemovedBytes

O número de bytes removidos.

minFileSize

O tamanho do menor arquivo após a tabela ser otimizada.

p25FileSize

O tamanho do arquivo do 25º percentil após a tabela ser otimizada.

p50FileSize

O tamanho mediano do arquivo após a tabela ser otimizada.

p75FileSize

O tamanho do arquivo do 75º percentil após a tabela ser otimizada.

maxFileSize

Tamanho do maior arquivo após a tabela ser otimizada.

CLONE

As seguintes métricas estão disponíveis para esta operação:

Nome da métrica

Descrição

sourceTableSize

Tamanho em bytes da tabela de origem na versão clonada.

sourceNumOfFiles

Número de arquivos na tabela de origem na versão clonada.

numRemovedFiles

Número de arquivos removidos da tabela de destino se uma tabela anterior tiver sido substituída.

removedFilesSize

O tamanho total em bytes dos arquivos removidos da tabela de destino se uma tabela anterior tiver sido substituída.

numCopiedFiles

O número de arquivos que foram copiados para o novo local. 0 para clones rasos.

copiedFilesSize

O tamanho total em bytes dos arquivos que foram copiados para o novo local. 0 para clones rasos.

RESTORE

As seguintes métricas estão disponíveis para esta operação:

Nome da métrica

Descrição

tableSizeAfterRestore

O tamanho da tabela em bytes após a restauração.

numOfFilesAfterRestore

Número de arquivos na tabela após a restauração.

numRemovedFiles

O número de arquivos removidos pela operação de restauração.

numRestoredFiles

Número de arquivos adicionados como resultado da restauração.

removedFilesSize

Tamanho em bytes dos arquivos removidos pela restauração.

restoredFilesSize

O tamanho em bytes dos arquivos adicionados pela restauração.

VACUUM

As seguintes métricas estão disponíveis para esta operação:

Nome da métrica

Descrição

numDeletedFiles

O número de arquivos excluídos.

numVacuumedDirectories

O número de diretórios aspirados.

numFilesToDelete

O número de arquivos a serem excluídos.

viagem do tempo

A viagem do tempo é compatível com a consulta de versões anteriores da tabela com base no carimbo de data/hora ou na versão da tabela (conforme registrado no log de transações). Você pode usar a viagem do tempo para aplicações como as seguintes:

  • Recriar análises, relatórios ou resultados, como a saída de um modelo do machine learning. Isso pode ser útil para depuração ou auditoria, especialmente em indústrias regulamentadas.
  • Escrever consultas temporais complexas.
  • Corrigir erros em seus dados.
  • Fornecer isolamento de instantâneos para um conjunto de consultas para tabelas que mudam rapidamente.
nota

No Databricks Runtime 18.0 e acima, as consultas de viagem do tempo são bloqueadas se elas solicitarem uma versão mais antiga do que a propriedade de tabela deletedFileRetentionDuration (default 7 dias). Para tabelas gerenciadas pelo Unity Catalog, isso se aplica ao Databricks Runtime 12.2 e acima.

Sintaxe da viagem do tempo

Você consulta uma tabela com viagem do tempo adicionando uma cláusula após a especificação do nome da tabela.

  • timestamp_expression pode ser qualquer um dos seguintes:

    • '2018-10-18T22:15:12.013Z', isto é, uma string que pode ser convertida em um carimbo de data/hora
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18', ou seja, uma string de data
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • Qualquer outra expressão que seja ou possa ser convertida em um carimbo de data/hora
  • version é um valor longo que pode ser obtido da saída de DESCRIBE HISTORY table_spec.

Nem timestamp_expression nem version podem ser subconsultas.

Somente strings de data ou carimbo de data/hora são aceitas. Por exemplo, "2019-01-01" e "2019-01-01T00:00:00.000Z". Consulte o seguinte código para ver um exemplo de sintaxe:

SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;

Você também pode usar a sintaxe @ para especificar o carimbo de data/hora ou a versão como parte do nome da tabela. O carimbo de data/hora deve estar no formato yyyyMMddHHmmssSSS. É possível especificar uma versão com @v. Consulte o seguinte código para sintaxe de exemplo:

SQL
-- Timestamp version
SELECT * FROM people10m@20190101000000000
-- Version number
SELECT * FROM people10m@v123

Configurar a retenção de dados para consultas de viagem do tempo

Para consultar uma versão de tabela anterior, você deve reter *tanto* o log quanto os arquivos de dados para essa versão:

  • Os arquivos de dados são excluídos quando VACUUM é executado em uma tabela.
  • Os arquivos de log são removidos automaticamente após a criação de pontos de verificação das versões da tabela.

Para aumentar o limite de retenção de dados para tabelas, você deve configurar as seguintes propriedades de tabela, substituindo <format> por delta ou iceberg:

  • <format>.logRetentionDuration = "interval <interval>": controla por quanto tempo o histórico de uma tabela é mantido. O padrão é interval 30 days.

    • No Databricks Runtime 18.0 e acima, logRetentionDuration deve ser maior ou igual a deletedFileRetentionDuration. Para tabelas gerenciadas do Unity Catalog, isso se aplica ao Databricks Runtime 12.2 e acima.
  • <format>.deletedFileRetentionDuration = "interval <interval>": determina que o limite que o VACUUM utiliza para remover arquivos de dados não é mais referenciado na versão da tabela atual. O padrão é interval 7 days.

Por exemplo, para acessar 30 dias de data histórica, defina delta.deletedFileRetentionDuration = "interval 30 days", o que corresponde à configuração default para delta.logRetentionDuration.

importante

Aumentar o limite de retenção de dados pode fazer com que seus custos de armazenamento subam, pois mais arquivos de dados são mantidos.

Você pode especificar as propriedades da tabela durante a criação da tabela ou defini-las com uma instrução ALTER TABLE. Consulte Referência de propriedades da tabela.

Exemplos de viagem do tempo

Para corrigir exclusões acidentais em uma tabela para o usuário 111:

SQL
INSERT INTO my_table
SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
WHERE userId = 111

Para corrigir atualizações incorretas acidentais em uma tabela:

SQL
MERGE INTO my_table target
USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
ON source.userId = target.userId
WHEN MATCHED THEN UPDATE SET *

Para consultar o número de novos clientes adicionados na última semana:

SQL
SELECT
(
SELECT count(distinct userId)
FROM my_table
)
-
(
SELECT count(distinct userId)
FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7)
) AS new_customers

Pontos de verificação de log de transações

O log de transações registra as versões da tabela como arquivos JSON dentro do diretório de log de transações junto com os dados da tabela.

Para otimizar a consulta de pontos de verificação, as versões da tabela são agregadas a arquivos de ponto de verificação Parquet, o que melhora o desempenho ao evitar a necessidade de ler todas as versões JSON da história da tabela. Os usuários não precisam interagir diretamente com os pontos de verificação.

O Databricks otimiza a frequência de pontos de verificação para o tamanho dos dados e a carga de trabalho. A frequência dos pontos de verificação está sujeita a alterações sem aviso prévio.

Restaurar uma tabela para um estado anterior

Use o comando RESTORE para restaurar uma tabela para uma versão anterior ou carimbo de data/hora, incluindo para estes cenários:

  • Você pode restaurar uma tabela já restaurada.
  • Você pode restaurar uma tabela clonada.

Considere os seguintes requisitos:

  • Para restaurar uma tabela, você deve ter a permissão MODIFY para a tabela.
  • Depois que os arquivos de dados são excluídos, manualmente ou por VACUUM, você não pode restaurar uma tabela para uma versão mais antiga que faça referência a esses arquivos. A restauração para essa versão ainda é possível parcialmente se spark.sql.files.ignoreMissingFiles estiver definido como true.
  • Para restaurar por carimbo de data/hora, use os formatos yyyy-MM-dd HH:mm:ss ou yyyy-MM-dd.
SQL
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;

Para obter detalhes da sintaxe, consulte RESTORE.

Comportamento de transmissão

A restauração é uma operação de alteração de dados e pode resultar em dados duplicados para cargas de trabalho downstream. Entradas de log adicionadas pelo comando RESTORE contêm dataChange definido como true.

Para cargas de trabalho downstream, como um Job de transmissão estructurada que processa as atualizações em uma tabela, as entradas de log de alteração de dados adicionadas pela operação de restauração são consideradas novas atualizações de dados, e o processamento delas pode resultar em dados duplicados.

Por exemplo:

Versão da tabela

Operação

Atualizações de log

Registros em atualizações de log de alterações de dados

0

INSERT

AddFile(/path/to/file-1, dataChange = true)

(name = Viktor, age = 29), (name = George, age = 55)

1

INSERT

AddFile(/path/to/file-2, dataChange = true)

(name = George, age = 39)

2

OPTIMIZE

AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2)

Nenhum registro. A compactação OPTIMIZE não altera os dados na tabela.

3

RESTORE(version=1)

RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true)

(name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39)

No exemplo anterior, o comando RESTORE resulta em atualizações que foram vistas anteriormente ao ler a versão 0 e 1 da tabela. Se uma query de transmissão ler essa tabela novamente, esses arquivos serão considerados dados recém-adicionados e processados novamente.

Restaurar métricas

Após a conclusão, RESTORE informa as seguintes métricas como um DataFrame de linha única:

  • table_size_after_restore: O tamanho da tabela após a restauração.

  • num_of_files_after_restore: O número de arquivos na tabela após a restauração.

  • num_removed_files: Número de arquivos removidos (excluídos logicamente) da tabela.

  • num_restored_files: número de arquivos restaurados devido à reversão.

  • removed_files_size: Tamanho total em bytes dos arquivos removidos da tabela.

  • restored_files_size: Tamanho total em bytes dos arquivos restaurados.

    Restaurar exemplo de métricas

Localizar a última versão do commit

Para obter o número da versão do último commit gravado pelo SparkSession atual em todos os threads e todas as tabelas, consulte a configuração SQL spark.databricks.<format>.lastCommitVersionInSession. Substitua <format> por delta ou iceberg, dependendo do formato da sua tabela.

Por exemplo:

SQL
SET spark.databricks.delta.lastCommitVersionInSession

Se nenhum commit tiver sido feito pelo SparkSession, consultar a chave retornará um valor vazio.

nota

Se você compartilhar o mesmo SparkSession em vários threads, é semelhante ao compartilhamento de uma variável em vários threads. Você pode encontrar condições de corrida para atualizações concorrentes no valor da configuração.