Pular para o conteúdo principal

Leituras e gravações da tabela de transmissão do Delta Lake

Esta página descreve como usar tabelas Delta Lake como fontes e coletores para transmissão estructurada do Spark com readStream e writeStream. O Delta Lake resolve problemas comuns de desempenho e confiabilidade para sistemas e arquivos de transmissão. Os benefícios incluem:

  • Consolide arquivos pequenos produzidos por ingestão de baixa latência e melhore o desempenho.
  • Manter processamento “exatamente uma vez” com mais de uma transmissão (ou trabalho simultâneo em lote).
  • Descubra novos arquivos de forma eficiente ao usar arquivos como fonte de transmissão.

Para saber como carregar tabelas de uso de transmissão de dados no Databricks SQL, consulte Usar tabelas de transmissão autônomas.

Para transmissão-static join com Delta Lake, veja transmissão-static join.

Para uma lista completa de opções DataStreamReader e DataStreamWriter para Delta Lake, veja DataStreamReader Opções de Delta Lake e DataStreamWriter Opções de Delta Lake.

atenção

Se você usar uma tabela Delta Lake como fonte de transmissão, a consulta de transmissão deve ser executada pelo menos uma vez dentro da janela de retenção da tabela de origem. Os períodos de retenção padrão são de 7 dias para arquivos de dados removidos por VACUUMe 30 dias para o log de transações (logRetentionDuration). Se uma query ficar atrasada em relação a essas janelas, ela falha com DELTA_FILE_NOT_FOUND_DETAILED e deve ser Reset com uma atualização completa.

Não defina spark.sql.files.ignoreMissingFiles como true como uma solução alternativa, pois essa configuração produz resultados incorretos silenciosamente. Se o programador de uma transmissão não conseguir acompanhar os períodos de retenção default , aumente o período de retenção da tabela de origem.

Use tabelas Delta Lake como um coletor

É possível escrever dados em uma tabela Delta Lake usando a transmissão estructurada. O log de transações do Delta Lake garante o processamento de uma única vez, mesmo quando há outras transmissões ou consultas em lote sendo executadas simultaneamente na tabela.

Ao escrever para uma tabela Delta Lake usando um coletor de transmissão estructurada, poderá ver commits vazios com epochId = -1. São esperados e ocorrem normalmente:

  • Nos primeiros lotes de cada execução da consulta de transmissão (isso ocorre a cada lotes para um Trigger.AvailableNow).
  • Quando um esquema é alterado (como adicionar uma coluna).

Esses commits vazios são intencionais e não indicam um erro. Elas não afetam a correção ou o desempenho da consulta de forma significativa.

nota

A função Delta Lake VACUUM remove todos os arquivos não gerenciados pelo Delta Lake, mas ignora todos os diretórios que se começam com _. Você pode armazenar pontos de verificação com segurança ao lado de outros dados e metadados para uma tabela Delta Lake usando uma estrutura de diretórios como <table-name>/_checkpoints.

Monitore o backlog com métricas.

Utilize as seguintes métricas para monitorar o backlog de um processo de consulta de transmissão:

  • numBytesOutstandingNúmero de bytes ainda a serem processados na fila de pendências.
  • numFilesOutstandingNúmero de arquivos ainda a serem processados na lista de pendências.
  • numNewListedFilesNúmero de arquivos Delta Lake listados para calcular o acúmulo de tarefas para este lote.
  • backlogEndOffset: A versão da tabela do Delta Lake usada para calcular o backlog.

Em um Notebook, view essas métricas na tab dados brutos no painel de andamento da consulta de transmissão:

JSON
{
"sources": [
{
"description": "DeltaSource[file:/path/to/source]",
"metrics": {
"numBytesOutstanding": "3456",
"numFilesOutstanding": "8"
}
}
]
}

Modo de anexação

Por default, a transmissão é executada em modo anexado e apenas adiciona novos registros à tabela.

Utilize o método toTable ao transmitir para tabelas:

Python
(events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
)

Modo completo

Use transmissão estruturada com modo completo para substituir a tabela inteira após cada lote. Por exemplo, você pode atualizar continuamente uma tabela de resumo agregada de eventos por cliente:

Python
(spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
)

Para aplicações sem requisitos de latência rigorosos, você pode economizar recursos de computação e custos com gatilhos únicos, como AvailableNow. Por exemplo, use esse gatilho para atualizar tabelas de agregação de resumo em um determinado programa, processando apenas os novos dados que chegaram desde a última atualização. Ver AvailableNow: Processamento incremental de lotes.

Lidar com alterações em tabelas Delta Lake de origem

Transmissão estructurada lê incrementalmente tabelas do Delta Lake. Quando uma consulta de transmissão lê de uma tabela Delta Lake, novos registros são processados idempotentemente à medida que novas versões da tabela são confirmadas na tabela de origem. O Transmissão estruturada só aceita acréscimos e lança uma exceção se ocorrerem modificações na tabela Delta Lake de origem. Por exemplo, se uma operação UPDATE, DELETE, MERGE INTO ou OVERWRITE modificar uma tabela Delta Lake de origem que é lida por uma consulta de transmissão, a transmissão falhará com um erro.

Existem quatro abordagens típicas para lidar com alterações a montante em tabelas Delta Lake de origem, dependendo do seu caso de uso. Uma tabela de referência e os detalhes de cada um são fornecidos abaixo:

Abordagem

Prós

Contras

skipChangeCommits

Simples, não exige que você escreva uma lógica complexa. Útil para processamento somente de acréscimo, onde as alterações a montante são tratadas separadamente, ou para lidar temporariamente com um registro inválido.

Não propaga alterações e processa apenas acréscimos.

refresh completo

Além disso, é simples e não exige que você escreva uma lógica complexa. Útil para conjuntos de dados pequenos com raras alterações a montante.

Caro para conjuntos de dados grandes. Requer o reprocessamento de todas as tabelas subsequentes.

Alterar feed de dados

Processar todos os tipos de alteração (inserções, atualizações e exclusões). A Databricks recomenda transmitir a partir do feed CDC de uma tabela Delta Lake em vez de diretamente da tabela, sempre que possível.

Exige que você escreva uma lógica mais complexa para lidar com cada tipo de alteração.

Visualizações materializadas

Alternativa simples à transmissão estruturada que possui propagação automática de mudanças.

Maior latência. Disponível apenas no pipeline declarativo LakeFlow Spark e Databricks SQL.

Ignorar commit de alteração upstream com skipChangeCommits

Defina skipChangeCommits para ignorar transações que excluem ou modificam registros existentes e para processar apenas acréscimos. Isso é útil quando as alterações nos dados existentes não precisam ser propagadas pela transmissão, ou quando você prefere uma lógica separada para lidar com essas alterações. Você pode ativar e desativar skipChangeCommits se precisar ignorar temporariamente alterações pontuais.

A Databricks recomenda o uso de skipChangeCommits para a maioria das cargas de trabalho que não usam feeds de dados de alteração.

Python
(spark.readStream
.option("skipChangeCommits", "true")
.table("source_table")
)
importante

Se o esquema de uma tabela Delta Lake mudar após o início de uma leitura de transmissão na tabela, a consulta falhará. Para a maioria das alterações de esquema, você pode reiniciar a transmissão para resolver a incompatibilidade de esquema e continuar o processamento.

No Databricks Runtime 12.2 LTS e versões anteriores, você não pode fazer streaming de uma tabela Delta Lake com mapeamento de coluna ativado que tenha sofrido evolução do esquema não aditiva, como renomear ou eliminar colunas. Para obter detalhes, consulte Mapeamento de colunas e transmissão.

nota

No Databricks Runtime 12.2 LTS e versões superiores, skipChangeCommits substitui ignoreChanges. No Databricks Runtime 11.3 LTS e versões anteriores, ignoreChanges é a única opção suportada. Consulte a opção Legado: ignoreChanges para obter detalhes.

Opção legada: ignoreDeletes

ignoreDeletes é uma opção legada que lida apenas com transações que excluem dados nos limites das partições (ou seja, exclusões completas de partições). Se você precisar lidar com exclusões, atualizações ou outras modificações que não sejam de partição, use skipChangeCommits em vez disso.

Python
(spark.readStream
.option("ignoreDeletes", "true")
.table("user_events")
)

Opção legada: ignoreChanges

ignoreChanges Está disponível no Databricks Runtime 11.3 LTS e versões anteriores. No Databricks Runtime 12.2 LTS e versões superiores, ele é substituído por skipChangeCommits.

Com ignoreChanges ativado, os arquivos de dados reescritos na tabela de origem são reemitidos após operações de modificação de dados como UPDATE, MERGE INTO, DELETE (dentro de partições) ou OVERWRITE. Linhas inalteradas são frequentemente emitidas juntamente com novas linhas, portanto, os consumidores subsequentes devem ser capazes de lidar com duplicatas. As exclusões não são propagadas para os fluxos subsequentes. ignoreChanges tem precedência sobre ignoreDeletes.

Em contraste, skipChangeCommits ignora completamente as operações de alteração de arquivos. Os arquivos de dados reescritos na tabela de origem devido a operações de modificação de dados, como UPDATE, MERGE INTO, DELETE e OVERWRITE são totalmente ignorados. Para refletir as alterações nas tabelas de origem das transmissões, você deve implementar uma lógica separada para propagar essas alterações.

A Databricks recomenda o uso de skipChangeCommits para todas as novas cargas de trabalho. Para migrar uma carga de trabalho de ignoreChanges para skipChangeCommits, refatore sua lógica de transmissão.

refresh completa das tabelas subsequentes

Se as alterações a montante forem raras e os dados forem suficientemente pequenos para serem reprocessados, você pode excluir o ponto de verificação da transmissão e a tabela de saída e, em seguida, reiniciar a transmissão desde o início. Isso faz com que a transmissão reprocesse todos os dados da tabela de origem. Tenha em mente que essa abordagem também exige o reprocessamento de todas as tabelas subsequentes que dependem do resultado dessa transmissão.

Essa abordagem é mais adequada para conjuntos de dados ou cargas de trabalho menores, onde as alterações upstream são pouco frequentes e o custo de uma refresh completa é aceitável.

Usar o feed de dados de alterações

Para cargas de trabalho que processam todos os tipos de alterações (inserções, atualizações e exclusões), use o feed de dados de alteração do Delta Lake. O feed de dados de alteração registra as alterações no nível da linha em uma tabela Delta Lake, permitindo a transmissão dessas alterações e a escrita de lógica para manipular cada tipo de alteração em tabelas downstream. Esta é a abordagem mais robusta porque o código lida explicitamente com todo tipo de evento de alteração. Consulte Usar o feed de dados de alteração no Databricks.

Se você estiver usando o pipeline declarativo LakeFlow Spark , consulte APIs AUTO CDC : Simplifique a captura de dados de alterações (CDC) com pipeline.

importante

No Databricks Runtime 12.2 LTS e versões anteriores, você não pode transmitir a partir do feed de dados de alteração para uma tabela Delta Lake com mapeamento de coluna ativado que passou por uma evolução de esquema não aditiva, como renomear ou eliminar colunas. Consulte Mapeamento de colunas e transmissão.

Utilizar visão materializada

As visualizações materializadas lidam automaticamente com as alterações a montante, recalculando os resultados quando os dados de origem são alterados. Se você não precisa da menor latência possível e deseja evitar o gerenciamento da complexidade da transmissão, uma materialized view pode simplificar sua arquitetura. As visualizações materializadas estão disponíveis em pipelines declarativos do Lakeflow Spark e em pipelines independentes. Veja Vistas materializadas.

Exemplo

Por exemplo, suponha que você tenha uma tabela user_events com colunas date, user_email e action particionada por date. Você sai da tabela user_events e precisa excluir dados dela devido ao GDPR.

skipChangeCommits permite excluir dados em várias partições (neste exemplo, filtrando em user_email). Utilize a seguinte sintaxe:

Scala
spark.readStream
.option("skipChangeCommits", "true")
.table("user_events")

Se você atualizar um user_email com a instrução UPDATE, o arquivo contendo o user_email em questão será reescrito. Use skipChangeCommits para ignorar os arquivos de dados alterados.

O Databricks recomenda usar skipChangeCommits em vez de ignoreDeletes a menos que você tenha certeza de que as exclusões são sempre descartes completos de partições.

Use foreachBatch para escritas de tabela idempotentes

nota

Databricks recomenda configurar uma gravação de transmissão separada para cada coletor que você deseja atualizar em vez de usar foreachBatch. Escritas em múltiplos destinos em foreachBatch reduzem a paralelização e aumentam a latência geral porque as escritas em múltiplas tabelas são serializadas em foreachBatch.

As tabelas Delta Lake oferecem suporte às seguintes opções DataFrameWriter para tornar as gravações em várias tabelas dentro de foreachBatch idempotentes:

  • txnAppIdUma sequência de caracteres exclusiva que você pode passar em cada gravação DataFrame . Por exemplo, você pode usar o ID da StreamingQuery como txnAppId. txnAppId pode ser qualquer string única gerada pelo usuário e não precisa estar relacionada ao ID da transmissão.
  • txnVersion: um número crescente monotonicamente que atua como versão da transação.

Delta Lake usa txnAppId e txnVersion para identificar e ignorar gravações duplicadas. Por exemplo, após uma falha interromper a escrita de lotes, você pode reexecutar os lotes com os mesmos txnAppId e txnVersion para identificar e ignorar duplicados corretamente. Consulte Usar foreachBatch para gravar em destinos de dados arbitrários.

atenção

Se o senhor excluir o ponto de verificação da transmissão e reiniciar a consulta com um novo ponto de verificação, deverá fornecer um txnAppId diferente. Os novos pontos de controle começam com um lote ID de 0. Delta Lake usa o ID do lote e txnAppId como um único key, e ignora lotes com valores já vistos.

O exemplo de código a seguir demonstra esse padrão:

Python
app_id = ... # A unique string that is used as an application ID.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2

streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()

Upsert de consultas de transmissão usando foreachBatch

Você pode utilizar merge e foreachBatch para escrever upserts complexos de uma consulta de transmissão em uma tabela Delta Lake. Consulte Usar foreachBatch para gravar em coletores de dados arbitrários.

Essa abordagem tem muitas aplicações:

nota
  • Verifique se sua declaração merge dentro de foreachBatch é idempotente. Caso contrário, o reinício da consulta de transmissão poderá aplicar as transações nos mesmos lotes de dados diversas vezes. Consulte Use foreachBatch para escritas de tabela idempotentes.

  • Quando merge é usado em foreachBatch, as métricas de taxa de dados de entrada podem retornar um múltiplo da taxa real em que os dados são gerados na fonte. merge lê os dados de entrada várias vezes, o que multiplica as métricas. Para evitar a multiplicação de métricas, armazene em cache o DataFrame lotes antes de merge e remova-o do cache depois de merge.

    A taxa de dados de entrada está disponível através de StreamingQueryProgress e no gráfico de taxa de transmissão do Notebook. Veja consultas de monitoramento transmissão estruturada no Databricks.

Por exemplo, você pode usar instruções SQL MERGE dentro de foreachBatch:

Scala
// Function to upsert microBatchOutputDF into Delta Lake table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
// Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")

// Use the view name to apply MERGE
// NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
microBatchOutputDF.sparkSession.sql(s"""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
}

// Write the output of a streaming aggregation query into Delta Lake table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()

Você também pode usar as APIs Delta Lake para upserts de transmissões:

Scala
import io.delta.tables.*

val deltaTable = DeltaTable.forName(spark, "table_name")

// Function to upsert microBatchOutputDF into Delta Lake table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
deltaTable.as("t")
.merge(
microBatchOutputDF.as("s"),
"s.key = t.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
}

// Write the output of a streaming aggregation query into Delta Lake table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()

Defina a versão inicial da tabela para processar as alterações.

Por default, as transmissões começam com a versão mais recente e disponível da tabela do Delta Lake. Isso inclui um Snapshot completo da tabela naquele momento e todas as alterações futuras. O Databricks recomenda o uso da versão inicial da tabela default para a maioria das cargas de trabalho.

Opcionalmente, você pode usar as seguintes opções para especificar o ponto de partida da fonte de transmissão Delta Lake sem processar a tabela inteira.

  • startingVersion: A versão da tabela Delta Lake para começar a ler. Todas as alterações na tabela confirmadas a partir da versão especificada são lidas pela transmissão. Se a versão especificada não estiver disponível, a transmissão não pode ser iniciada.

    Para encontrar as versões de commit disponíveis, execute DESCRIBE HISTORY e verifique o version. Para retornar somente as últimas alterações, especifique latest. Para obter informações sobre as versões da tabela Delta Lake, consulte Trabalhar com o histórico da tabela.

  • startingTimestamp: O carimbo de data/hora a partir do qual a leitura deve começar. Todas as alterações de tabela confirmadas no carimbo de data/hora especificado ou posteriormente são lidas pela transmissão. Se o carimbo de data/hora fornecido for anterior a todos os commits da tabela, a leitura da transmissão começará com o carimbo de data/hora mais antigo disponível. Defina uma das opções:

    • Uma sequência de carimbo de data/hora. Por exemplo, "2019-01-01T00:00:00.000Z".
    • Uma string de datas. Por exemplo, "2019-01-01".

Você não pode definir startingVersion e startingTimestamp ao mesmo tempo. Essas configurações se aplicam somente a novas consultas de transmissão. Se uma consulta de transmissão tiver sido iniciada e o progresso tiver sido registrado em seu ponto de verificação, essas configurações serão ignoradas.

importante

Embora você possa começar a fonte de transmissão a partir de uma versão específica ou carimbo de data/hora, o esquema da fonte de transmissão é sempre o esquema mais recente da tabela Delta Lake. Você deve garantir que não haja nenhuma alteração de esquema incompatível na tabela Delta Lake após a versão especificada ou o carimbo de data/hora. Caso contrário, a fonte de transmissão poderá retornar resultados incorretos ao ler os dados com um esquema incorreto.

Exemplo

Por exemplo, suponha que você tenha uma tabela user_events. Se quiser ler as alterações desde a versão 5, use:

Scala
spark.readStream
.option("startingVersion", "5")
.table("user_events")

Se quiser ler as alterações desde 18-10-2018, use:

Scala
spark.readStream
.option("startingTimestamp", "2018-10-18")
.table("user_events")

Processar o instantâneo inicial sem descartar dados

Esse recurso está disponível em Databricks Runtime 11.3 LTS e acima.

Em uma consulta de transmissão com estado e marca d'água definida, o processamento de arquivos por data de modificação pode processar registros na ordem incorreta. Isso pode fazer com que a marca d'água marque incorretamente os registros como eventos tardios e os exclua. Isso só pode ocorrer quando o instantâneo Delta inicial for processado na ordem default .

Para transmissões que usam uma tabela Delta como origem, a consulta primeiro processa todos os dados presentes na tabela e cria uma versão chamada o *snapshot inicial*. Por default, os arquivos de dados da tabela Delta Lake são processados com base em qual arquivo foi modificado pela última vez. No entanto, a hora da última modificação não representa necessariamente a ordem de tempo do evento de registro.

Para evitar perda de dados durante o processamento inicial do Snapshot, habilite a opção withEventTimeOrder . withEventTimeOrder divide o intervalo de tempo do evento dos dados do Snapshot inicial em intervalos de tempo. Cada microlote processa um conjunto de dados filtrando-os dentro do intervalo de tempo especificado. As opções maxFilesPerTrigger e maxBytesPerTrigger ainda são aplicáveis para controlar o tamanho dos microlotes, mas apenas aproximadamente devido à abordagem de processamento.

O diagrama a seguir ilustra esse processo:

Snapshot inicial

Restrições

  • Você não pode alterar withEventTimeOrder se a consulta de transmissão tiver começado e o Snapshot inicial estiver sendo processado ativamente. Para reiniciar com withEventTimeOrder alterado, você deve excluir o ponto de verificação.
  • Se withEventTimeOrder estiver habilitado, você não poderá fazer o downgrade de uma transmissão para uma versão Databricks Runtime que não suporte esse recurso até que o processamento inicial do Snapshot seja concluído. Para fazer o downgrade, aguarde a conclusão do Snapshot inicial ou exclua o ponto de verificação e reinicie a consulta.
  • Este recurso não é compatível com os seguintes cenários:
    • A coluna de tempo do evento é uma coluna gerada e há transformações sem projeção entre a fonte Delta e a marca d'água.
    • Há uma marca d'água que tem mais de uma fonte Delta na consulta de fluxo.

desempenho

Se withEventTimeOrder estiver ativado, o desempenho do processamento inicial do Snapshot poderá ser mais lento. Cada microlote examina o Snapshot inicial para filtrar os dados dentro do intervalo de tempo do evento correspondente. Para melhorar o desempenho da filtragem:

  • Utilize uma coluna de origem Delta como o horário do evento para que a omissão de dados possa ser aplicada. Consulte Ignorando dados.
  • Divida a tabela ao longo da coluna de tempo do evento.

Use a Spark UI para ver quantos arquivos Delta foram verificados para um microlote específico.

Exemplo

Suponha que você tenha uma tabela user_events com uma coluna event_time. Sua consulta de transmissão é uma consulta de agregação. Se quiser garantir que nenhum dado seja perdido durante o processamento inicial do snapshot, você pode usar:

Scala
spark.readStream
.option("withEventTimeOrder", "true")
.table("user_events")
.withWatermark("event_time", "10 seconds")

Você pode definir withEventTimeOrder com uma configuração Spark no cluster para aplicá-lo a todas as consultas de transmissão: spark.databricks.delta.withEventTimeOrder.enabled true.

Limitar a taxa de entrada para melhorar o desempenho do processamento

Por default, a transmissão estruturada processa o maior número possível de arquivos em cada microlote. Para limitar a quantidade de dados processados por lote e gerenciar o uso de memória, estabilizar a latência ou reduzir os custos de armazenamento cloud , utilize as seguintes opções:

  • maxFilesPerTrigger: O número de novos arquivos a serem considerados em cada microlote. O default é 1000.
  • maxBytesPerTriggerA quantidade de dados que é processada em cada microlote. Esta opção define um "máximo flexível", o que significa que um lote processa aproximadamente esta quantidade de dados e pode processar mais do que o limite para que a consulta de transmissão avance nos casos em que a menor unidade de entrada seja maior do que esse limite. Esta opção não está definida por default.

Se você usar maxBytesPerTrigger e maxFilesPerTrigger, o microlote processa os dados até que o limite maxFilesPerTrigger ou maxBytesPerTrigger seja atingido.

nota

Por default, se logRetentionDuration limpar as transações na tabela de origem e a consulta de transmissão tentar processar essas versões, a consulta falhará para evitar a perda de dados. Você pode definir a opção failOnDataLoss para false para ignorar dados perdidos e continuar o processamento. Consulte Configurar retenção de dados para consultas de viagem do tempo.

Controle os custos de armazenamento cloud

As consultas de transmissão têm vários modos de gatilho disponíveis que permitem equilibrar custo e latência, incluindo processingTime, availableNow e realTime. Consulte Controlar o custo do armazenamento cloud.