Use o feed de dados de alterações do Delta Lake no Databricks
O feed de dados de alterações permite que o Databricks acompanhe as alterações no nível de linha entre as diferentes versões de uma tabela Delta.Quando ativado em uma tabela Delta, o tempo de execução registra eventos de alteração para todos os dados gravados na tabela. Isso inclui os dados da linha, juntamente com metadados que indicam se a linha especificada foi inserida, excluída ou atualizada.
Você pode usar o feed de dados de alterações para viabilizar casos de uso de dados comuns, incluindo:
- PipelineETL : Processa incrementalmente apenas as linhas que foram alteradas desde a última execução pipeline .
- Trilhas de auditoria : rastreiam as modificações de dados para atender aos requisitos de compliance e governança.
- Replicação de dados : Sincroniza as alterações com tabelas, caches ou sistemas externos subsequentes.
O feed de dados de alterações funciona em conjunto com o histórico da tabela para fornecer informações sobre alterações. Como a clonagem de uma tabela Delta cria um histórico separado, o feed de dados de alterações nas tabelas clonadas não corresponde ao da tabela original.
Ativar alteração do feed de dados
O feed de dados de alteração deve ser explicitamente ativado nas tabelas das quais você deseja ler os dados. Utilize um dos seguintes métodos.
Nova tabela
Defina a propriedade da tabela delta.enableChangeDataFeed = true no comando CREATE TABLE .
CREATE TABLE student (id INT, name STRING, age INT)
TBLPROPERTIES (delta.enableChangeDataFeed = true)
Tabela existente
Defina a propriedade da tabela delta.enableChangeDataFeed = true no comando ALTER TABLE .
ALTER TABLE myDeltaTable
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
Todas as novas mesas em uma sessão
Configure o Spark para ativar o feed de dados de alterações para todas as novas tabelas criadas em uma sessão.
SET spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
Somente as alterações feitas após a ativação do feed de dados de alterações são registradas. As alterações anteriores em uma tabela não são capturadas.
Alterar esquema de alimentação de dados
Ao ler os dados de alteração de uma tabela, o esquema da versão mais recente da tabela é utilizado. O Databricks oferece suporte completo à maioria das operações de alteração e evolução de esquema, mas as tabelas com mapeamento de colunas ativado apresentam limitações. Consulte Alterar as limitações do feed de dados para tabelas com mapeamento de colunas.
Além das colunas de dados do esquema da tabela Delta, o feed de dados de alteração contém colunas de metadados que identificam o tipo de evento de alteração:
Nome da coluna | Tipo | Valores |
|---|---|---|
| String |
|
| Long | O log Delta ou a versão da tabela que contém a alteração. |
| Carimbo de data/hora | O timestamp associado quando a confirmação foi criada. |
(1) preimage é o valor antes da atualização, postimage é o valor após a atualização.
Não é possível ativar o feed de dados de alteração em uma tabela se o esquema contiver colunas com os mesmos nomes que essas colunas de metadados. Renomeie as colunas da sua tabela para resolver esse conflito antes de ativar o feed de dados de alteração.
Processe dados de alteração de forma incremental
Databricks recomenda o uso do feed de dados de alteração em combinação com a transmissão estruturada para processar incrementalmente as alterações das tabelas Delta . Você deve usar a transmissão estruturada do Databricks para rastrear automaticamente as versões do feed de dados de alterações da sua tabela. Para processamento CDC com tabelas SCD tipo 1 ou tipo 2, consulte APIs AUTO CDC : Simplifique a captura de dados de alterações (CDC) com pipeline.
Defina a opção readChangeFeed como true ao configurar uma transmissão em relação a uma tabela para ler o feed de dados de alteração, conforme mostrado no exemplo de sintaxe a seguir:
- Python
- Scala
(spark.readStream
.option("readChangeFeed", "true")
.table("myDeltaTable")
)
spark.readStream
.option("readChangeFeed", "true")
.table("myDeltaTable")
comportamento padrão
Quando a transmissão começa pela primeira vez, ela retorna o Snapshot mais recente da tabela como INSERT registros e, em seguida, retorna as alterações futuras como dados de alteração. A alteração nos dados é confirmada como parte da transação Delta Lake e fica disponível ao mesmo tempo que os novos dados são confirmados na tabela.
Opções adicionais
Você pode, opcionalmente, especificar uma versão inicial (consulte Especificar uma versão inicial) ou usar a execução em lotes (consulte Ler alterações em consultas em lotes). O Databricks também suporta limites de taxa (maxFilesPerTrigger, maxBytesPerTrigger) e excludeRegex ao ler dados de alteração.
Para versões diferentes do Snapshot inicial, a limitação de taxa se aplica atomicamente a todo o commit — ou o commit inteiro é incluído nos lotes atuais, ou é adiado para os próximos lotes.
Especifique uma versão inicial
Para ler as alterações a partir de um ponto específico, especifique uma versão inicial usando um carimbo de data/hora ou um número de versão. Versões iniciais são necessárias para a leitura de lotes. Opcionalmente, você pode especificar uma versão final para limitar o intervalo. Para saber mais sobre a história da mesa Delta Lake , veja O que é Delta Lake viagem do tempo?.
Ao configurar cargas de trabalho de transmissão estruturada que envolvem o feed de dados de alteração, entenda como a especificação de uma versão inicial impacta o processamento:
- Os novos pipelines de processamento de dados normalmente se beneficiam do comportamento default , que registra todos os registros existentes na tabela como
INSERToperações quando a transmissão começa pela primeira vez. - Se sua tabela de destino já contiver todos os registros com as alterações apropriadas até certo ponto, especifique uma versão inicial para evitar o processamento do estado da tabela de origem como eventos
INSERT.
O exemplo a seguir mostra a sintaxe para recuperação de uma falha de transmissão na qual o ponto de verificação foi corrompido. Neste exemplo, considere as seguintes condições:
- O feed de dados de alteração foi ativado na tabela de origem na criação da tabela.
- A tabela downstream de destino processou todas as alterações até a versão 75, inclusive.
- O histórico de versões da tabela de origem está disponível para as versões 70 e superiores.
- Python
- Scala
(spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
)
spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
Neste exemplo, você também deve especificar um novo local de ponto de verificação.
Se o senhor especificar uma versão inicial, a transmissão falhará ao começar de um novo ponto de verificação se a versão inicial não estiver mais presente no histórico da tabela. O Delta Lake limpa automaticamente as versões históricas, o que significa que todas as versões iniciais especificadas acabam sendo excluídas.
Veja o histórico da tabela de replay.
Ler alterações nas consultas de lotes
O senhor pode usar a sintaxe de consulta de lotes para ler todas as alterações a partir de uma determinada versão ou para ler as alterações dentro de um intervalo especificado de versões.
- Especifique as versões como números inteiros e os carimbos de data/hora como strings no formato
yyyy-MM-dd[ HH:mm:ss[.SSS]]. - As versões "iniciar" e "finalizar" são inclusivas.
- Para ler desde a versão inicial até a versão mais recente, especifique apenas a versão inicial.
- Especificar uma versão anterior à ativação do feed de dados de alterações gera um erro.
Os exemplos de sintaxe a seguir demonstram o uso das opções de versão inicial e final com leituras de lotes:
- SQL
- Python
- Scala
-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)
-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)
-- database/schema names inside the string for table name,
-- with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')
# version as ints or longs
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
# timestamps as formatted timestamp
spark.read \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
# providing only the startingVersion/timestamp
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
// version as ints or longs
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
// timestamps as formatted timestamp
spark.read
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
// providing only the startingVersion/timestamp
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
Lidar com versões fora de alcance
Por default, especificar uma versão ou timestamp que exceda o último commit gera o erro timestampGreaterThanLatestCommit. No Databricks Runtime 11.3 LTS e versões superiores, você pode habilitar a tolerância para versões fora do intervalo:
SET spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
Com essa configuração ativada:
- iniciar versão/carimbo de data/hora além do último commit : Retorna um resultado vazio.
- Versão final/carimbo de data/hora posterior ao último commit : Retorna todas as alterações desde o início até o último commit.
Alterações nos dados de registro
Delta Lake regista alterações de dados de forma eficiente e pode utilizar outros recursos Delta Lake para otimizar a representação do armazenamento.
Considerações sobre armazenamento
- Custos de armazenamento : Habilitar o feed de dados de alterações pode causar um pequeno aumento nos custos de armazenamento, pois as alterações podem ser registradas em arquivos separados.
- Operações sem arquivos de alterações : Algumas operações (somente inserção, exclusões de partição inteira) não geram arquivos de dados de alterações —Databricks calcula o fluxo de dados de alterações diretamente do log de transações.
- Retenção : Os arquivos de dados alterados seguem a política de retenção da tabela. O comando
VACUUMos exclui, e as alterações do log de transações seguem a retenção do ponto de verificação.
Não tente reconstruir o fluxo de dados de alterações consultando diretamente os arquivos de dados de alterações. Utilize sempre as APIs do Delta Lake.
Tabela de reprodução
O feed de dados de alterações não se destina a servir como um registro permanente de todas as alterações feitas em uma tabela. Ele registra apenas as alterações que ocorrem após ser ativado, e você pode iniciar uma nova transmissão para capturar a versão atual e todas as alterações subsequentes.
Os registros no feed de dados de alterações são transitórios e acessíveis apenas durante um período de retenção específico. O log de transações do Delta Lake remove versões de tabelas e suas respectivas versões de feed de dados de alterações em intervalos regulares. Quando uma versão é removida, você não poderá mais ler o feed de dados de alterações dessa versão.
Arquivar dados de alteração para história permanente
Se o seu caso de uso exigir a manutenção de um histórico permanente de todas as alterações em uma tabela, use lógica incremental para gravar os registros do fluxo de dados de alterações em uma nova tabela. O exemplo a seguir demonstra o uso de trigger.AvailableNow para processar dados disponíveis como uma carga de trabalho de lotes para auditoria ou reprodução completa:
- Python
- Scala
(spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("target_table")
)
spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("target_table")
Alterar as limitações de alimentação de dados para tabelas com mapeamento de colunas.
Com o mapeamento de colunas ativado em uma tabela Delta, você pode excluir ou renomear colunas sem precisar sobrescrever os arquivos de dados. No entanto, o feed de dados de alteração apresenta limitações após alterações de esquema não aditivas, como renomear ou remover colunas, alterar tipos de dados ou alterações de nulidade:
- Semântica de lotes : Não é possível ler o feed de dados de alterações para uma transação ou intervalo em que ocorre uma alteração de esquema não aditiva.
- DBR 12.2 LTS e versões anteriores : Tabelas com mapeamento de colunas ativado que sofreram alterações de esquema não aditivas não suportam leituras de transmissão no feed de dados de alteração. Veja transmissão com mapeamento de colunas e alterações de esquema.
- DBR 11.3 LTS e versões anteriores : Não é possível ler o feed de dados de alterações para tabelas com mapeamento de colunas ativado que sofreram renomeação ou remoção de colunas.
No Databricks Runtime 12.2 LTS e versões superiores, você pode realizar leituras em lote no feed de dados de alterações para tabelas com mapeamento de colunas ativado que sofreram alterações de esquema não aditivas. As operações de leitura utilizam o esquema da versão final especificada na consulta, em vez da versão mais recente da tabela. As consultas ainda falham se o intervalo de versões abranger uma alteração de esquema não aditiva.