Pular para o conteúdo principal

Delta leituras e gravações de transmissão de tabelas

Esta página descreve como transmitir alterações de uma tabela Delta . Delta Lake está profundamente integrado com Spark transmissão estruturada através de readStream e writeStream. Delta Lake supera muitas das limitações normalmente associadas aos sistemas de transmissão e arquivos, incluindo:

  • Coalescência de pequenos arquivos produzidos pela ingestão de baixa latência.
  • Manter o processamento "exatamente uma vez" com mais de uma transmissão (ou trabalho concorrente em lote).
  • Descobrir de forma eficiente quais arquivos são novos ao usar arquivos como fonte para uma transmissão.

Para obter informações sobre a união estática de transmissão com Delta Lake, consulte união estática de transmissão.

Limitar a taxa de entrada

As seguintes opções estão disponíveis para controlar micro-batches:

  • maxFilesPerTrigger: quantos arquivos novos devem ser considerados em cada micro-batch. O padrão é 1000.
  • maxBytesPerTrigger: Quantos dados são processados em cada micro-batch. Essa opção define um "soft max", o que significa que um lote processa aproximadamente essa quantidade de dados e pode processar mais do que o limite para fazer a consulta de transmissão avançar nos casos em que a menor unidade de entrada é maior que esse limite. Isso não é definido por padrão.

Se você utilizar o maxBytesPerTrigger em conjunto com o maxFilesPerTrigger, o micro-batch processará dados até que o limite de maxFilesPerTrigger ou maxBytesPerTrigger seja atingido.

nota

Nos casos em que as transações da tabela de origem são limpas devido à configuração logRetentionDuration e a consulta de transmissão tenta processar essas versões, em default a consulta falha para evitar a perda de dados. Você pode definir a opção failOnDataLoss como false para ignorar os dados perdidos e continuar o processamento.

Gerenciar alterações nas tabelas Delta de origem

a transmissão estruturada lê tabelas Delta de forma incremental. Enquanto uma consulta de transmissão estiver ativa em uma tabela Delta , os novos registros serão processados de forma idempotente à medida que as novas versões da tabela commit na tabela de origem. A transmissão estruturada não processa entradas que não sejam do tipo append e lança uma exceção se ocorrerem modificações na tabela que está sendo usada como fonte. Por exemplo, se uma operação UPDATE, DELETE, MERGE INTO ou OVERWRITE modificar uma tabela Delta de origem que está sendo lida por uma consulta de transmissão, a transmissão falhará com um erro.

Existem quatro abordagens típicas para lidar com alterações upstream em tabelas Delta de origem, dependendo do seu caso de uso. Segue abaixo uma tabela de referência e detalhes sobre cada item:

Abordagem

Prós

Contras

skipChangeCommits

Simples, não exige que você escreva uma lógica complexa. Útil para processamento somente de acréscimo, onde as alterações a montante são tratadas separadamente, ou para lidar temporariamente com um registro inválido.

Não propaga alterações e processa apenas acréscimos.

refresh completo

Além disso, é simples e não exige que você escreva uma lógica complexa. Útil para conjuntos de dados pequenos com raras alterações a montante.

Caro para conjuntos de dados grandes; requer o reprocessamento de todas as tabelas subsequentes.

Alterar feed de dados

Processe todos os tipos de alteração (inserções, atualizações, exclusões); Databricks recomenda a transmissão a partir do feed CDC de uma tabela Delta , em vez de diretamente da tabela, sempre que possível.

Exige que você escreva uma lógica mais complexa para lidar com cada tipo de alteração.

Visualizações materializadas

Alternativa simples à transmissão estruturada; fornece propagação automática de alterações.

Maior latência; disponível apenas no pipeline declarativo LakeFlow Spark e Databricks SQL.

Ignorar commit de alteração upstream com skipChangeCommits

A configuração skipChangeCommits instrui o mecanismo de transmissão a ignorar transações que excluem ou modificam registros existentes e a processar apenas acréscimos. Isso é útil quando você sabe que as alterações nos dados existentes não precisam ser propagadas pela transmissão, ou quando prefere uma lógica separada para lidar com essas alterações. Você pode ativar e desativar skipChangeCommits se precisar ignorar temporariamente alterações pontuais.

A Databricks recomenda o uso de skipChangeCommits para a maioria das cargas de trabalho que não usam feeds de dados de alteração.

Python
(spark.readStream
.option("skipChangeCommits", "true")
.table("source_table")
)
importante

Se o esquema de uma tabela Delta mudar após o início de uma leitura de transmissão na tabela, a consulta falhará. Para a maioria das alterações de esquema, você pode reiniciar a transmissão para resolver a incompatibilidade de esquema e continuar o processamento.

No Databricks Runtime 12.2 LTS e versões anteriores, não é possível transmitir dados de uma tabela Delta com mapeamento de colunas ativado que tenha sofrido evolução não aditiva do esquema, como renomeação ou remoção de colunas. Para obter detalhes, consulte Mapeamento e transmissão de colunas.

nota

Em Databricks Runtime 12.2 LTS e acima, skipChangeCommits substitui a configuração anterior ignoreChanges. No Databricks Runtime 11.3 LTS e versões inferiores, ignoreChanges é a única opção suportada.

A semântica de ignoreChanges difere muito de skipChangeCommits. Com ignoreChanges ativado, os arquivos de dados reescritos na tabela de origem são reemitidos após uma operação de alteração de dados, como UPDATE, MERGE INTO, DELETE (dentro de partições) ou OVERWRITE. As linhas inalteradas geralmente são emitidas junto com as novas linhas, portanto os consumidores downstream devem ser capazes de lidar com as duplicidades. As exclusões não são propagadas downstream. ignoreChanges subsume ignoreDeletes.

skipChangeCommits Ignora completamente as operações de alteração de arquivos. Os arquivos de dados que são reescritos na tabela de origem devido a operações de alteração de dados, como UPDATE, MERGE INTO, DELETE e OVERWRITE são completamente ignorados. Para refletir as alterações nas tabelas de origem das transmissões, você deve implementar uma lógica separada para propagar essas alterações.

As cargas de trabalho configuradas com ignoreChanges continuam a operar usando a semântica conhecida, mas a Databricks recomenda o uso de skipChangeCommits para todas as novas cargas de trabalho. A migração de cargas de trabalho usando ignoreChanges para skipChangeCommits exige uma lógica de refatoração.

Opção legada: ignoreDeletes

ignoreDeletes é uma opção legada que lida apenas com transações que excluem dados nos limites das partições (ou seja, exclusões completas de partições). Se você precisar lidar com exclusões, atualizações ou outras modificações que não sejam de partição, use skipChangeCommits em vez disso.

Python
(spark.readStream
.option("ignoreDeletes", "true")
.table("user_events")
)

refresh completa das tabelas subsequentes

Se as alterações a montante forem raras e os dados forem suficientemente pequenos para serem reprocessados, você pode excluir o ponto de verificação da transmissão e a tabela de saída e, em seguida, reiniciar a transmissão desde o início. Isso faz com que a transmissão reprocesse todos os dados da tabela de origem. Tenha em mente que essa abordagem também exige o reprocessamento de todas as tabelas subsequentes que dependem do resultado dessa transmissão.

Essa abordagem é mais adequada para conjuntos de dados ou cargas de trabalho menores, onde as alterações upstream são pouco frequentes e o custo de uma refresh completa é aceitável.

Usar o feed de dados de alterações

Para cargas de trabalho que processam todos os tipos de alterações (inserções, atualizações e exclusões), utilize o feed de dados de alterações do Delta Lake. O feed de dados de alteração registra as alterações em nível de linha em uma tabela Delta , permitindo que você transmita essas alterações e escreva a lógica para lidar com cada tipo de alteração em tabelas subsequentes. Essa é a abordagem mais robusta porque seu código lida explicitamente com todos os tipos de eventos de mudança. Consulte a seção "Usar o feed de dados de alterações do Delta Lake" no Databricks.

Se você estiver usando o pipeline declarativo LakeFlow Spark , consulte Usar APIs APPLY CHANGES para simplificar a captura de dados de alterações (CDC) com tabelas Delta Live.

importante

No Databricks Runtime 12.2 LTS e versões anteriores, não é possível transmitir dados do feed de alterações para uma tabela Delta com mapeamento de colunas ativado que tenha sofrido evolução não aditiva do esquema, como renomear ou remover colunas. Consulte Mapeamento e transmissão de colunas.

Utilizar visão materializada

A visão materializada lida automaticamente com as alterações a montante, recalculando os resultados quando os dados de origem são alterados. Se você não precisa da menor latência possível e deseja evitar a complexidade de gerenciar a transmissão, uma view materializada pode simplificar sua arquitetura. As visualizações materializadas estão disponíveis no pipeline declarativo LakeFlow Spark e no Databricks SQL. Veja Visão materializada.

Exemplo

Por exemplo, suponha que você tenha uma tabela user_events com colunas date, user_email e action particionada por date. Você sai da tabela user_events e precisa excluir dados dela devido ao GDPR.

skipChangeCommits permite excluir dados em várias partições (neste exemplo, filtrando em user_email). Utilize a seguinte sintaxe:

Scala
spark.readStream
.option("skipChangeCommits", "true")
.table("user_events")

Se você atualizar um user_email com a instrução UPDATE, o arquivo contendo o user_email em questão será reescrito. Use skipChangeCommits para ignorar os arquivos de dados alterados.

O Databricks recomenda usar skipChangeCommits em vez de ignoreDeletes a menos que você tenha certeza de que as exclusões são sempre descartes completos de partições.

Especifique a posição inicial

Você pode usar as opções a seguir para especificar o ponto de partida da fonte de transmissão do Delta Lake sem processar a tabela inteira.

  • startingVersionA versão do Delta Lake para começar. A Databricks recomenda omitir esta opção para a maioria das cargas de trabalho. Quando não configurado, a transmissão inicia a partir da versão mais recente disponível, incluindo um instantâneo completo da tabela naquele momento e as alterações futuras como dados de alteração. Se especificado, a transmissão lê todas as alterações na tabela Delta a partir da versão especificada (inclusive). Caso a versão especificada não esteja mais disponível, a transmissão não será iniciada. Você pode obter as versões de commit da coluna version da saída do comando DESCRIBE HISTORY . Para retornar apenas as alterações mais recentes, especifique latest.
  • startingTimestamp: O timestamp de onde começar. Todas as alterações de tabela confirmadas no timestamp ou depois dele (inclusive) são lidas pelo leitor de streaming. Se o timestamp fornecido for anterior a todos os commits da tabela, a leitura de streaming começará com o timestamp mais antigo disponível. Um de:
    • Uma sequência de carimbo de data/hora. Por exemplo, "2019-01-01T00:00:00.000Z".
    • Uma string de datas. Por exemplo, "2019-01-01".

Você não pode definir as duas opções ao mesmo tempo. Elas entram em vigor somente quando o senhor inicia uma nova consulta de transmissão. Se uma consulta de transmissão tiver começado e o progresso tiver sido registrado em seu ponto de verificação, essas opções serão ignoradas.

importante

Embora você possa iniciar a fonte de transmissão a partir de uma versão específica ou carimbo de data/hora, o esquema da fonte de transmissão é sempre o esquema mais recente da tabela Delta. Você deve garantir que não haja nenhuma alteração de esquema incompatível na tabela Delta após a versão especificada ou o carimbo de data/hora. Caso contrário, a fonte de transmissão poderá retornar resultados incorretos ao ler os dados com um esquema incorreto.

Exemplo

Por exemplo, suponha que você tenha uma tabela user_events. Se quiser ler as alterações desde a versão 5, use:

Scala
spark.readStream
.option("startingVersion", "5")
.table("user_events")

Se quiser ler as alterações desde 18-10-2018, use:

Scala
spark.readStream
.option("startingTimestamp", "2018-10-18")
.table("user_events")

Processar o Snapshot inicial sem que os dados sejam descartados

Esse recurso está disponível em Databricks Runtime 11.3 LTS e acima.

Ao usar uma tabela Delta como fonte de transmissão, a consulta primeiro processa todos os dados presentes na tabela. A tabela Delta nessa versão é chamada de Snapshot inicial. Em default, os arquivos de dados da tabela Delta são processados com base no arquivo que foi modificado pela última vez. No entanto, a hora da última modificação não representa necessariamente a ordem da hora do evento de registro.

Em uma consulta de transmissão estável com uma marca d'água definida, o processamento de arquivos por tempo de modificação pode resultar em registros processados na ordem incorreta. Isso pode fazer com que os registros sejam descartados como eventos tardios pela marca d'água.

Você pode evitar o problema de perda de dados ativando a seguinte opção:

  • withEventTimeOrder: se o snapshot inicial deve ser processado com a ordem do horário do evento.

Com a ordem de tempo do evento ativada, o intervalo de tempo do evento dos dados do snapshot inicial é dividido em intervalos de tempo. Cada micro lote processa um bloco filtrando dados dentro do intervalo de tempo. As opções de configuração maxFilesPerTrigger e maxBytesPerTrigger ainda são aplicáveis para controlar o tamanho do microbatch, mas apenas de forma aproximada devido à natureza do processamento.

O gráfico abaixo mostra esse processo:

Snapshot inicial

Informações importantes sobre esse recurso:

  • O problema de perda de dados só acontece quando o snapshot Delta inicial de uma consulta de transmissão com monitoração de estado é processado na ordem padrão.

  • Você não pode alterar withEventTimeOrder depois que a consulta de stream é iniciada enquanto o snapshot inicial ainda estiver sendo processado. Para reiniciar com withEventTimeOrder alterado, você precisa excluir o ponto de verificação.

  • Se você estiver executando uma consulta de transmissão com o recurso `withEventTimeOrder` ativado, não poderá fazer o downgrade para uma versão Databricks Runtime que não seja compatível com esse recurso até que o processamento inicial do Snapshot seja concluído. Caso precise fazer um downgrade, você pode aguardar a conclusão do Snapshot inicial ou excluir o ponto de verificação e reiniciar a consulta.

  • Esse recurso não é suportado nos seguintes cenários incomuns:

    • A coluna de tempo do evento é uma coluna gerada e há transformações sem projeção entre a fonte Delta e a marca d'água.
    • Há uma marca d'água que tem mais de uma fonte Delta na consulta de fluxo.
  • Com a ordem de tempo do evento habilitada, o desempenho do processamento de snapshot inicial do Delta pode ser mais lento.

  • Cada microlote analisa o Snapshot inicial para filtrar os dados dentro do intervalo de tempo do evento correspondente. Para uma filtragem mais rápida, recomenda-se usar uma coluna de origem Delta como horário do evento, para que a omissão de dados possa ser aplicada (consulte a seção Omissão de dados para saber quando ela se aplica). Além disso, o particionamento da tabela ao longo da coluna de tempo do evento pode acelerar ainda mais o processamento. Você pode consultar Spark UI para ver quantos arquivos delta foram verificados para um microlote específico.

Exemplo

Suponha que você tenha uma tabela user_events com uma coluna event_time. Sua consulta de transmissão é uma consulta de agregação. Se quiser garantir que nenhum dado seja perdido durante o processamento inicial do snapshot, você pode usar:

Scala
spark.readStream
.option("withEventTimeOrder", "true")
.table("user_events")
.withWatermark("event_time", "10 seconds")
nota

Você também pode habilitar isso com a configuração Spark no cluster que se aplicará a todas as consultas de transmissão: spark.databricks.delta.withEventTimeOrder.enabled true

Mesa Delta como pia

Você também pode gravar dados em uma tabela Delta usando o a transmissão estruturada. O registro de transações permite que o Delta Lake garanta o processamento de uma única vez, mesmo se houver outros fluxos ou consultas em lote sendo executados simultaneamente na tabela.

Ao escrever em uma tabela de transações ( Delta ) utilizando um sink de estrutura de transmissão, é possível observar um commit vazio com epochId = -1. Eles são esperados e normalmente ocorrem:

  • Nos primeiros lotes de cada execução da consulta de transmissão (isso ocorre a cada lotes para um Trigger.AvailableNow).
  • Quando um esquema é alterado (como adicionar uma coluna).

Esses commits vazios não afetam a correção ou o desempenho da consulta de forma significativa. Eles são intencionais e não indicam um erro.

nota

A função Delta Lake VACUUM remove todos os arquivos não gerenciados pelo Delta Lake, mas ignora todos os diretórios que se começam com _. Você pode armazenar pontos de verificação com segurança ao lado de outros dados e metadados para uma tabela Delta usando uma estrutura de diretórios como <table-name>/_checkpoints.

métricas

O senhor pode descobrir o número de bytes e o número de arquivos ainda a serem processados em um processo de consulta de transmissão, como as métricas numBytesOutstanding e numFilesOutstanding. As métricas adicionais incluem:

  • numNewListedFiles: Número de arquivos Delta Lake que foram listados para calcular a lista de pendências desse lote.
    • backlogEndOffset: A versão da tabela usada para calcular o backlog.

Se você estiver executando a transmissão em um notebook, poderá ver essas métricas na aba Dados brutos no painel de progresso da consulta de transmissão:

JSON
{
"sources": [
{
"description": "DeltaSource[file:/path/to/source]",
"metrics": {
"numBytesOutstanding": "3456",
"numFilesOutstanding": "8"
}
}
]
}

Modo de anexação

Por padrão, os fluxos são executados no modo Anexar, o que adiciona novos registros à tabela.

Use o método toTable ao transmitir para tabelas, como no exemplo a seguir:

Python
(events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
)

Modo completo

Você também pode usar o Structured Streaming para substituir toda a tabela por cada lote. Um exemplo de caso de uso é calcular um resumo usando agregação:

Python
(spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
)

O exemplo anterior atualiza continuamente uma tabela que contém o número agregado de eventos por cliente.

Para aplicativos com requisitos de latência mais leniente, você pode economizar recursos de computação com gatilhos únicos. Utilize-os para atualizar tabelas de agregação de resumo em um determinado cronograma, processando apenas os novos dados que chegaram desde a última atualização.

Upsert de consultas de transmissão usando foreachBatch

O senhor pode usar uma combinação de merge e foreachBatch para escrever upserts complexos de uma consulta de transmissão em uma tabela Delta. Consulte Usar ForEachBatch para gravar em coletores de dados arbitrários.

Esse padrão tem muitos aplicativos, incluindo o seguinte:

  • Escrever agregados de transmissão no modo de atualização : isso é muito mais eficiente que o modo completo.
  • Gravar uma transmissão das alterações do banco de dados em uma tabela Delta: A consultamerge para gravar dados de alteração pode ser usada em foreachBatch para aplicar continuamente uma transmissão de alterações em uma tabela Delta.
  • Gravar uma transmissão de dados na tabela Delta com deduplicação : A consulta merge somente de inserção para deduplicação pode ser usada em foreachBatch para gravar continuamente dados (com duplicatas) em uma tabela Delta com deduplicação automática.
nota
  • Faça com que sua instrução merge dentro de foreachBatch esteja idempotente, pois as reinicializações da consulta de transmissão podem aplicar a operação no mesmo lote de dados várias vezes.
  • Quando merge é usado em foreachBatch, a taxa de dados de entrada da consulta de transmissão (informada por StreamingQueryProgress e visível no gráfico de taxa de notebook) pode ser informada como um múltiplo da taxa real em que os dados são gerados na fonte. Isso ocorre porque merge lê os dados de entrada várias vezes, fazendo com que as métricas de entrada sejam multiplicadas. Se isso for um gargalo, você pode armazenar em cache o DataFrame em lote antes de merge e depois desarmazená-lo. merge

O seguinte exemplo demonstra como você pode utilizar SQL dentro do foreachBatch para realizar esta tarefa:

Scala
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
// Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")

// Use the view name to apply MERGE
// NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
microBatchOutputDF.sparkSession.sql(s"""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()

Você também pode optar por usar as APIs do Delta Lake para executar transmissão de upserts, como no exemplo a seguir:

Scala
import io.delta.tables.*

val deltaTable = DeltaTable.forName(spark, "table_name")

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
deltaTable.as("t")
.merge(
microBatchOutputDF.as("s"),
"s.key = t.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()

A tabela idempotente grava em foreachBatch

nota

Databricks Recomenda configurar uma gravação de transmissão separada para cada sink que o senhor deseja atualizar, em vez de usar foreachBatch. Isso ocorre porque as gravações em várias tabelas são serializadas ao usar 'forEachBatch`, o que reduz a paralelização e aumenta a latência geral.

Delta suportam as seguintes opções DataFrameWriter para tornar as gravações em várias tabelas dentro de foreachBatch idempotentes:

  • txnAppId: Uma cadeia de caracteres exclusiva que o senhor pode passar em cada gravação em DataFrame. Por exemplo, você pode usar o ID do StreamingQuery como txnAppId.
  • txnVersion: um número crescente monotonicamente que atua como versão da transação.

O Delta Lake usa a combinação de txnAppId e txnVersion para identificar gravações duplicadas e ignorá-las.

Se uma gravação de lotes for interrompida por uma falha, a reexecução do lotes usará o mesmo aplicativo e a mesma ID de lotes para ajudar o tempo de execução a identificar corretamente as gravações duplicadas e ignorá-las. A ID do aplicativo (txnAppId) pode ser qualquer cadeia de caracteres exclusiva gerada pelo usuário e não precisa estar relacionada à ID de transmissão. Consulte Usar ForEachBatch para gravar em coletores de dados arbitrários.

atenção

Se o senhor excluir o ponto de verificação da transmissão e reiniciar a consulta com um novo ponto de verificação, deverá fornecer um txnAppId diferente. Os novos pontos de controle começam com um lote ID de 0. Delta Lake usa o ID do lote e txnAppId como um único key, e ignora lotes com valores já vistos.

O exemplo de código a seguir demonstra esse padrão:

Python
app_id = ... # A unique string that is used as an application ID.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2

streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()