Delta leituras e gravações de transmissão de tabelas
Delta Lake está profundamente integrado com Spark transmissão estruturada por meio de readStream
e writeStream
. Delta Lake supera muitas das limitações normalmente associadas aos sistemas e arquivos de transmissão, inclusive:
- Coalescência de pequenos arquivos produzidos pela ingestão de baixa latência.
- Manter o processamento "exatamente uma vez" com mais de uma transmissão (ou trabalho concorrente em lote).
- Descobrir de forma eficiente quais arquivos são novos ao usar arquivos como fonte para uma transmissão.
Este artigo descreve o uso das tabelas Delta Lake como fontes e sumidouros de transmissão. Para saber como carregar tabelas de uso de dados de transmissão em Databricks SQL, consulte Load uso de dados transmission tables in Databricks SQL.
Para obter informações sobre a união estática de transmissão com Delta Lake, consulte união estática de transmissão.
Delta tabela como fonte
A transmissão estruturada lê de forma incremental as tabelas do site Delta. Enquanto uma consulta de transmissão está ativa em uma tabela Delta, novos registros são processados de forma idempotente como novas versões da tabela commit para a tabela de origem.
Os exemplos de código a seguir mostram a configuração de uma leitura de transmissão usando o nome da tabela ou o caminho do arquivo.
- Python
- Scala
spark.readStream.table("table_name")
spark.readStream.load("/path/to/table")
spark.readStream.table("table_name")
spark.readStream.load("/path/to/table")
Se o esquema de uma tabela Delta mudar após o início de uma leitura de transmissão na tabela, a consulta falhará. Para a maioria das alterações de esquema, você pode reiniciar a transmissão para resolver a incompatibilidade de esquema e continuar o processamento.
Em Databricks Runtime 12.2 LTS e abaixo, o senhor não pode fazer a transmissão de uma tabela Delta com mapeamento de coluna ativado que tenha passado por uma evolução não aditiva do esquema, como renomear ou eliminar colunas. Para obter detalhes, consulte transmissão com mapeamento de coluna e alterações de esquema.
Limitar a taxa de entrada
As seguintes opções estão disponíveis para controlar micro-batches:
maxFilesPerTrigger
: quantos arquivos novos devem ser considerados em cada micro-batch. O padrão é 1000.maxBytesPerTrigger
: Quantos dados são processados em cada micro-batch. Essa opção define um "soft max", o que significa que um lote processa aproximadamente essa quantidade de dados e pode processar mais do que o limite para fazer a consulta de transmissão avançar nos casos em que a menor unidade de entrada é maior que esse limite. Isso não é definido por padrão.
Se você utilizar o maxBytesPerTrigger
em conjunto com o maxFilesPerTrigger
, o micro-batch processará dados até que o limite de maxFilesPerTrigger
ou maxBytesPerTrigger
seja atingido.
Nos casos em que as transações da tabela de origem são limpas devido à configuração logRetentionDuration
e a consulta de transmissão tenta processar essas versões, em default a consulta falha para evitar a perda de dados. Você pode definir a opção failOnDataLoss
como false
para ignorar os dados perdidos e continuar o processamento.
transmissão para Delta Lake captura de dados de alterações (CDC) (CDC) feed
O feed de dados de alterações do Delta Lake registra as alterações em uma tabela Delta, incluindo atualizações e exclusões. Quando ativado, o senhor pode transmitir a partir de um feed de dados de alteração e escrever a lógica para processar inserções, atualizações e exclusões em tabelas downstream. Embora a saída de dados do feed de dados de alteração seja ligeiramente diferente da tabela Delta que ela descreve, isso fornece uma solução para propagar alterações incrementais para tabelas downstream em uma arquitetura de medalhão.
Em Databricks Runtime 12.2 LTS e abaixo, o senhor não pode transmitir a partir do feed de dados de alteração para uma tabela Delta com mapeamento de coluna ativado que passou por uma evolução não aditiva do esquema, como renomear ou eliminar colunas. Veja a transmissão com mapeamento de colunas e alterações no esquema.
Ignorar atualizações e exclusões
O transmissão estruturada não trata a entradas que não forem acréscimos e lança uma exceção se ocorrerem modificações na tabela que estiver sendo usada como fonte. Há duas estratégias principais para lidar com alterações que não podem ser propagadas automaticamente downstream:
- Você pode excluir a saída e o ponto de verificação e reiniciar a transmissão desde o início.
- Você pode definir uma destas duas opções:
ignoreDeletes
: ignora transações que excluem dados nos limites da partição.skipChangeCommits
: ignora transações que excluam ou modifiquem registros existentes.skipChangeCommits
subsumeignoreDeletes
.
Em Databricks Runtime 12.2 LTS e acima, skipChangeCommits
substitui a configuração anterior ignoreChanges
. No Databricks Runtime 11.3 LTS e versões inferiores, ignoreChanges
é a única opção suportada.
A semântica de ignoreChanges
difere muito de skipChangeCommits
. Com ignoreChanges
ativado, os arquivos de dados reescritos na tabela de origem são reemitidos após uma operação de alteração de dados, como UPDATE
, MERGE INTO
, DELETE
(dentro de partições) ou OVERWRITE
. As linhas inalteradas geralmente são emitidas junto com as novas linhas, portanto os consumidores downstream devem ser capazes de lidar com as duplicidades. As exclusões não são propagadas downstream. ignoreChanges
subsume ignoreDeletes
.
skipChangeCommits
ignora totalmente as operações de alteração de arquivos. Os arquivos de dados reescritos na tabela de origem devido à operação de alteração de dados como UPDATE
, MERGE INTO
, DELETE
e OVERWRITE
são ignorados completamente. Para refletir as alterações nas tabelas de origem upstream, você deve implementar lógica separada para propagar essas alterações.
As cargas de trabalho configuradas com ignoreChanges
continuam a operar usando a semântica conhecida, mas a Databricks recomenda o uso de skipChangeCommits
para todas as novas cargas de trabalho. A migração de cargas de trabalho usando ignoreChanges
para skipChangeCommits
exige uma lógica de refatoração.
Exemplo
Por exemplo, suponha que você tenha uma tabela user_events
com colunas date
, user_email
e action
particionada por date
. Você sai da tabela user_events
e precisa excluir dados dela devido ao GDPR.
Quando você exclui nos limites da partição (ou seja, o WHERE
está em uma coluna de partição), os arquivos já estão segmentados por valor, então a exclusão simplesmente remove esses arquivos dos metadados. Ao excluir uma partição inteira de dados, você pode usar o seguinte:
spark.readStream
.option("ignoreDeletes", "true")
.table("user_events")
Se você excluir dados em várias partições (neste exemplo, filtrando em user_email
), use a seguinte sintaxe:
spark.readStream
.option("skipChangeCommits", "true")
.table("user_events")
Se você atualizar um user_email
com a instrução UPDATE
, o arquivo contendo o user_email
em questão será reescrito. Use skipChangeCommits
para ignorar os arquivos de dados alterados.
Especifique a posição inicial
Você pode usar as opções a seguir para especificar o ponto de partida da fonte de transmissão do Delta Lake sem processar a tabela inteira.
-
startingVersion
: A versão do Delta Lake para começar. A Databricks recomenda omitir essa opção para a maioria das cargas de trabalho. Quando não definido, a transmissão começará a partir da última versão disponível, incluindo um Snapshot completo da tabela naquele momento.Se especificado, a transmissão lê todas as alterações na tabela Delta a partir da versão especificada (inclusive). Se a versão especificada não estiver mais disponível, a transmissão não começará. O senhor pode obter as versões do commit na coluna
version
da saída do comando DESCRIBE HISTORY.Para retornar somente as alterações mais recentes, especifique
latest
. -
startingTimestamp
: O timestamp de onde começar. Todas as alterações de tabela confirmadas no timestamp ou depois dele (inclusive) são lidas pelo leitor de streaming. Se o timestamp fornecido for anterior a todos os commits da tabela, a leitura de streaming começará com o timestamp mais antigo disponível. Um de:- Uma sequência de carimbo de data/hora. Por exemplo,
"2019-01-01T00:00:00.000Z"
. - Uma string de datas. Por exemplo,
"2019-01-01"
.
- Uma sequência de carimbo de data/hora. Por exemplo,
Você não pode definir as duas opções ao mesmo tempo. Elas entram em vigor somente quando o senhor inicia uma nova consulta de transmissão. Se uma consulta de transmissão tiver começado e o progresso tiver sido registrado em seu ponto de verificação, essas opções serão ignoradas.
Embora você possa iniciar a fonte de transmissão a partir de uma versão específica ou carimbo de data/hora, o esquema da fonte de transmissão é sempre o esquema mais recente da tabela Delta. Você deve garantir que não haja nenhuma alteração de esquema incompatível na tabela Delta após a versão especificada ou o carimbo de data/hora. Caso contrário, a fonte de transmissão poderá retornar resultados incorretos ao ler os dados com um esquema incorreto.
Exemplo
Por exemplo, suponha que você tenha uma tabela user_events
. Se quiser ler as alterações desde a versão 5, use:
spark.readStream
.option("startingVersion", "5")
.table("user_events")
Se quiser ler as alterações desde 18-10-2018, use:
spark.readStream
.option("startingTimestamp", "2018-10-18")
.table("user_events")
Processar o Snapshot inicial sem que os dados sejam descartados
Esse recurso está disponível em Databricks Runtime 11.3 LTS e acima.
Ao usar uma tabela Delta como fonte de transmissão, a consulta primeiro processa todos os dados presentes na tabela. A tabela Delta desta versão é chamada de snapshot inicial. Por padrão, os arquivos de dados da tabela Delta são processados com base em qual arquivo foi modificado pela última vez. No entanto, a hora da última modificação não representa necessariamente a ordem de tempo do evento de registro.
Em uma consulta de transmissão estável com uma marca d'água definida, o processamento de arquivos por tempo de modificação pode resultar em registros processados na ordem incorreta. Isso pode fazer com que os registros sejam descartados como eventos tardios pela marca d'água.
Você pode evitar o problema de perda de dados ativando a seguinte opção:
- withEventTimeOrder: se o snapshot inicial deve ser processado com a ordem do horário do evento.
Com a ordem de tempo do evento ativada, o intervalo de tempo do evento dos dados do snapshot inicial é dividido em intervalos de tempo. Cada micro lote processa um bloco filtrando dados dentro do intervalo de tempo. As opções de configuração maxFilesPerTrigger e maxBytesPerTrigger ainda são aplicáveis para controlar o tamanho do microbatch, mas apenas de forma aproximada devido à natureza do processamento.
O gráfico abaixo mostra esse processo:
Informações importantes sobre esse recurso:
-
O problema de perda de dados só acontece quando o snapshot Delta inicial de uma consulta de transmissão com monitoração de estado é processado na ordem padrão.
-
Você não pode alterar
withEventTimeOrder
depois que a consulta de stream é iniciada enquanto o snapshot inicial ainda estiver sendo processado. Para reiniciar comwithEventTimeOrder
alterado, você precisa excluir o ponto de verificação. -
Se você estiver executando uma consulta de fluxo com EventTimeOrder habilitado, não poderá fazer o downgrade para uma versão DBR que não ofereça suporte a esse recurso até que o processamento inicial do snapshot seja concluído. Se você precisar fazer downgrade, poderá aguardar a conclusão do snapshot inicial ou excluir o ponto de verificação e reiniciar a consulta.
-
Esse recurso não é suportado nos seguintes cenários incomuns:
- A coluna de tempo do evento é uma coluna gerada e há transformações sem projeção entre a fonte Delta e a marca d'água.
- Há uma marca d'água que tem mais de uma fonte Delta na consulta de fluxo.
-
Com a ordem de tempo do evento habilitada, o desempenho do processamento de snapshot inicial do Delta pode ser mais lento.
-
Cada micro lotes examina o Snapshot inicial para filtrar os dados dentro do intervalo de tempo do evento correspondente. Para que a ação do filtro seja mais rápida, é recomendável usar uma coluna de origem Delta como a hora do evento para que a omissão de dados possa ser aplicada (verifique a seção Omissão de dados para Delta Lake para saber quando isso é aplicável). Além disso, o particionamento da tabela ao longo da coluna de horário do evento pode acelerar ainda mais o processamento. O senhor pode consultar Spark UI para ver quantos arquivos delta são digitalizados para um micro lotes específico.
Exemplo
Suponha que você tenha uma tabela user_events
com uma coluna event_time
. Sua consulta de transmissão é uma consulta de agregação. Se quiser garantir que nenhum dado seja perdido durante o processamento inicial do snapshot, você pode usar:
spark.readStream
.option("withEventTimeOrder", "true")
.table("user_events")
.withWatermark("event_time", "10 seconds")
Você também pode habilitar isso com a configuração Spark no cluster que se aplicará a todas as consultas de transmissão: spark.databricks.delta.withEventTimeOrder.enabled true
Mesa Delta como pia
Você também pode gravar dados em uma tabela Delta usando o a transmissão estruturada. O registro de transações permite que o Delta Lake garanta o processamento de uma única vez, mesmo se houver outros fluxos ou consultas em lote sendo executados simultaneamente na tabela.
A função Delta Lake VACUUM
remove todos os arquivos não gerenciados pelo Delta Lake, mas ignora todos os diretórios que se começam com _
. Você pode armazenar pontos de verificação com segurança ao lado de outros dados e metadados para uma tabela Delta usando uma estrutura de diretórios como <table-name>/_checkpoints
.
métricas
O senhor pode descobrir o número de bytes e o número de arquivos ainda a serem processados em um processo de consulta de transmissão, como as métricas numBytesOutstanding
e numFilesOutstanding
. As métricas adicionais incluem:
numNewListedFiles
: Número de arquivos Delta Lake que foram listados para calcular a lista de pendências desse lote.backlogEndOffset
: A versão da tabela usada para calcular o backlog.
Se você estiver executando a transmissão em um notebook, poderá ver essas métricas na aba Dados brutos no painel de progresso da consulta de transmissão:
{
"sources": [
{
"description": "DeltaSource[file:/path/to/source]",
"metrics": {
"numBytesOutstanding": "3456",
"numFilesOutstanding": "8"
}
}
]
}
Modo de anexação
Por padrão, os fluxos são executados no modo Anexar, o que adiciona novos registros à tabela.
Use o método toTable
ao transmitir para tabelas, como no exemplo a seguir:
- Python
- Scala
(events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
)
events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
Modo completo
Você também pode usar o Structured Streaming para substituir toda a tabela por cada lote. Um exemplo de caso de uso é calcular um resumo usando agregação:
- Python
- Scala
(spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
)
spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
O exemplo anterior atualiza continuamente uma tabela que contém o número agregado de eventos por cliente.
Para aplicativos com requisitos de latência mais leniente, você pode economizar recursos de computação com gatilhos únicos. Utilize-os para atualizar tabelas de agregação de resumo em um determinado cronograma, processando apenas os novos dados que chegaram desde a última atualização.
Upsert de consultas de transmissão usando foreachBatch
O senhor pode usar uma combinação de merge
e foreachBatch
para escrever upserts complexos de uma consulta de transmissão em uma tabela Delta. Consulte Usar ForEachBatch para gravar em coletores de dados arbitrários.
Esse padrão tem muitos aplicativos, incluindo o seguinte:
- Escrever agregados de transmissão no modo de atualização : isso é muito mais eficiente que o modo completo.
- Gravar uma transmissão das alterações do banco de dados em uma tabela Delta: A consultamerge para gravar dados de alteração pode ser usada em
foreachBatch
para aplicar continuamente uma transmissão de alterações em uma tabela Delta. - Gravar uma transmissão de dados na tabela Delta com deduplicação : A consulta merge somente de inserção para deduplicação pode ser usada em
foreachBatch
para gravar continuamente dados (com duplicatas) em uma tabela Delta com deduplicação automática.
- Faça com que sua instrução
merge
dentro deforeachBatch
esteja idempotente, pois as reinicializações da consulta de transmissão podem aplicar a operação no mesmo lote de dados várias vezes. - Quando
merge
é usado emforeachBatch
, a taxa de dados de entrada da consulta de transmissão (informada porStreamingQueryProgress
e visível no gráfico de taxa de notebook) pode ser informada como um múltiplo da taxa real em que os dados são gerados na fonte. Isso ocorre porquemerge
lê os dados de entrada várias vezes, fazendo com que as métricas de entrada sejam multiplicadas. Se isso for um gargalo, você pode armazenar em cache o DataFrame em lote antes demerge
e depois desarmazená-lo.merge
O seguinte exemplo demonstra como você pode utilizar SQL dentro do foreachBatch
para realizar esta tarefa:
- Scala
- Python
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
// Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
// Use the view name to apply MERGE
// NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
microBatchOutputDF.sparkSession.sql(s"""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
# Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
# Use the view name to apply MERGE
# NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
# In Databricks Runtime 10.5 and below, you must use the following:
# microBatchOutputDF._jdf.sparkSession().sql("""
microBatchOutputDF.sparkSession.sql("""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
Você também pode optar por usar as APIs do Delta Lake para executar transmissão de upserts, como no exemplo a seguir:
- Scala
- Python
import io.delta.tables.*
val deltaTable = DeltaTable.forName(spark, "table_name")
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
deltaTable.as("t")
.merge(
microBatchOutputDF.as("s"),
"s.key = t.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "table_name")
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
(deltaTable.alias("t").merge(
microBatchOutputDF.alias("s"),
"s.key = t.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
A tabela idempotente grava em foreachBatch
Databricks recomenda a configuração de uma gravação de transmissão separada para cada coletor que o senhor deseja atualizar em vez de usar foreachBatch
. Isso ocorre porque as gravações em várias tabelas são serializadas ao usar 'forEachBatch`, o que reduz a paralelização e aumenta a latência geral.
Delta suportam as seguintes opções DataFrameWriter
para tornar as gravações em várias tabelas dentro de foreachBatch
idempotentes:
txnAppId
: Uma cadeia de caracteres exclusiva que o senhor pode passar em cada gravação em DataFrame. Por exemplo, você pode usar o ID do StreamingQuery comotxnAppId
.txnVersion
: um número crescente monotonicamente que atua como versão da transação.
O Delta Lake usa a combinação de txnAppId
e txnVersion
para identificar gravações duplicadas e ignorá-las.
Se uma gravação de lotes for interrompida por uma falha, a reexecução do lotes usará o mesmo aplicativo e a mesma ID de lotes para ajudar o tempo de execução a identificar corretamente as gravações duplicadas e ignorá-las. A ID do aplicativo (txnAppId
) pode ser qualquer cadeia de caracteres exclusiva gerada pelo usuário e não precisa estar relacionada à ID de transmissão. Consulte Usar ForEachBatch para gravar em coletores de dados arbitrários.
Se o senhor excluir o ponto de verificação da transmissão e reiniciar a consulta com um novo ponto de verificação, deverá fornecer um txnAppId
diferente. Os novos pontos de controle começam com um lote ID de 0
. Delta Lake usa o ID do lote e txnAppId
como um único key, e ignora lotes com valores já vistos.
O exemplo de código a seguir demonstra esse padrão:
- Python
- Scala
app_id = ... # A unique string that is used as an application ID.
def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2
streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()
val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 1
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 2
}