Delta leituras e gravações de transmissão de tabelas
Esta página descreve como usar tabelas Delta como fontes e destinos para transmissãoSpark estruturada com readStream e writeStream. Delta Lake resolve problemas comuns de desempenho e confiabilidade para sistemas e arquivos de transmissão. Os benefícios incluem:
- Consolide arquivos pequenos produzidos por ingestão de baixa latência e melhore o desempenho.
- Manter processamento “exatamente uma vez” com mais de uma transmissão (ou trabalho simultâneo em lote).
- Descubra novos arquivos de forma eficiente ao usar arquivos como fonte de transmissão.
Para saber como carregar tabelas de uso de transmissão de dados no Databricks SQL, consulte Usar tabelas de transmissão no Databricks SQL.
Para transmissão-static join com Delta Lake, veja transmissão-static join.
Utilize tabelas Delta como um destino
Você pode gravar dados em uma tabela Delta usando transmissão estruturada. O log de transações Delta Lake garante o processamento exatamente uma vez, mesmo quando outras consultas de transmissão ou lotes estão sendo executadas simultaneamente na tabela.
Ao escrever em uma tabela Delta usando um sink de transmissão estruturada, você pode ver um commit vazio com epochId = -1. Estes são os casos esperados e que normalmente ocorrem:
- Nos primeiros lotes de cada execução da consulta de transmissão (isso ocorre a cada lotes para um
Trigger.AvailableNow). - Quando um esquema é alterado (como adicionar uma coluna).
Esses commits vazios são intencionais e não indicam um erro. Elas não afetam a correção ou o desempenho da consulta de forma significativa.
A função Delta Lake VACUUM remove todos os arquivos não gerenciados pelo Delta Lake, mas ignora todos os diretórios que se começam com _. Você pode armazenar pontos de verificação com segurança ao lado de outros dados e metadados para uma tabela Delta usando uma estrutura de diretórios como <table-name>/_checkpoints.
Monitore o backlog com métricas.
Utilize as seguintes métricas para monitorar o backlog de um processo de consulta de transmissão:
numBytesOutstandingNúmero de bytes ainda a serem processados na fila de pendências.numFilesOutstandingNúmero de arquivos ainda a serem processados na lista de pendências.numNewListedFilesNúmero de arquivos Delta Lake listados para calcular o acúmulo de tarefas para este lote.backlogEndOffset: A versão da tabela Delta usada para calcular o backlog.
Em um Notebook, view essas métricas na tab dados brutos no painel de andamento da consulta de transmissão:
{
"sources": [
{
"description": "DeltaSource[file:/path/to/source]",
"metrics": {
"numBytesOutstanding": "3456",
"numFilesOutstanding": "8"
}
}
]
}
Modo de anexação
Por default, a transmissão é executada em modo anexado e apenas adiciona novos registros à tabela.
Utilize o método toTable ao transmitir para tabelas:
- Python
- Scala
(events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
)
events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
Modo completo
Use transmissão estruturada com modo completo para substituir a tabela inteira após cada lote. Por exemplo, você pode atualizar continuamente uma tabela de resumo agregada de eventos por cliente:
- Python
- Scala
(spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
)
spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
Para aplicações sem requisitos de latência rigorosos, você pode economizar recursos de computação e custos com gatilhos únicos, como AvailableNow. Por exemplo, use esse gatilho para atualizar tabelas de agregação de resumo em um determinado programa, processando apenas os novos dados que chegaram desde a última atualização. Ver AvailableNow: Processamento incremental de lotes.
Gerenciar alterações nas tabelas Delta de origem
a transmissão estruturada lê tabelas Delta de forma incremental. Quando uma consulta de transmissão lê de uma tabela Delta , os novos registros são processados de forma idempotente à medida que as novas versões da tabela commit na tabela de origem. A transmissão estruturada aceita apenas entradas de anexação e lança uma exceção se ocorrerem modificações na tabela Delta de origem. Por exemplo, se uma operação UPDATE, DELETE, MERGE INTO ou OVERWRITE modificar uma tabela Delta de origem que é lida por uma consulta de transmissão, a transmissão falhará com um erro.
Existem quatro abordagens típicas para lidar com alterações upstream em tabelas Delta de origem, dependendo do seu caso de uso. Segue abaixo uma tabela de referência e detalhes sobre cada item:
Abordagem | Prós | Contras |
|---|---|---|
| Simples, não exige que você escreva uma lógica complexa. Útil para processamento somente de acréscimo, onde as alterações a montante são tratadas separadamente, ou para lidar temporariamente com um registro inválido. | Não propaga alterações e processa apenas acréscimos. |
refresh completo | Além disso, é simples e não exige que você escreva uma lógica complexa. Útil para conjuntos de dados pequenos com raras alterações a montante. | Caro para conjuntos de dados grandes. Requer o reprocessamento de todas as tabelas subsequentes. |
Alterar feed de dados | Processar todos os tipos de alteração (inserções, atualizações e exclusões). Databricks recomenda transmitir os dados a partir do feed CDC de uma tabela Delta , em vez de diretamente da tabela, sempre que possível. | Exige que você escreva uma lógica mais complexa para lidar com cada tipo de alteração. |
Visualizações materializadas | Alternativa simples à transmissão estruturada que possui propagação automática de mudanças. | Maior latência. Disponível apenas no pipeline declarativo LakeFlow Spark e Databricks SQL. |
Ignorar commit de alteração upstream com skipChangeCommits
Defina skipChangeCommits para ignorar transações que excluem ou modificam registros existentes e para processar apenas acréscimos. Isso é útil quando as alterações nos dados existentes não precisam ser propagadas pela transmissão, ou quando você prefere uma lógica separada para lidar com essas alterações. Você pode ativar e desativar skipChangeCommits se precisar ignorar temporariamente alterações pontuais.
A Databricks recomenda o uso de skipChangeCommits para a maioria das cargas de trabalho que não usam feeds de dados de alteração.
- Python
- Scala
(spark.readStream
.option("skipChangeCommits", "true")
.table("source_table")
)
spark.readStream
.option("skipChangeCommits", "true")
.table("source_table")
Se o esquema de uma tabela Delta mudar após o início de uma leitura de transmissão na tabela, a consulta falhará. Para a maioria das alterações de esquema, você pode reiniciar a transmissão para resolver a incompatibilidade de esquema e continuar o processamento.
No Databricks Runtime 12.2 LTS e versões anteriores, não é possível transmitir dados de uma tabela Delta com mapeamento de colunas ativado que tenha sofrido evolução não aditiva do esquema, como renomeação ou remoção de colunas. Para obter detalhes, consulte Mapeamento e transmissão de colunas.
No Databricks Runtime 12.2 LTS e versões superiores, skipChangeCommits substitui ignoreChanges. No Databricks Runtime 11.3 LTS e versões anteriores, ignoreChanges é a única opção suportada. Consulte a opção Legado: ignoreChanges para obter detalhes.
Opção legada: ignoreDeletes
ignoreDeletes é uma opção legada que lida apenas com transações que excluem dados nos limites das partições (ou seja, exclusões completas de partições). Se você precisar lidar com exclusões, atualizações ou outras modificações que não sejam de partição, use skipChangeCommits em vez disso.
- Python
- Scala
(spark.readStream
.option("ignoreDeletes", "true")
.table("user_events")
)
spark.readStream
.option("ignoreDeletes", "true")
.table("user_events")
Opção legada: ignoreChanges
ignoreChanges Está disponível no Databricks Runtime 11.3 LTS e versões anteriores. No Databricks Runtime 12.2 LTS e versões superiores, ele é substituído por skipChangeCommits.
Com ignoreChanges ativado, os arquivos de dados reescritos na tabela de origem são reemitidos após operações de modificação de dados como UPDATE, MERGE INTO, DELETE (dentro de partições) ou OVERWRITE. Linhas inalteradas são frequentemente emitidas juntamente com novas linhas, portanto, os consumidores subsequentes devem ser capazes de lidar com duplicatas. As exclusões não são propagadas para os fluxos subsequentes. ignoreChanges tem precedência sobre ignoreDeletes.
Em contraste, skipChangeCommits ignora completamente as operações de alteração de arquivos. Os arquivos de dados reescritos na tabela de origem devido a operações de modificação de dados, como UPDATE, MERGE INTO, DELETE e OVERWRITE são totalmente ignorados. Para refletir as alterações nas tabelas de origem das transmissões, você deve implementar uma lógica separada para propagar essas alterações.
A Databricks recomenda o uso de skipChangeCommits para todas as novas cargas de trabalho. Para migrar uma carga de trabalho de ignoreChanges para skipChangeCommits, refatore sua lógica de transmissão.
refresh completa das tabelas subsequentes
Se as alterações a montante forem raras e os dados forem suficientemente pequenos para serem reprocessados, você pode excluir o ponto de verificação da transmissão e a tabela de saída e, em seguida, reiniciar a transmissão desde o início. Isso faz com que a transmissão reprocesse todos os dados da tabela de origem. Tenha em mente que essa abordagem também exige o reprocessamento de todas as tabelas subsequentes que dependem do resultado dessa transmissão.
Essa abordagem é mais adequada para conjuntos de dados ou cargas de trabalho menores, onde as alterações upstream são pouco frequentes e o custo de uma refresh completa é aceitável.
Usar o feed de dados de alterações
Para cargas de trabalho que processam todos os tipos de alterações (inserções, atualizações e exclusões), utilize o feed de dados de alterações do Delta Lake. O feed de dados de alteração registra as alterações em nível de linha em uma tabela Delta , permitindo que você transmita essas alterações e escreva a lógica para lidar com cada tipo de alteração em tabelas subsequentes. Essa é a abordagem mais robusta porque seu código lida explicitamente com todos os tipos de eventos de mudança. Consulte a seção "Usar o feed de dados de alterações do Delta Lake" no Databricks.
Se você estiver usando o pipeline declarativo LakeFlow Spark , consulte APIs AUTO CDC : Simplifique a captura de dados de alterações (CDC) com pipeline.
No Databricks Runtime 12.2 LTS e versões anteriores, não é possível transmitir dados do feed de alterações para uma tabela Delta com mapeamento de colunas ativado que tenha sofrido evolução não aditiva do esquema, como renomear ou remover colunas. Consulte Mapeamento e transmissão de colunas.
Utilizar visão materializada
A visão materializada lida automaticamente com as alterações a montante, recalculando os resultados quando os dados de origem são alterados. Se você não precisa da menor latência possível e deseja evitar a complexidade de gerenciar a transmissão, uma view materializada pode simplificar sua arquitetura. As visualizações materializadas estão disponíveis no pipeline declarativo LakeFlow Spark e no Databricks SQL. Veja Visão materializada.
Exemplo
Por exemplo, suponha que você tenha uma tabela user_events com colunas date, user_email e action particionada por date. Você sai da tabela user_events e precisa excluir dados dela devido ao GDPR.
skipChangeCommits permite excluir dados em várias partições (neste exemplo, filtrando em user_email). Utilize a seguinte sintaxe:
spark.readStream
.option("skipChangeCommits", "true")
.table("user_events")
Se você atualizar um user_email com a instrução UPDATE, o arquivo contendo o user_email em questão será reescrito. Use skipChangeCommits para ignorar os arquivos de dados alterados.
O Databricks recomenda usar skipChangeCommits em vez de ignoreDeletes a menos que você tenha certeza de que as exclusões são sempre descartes completos de partições.
Use foreachBatch para escritas de tabela idempotentes
Databricks recomenda configurar uma gravação de transmissão separada para cada coletor que você deseja atualizar em vez de usar foreachBatch. Escritas em múltiplos destinos em foreachBatch reduzem a paralelização e aumentam a latência geral porque as escritas em múltiplas tabelas são serializadas em foreachBatch.
Delta suportam as seguintes opções DataFrameWriter para tornar as gravações em várias tabelas dentro de foreachBatch idempotentes:
txnAppIdUma sequência de caracteres exclusiva que você pode passar em cada gravação DataFrame . Por exemplo, você pode usar o ID da StreamingQuery comotxnAppId.txnAppIdpode ser qualquer string única gerada pelo usuário e não precisa estar relacionada ao ID da transmissão.txnVersion: um número crescente monotonicamente que atua como versão da transação.
Delta Lake usa txnAppId e txnVersion para identificar e ignorar gravações duplicadas. Por exemplo, após uma falha interromper a escrita de lotes, você pode reexecutar os lotes com os mesmos txnAppId e txnVersion para identificar e ignorar duplicados corretamente. Consulte Usar foreachBatch para gravar em destinos de dados arbitrários.
Se o senhor excluir o ponto de verificação da transmissão e reiniciar a consulta com um novo ponto de verificação, deverá fornecer um txnAppId diferente. Os novos pontos de controle começam com um lote ID de 0. Delta Lake usa o ID do lote e txnAppId como um único key, e ignora lotes com valores já vistos.
O exemplo de código a seguir demonstra esse padrão:
- Python
- Scala
app_id = ... # A unique string that is used as an application ID.
def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2
streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()
val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 1
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 2
}
Upsert de consultas de transmissão usando foreachBatch
Você pode usar merge e foreachBatch para gravar upserts complexos de uma consulta de transmissão em uma tabela Delta . Consulte Usar foreachBatch para gravar em destinos de dados arbitrários.
Essa abordagem tem muitas aplicações:
- Melhore o desempenho de escrita com o modo de saída
update, enquanto o modo de saídacompleterequer a reescrita de toda a tabela de resultados para cada microlote. - Aplique continuamente uma transmissão de alterações a uma tabela Delta usando uma consulta merge para gravar dados de alteração em
foreachBatch. Consulte Dados de alteração lenta (SCD) e captura de dados de alterações (CDC) (CDC) com Delta Lake. - Lidar com a desduplicação durante o processamento de transmissão. Você pode usar uma consulta merge somente de inserção em
foreachBatchpara gravar dados continuamente em uma tabela Delta com deduplicação automática. Consulte a seção "Desduplicação de dados ao gravar em tabelas Delta".
-
Verifique se sua declaração
mergedentro deforeachBatché idempotente. Caso contrário, o reinício da consulta de transmissão poderá aplicar as transações nos mesmos lotes de dados diversas vezes. Consulte UseforeachBatchpara escritas de tabela idempotentes. -
Quando
mergeé usado emforeachBatch, as métricas de taxa de dados de entrada podem retornar um múltiplo da taxa real em que os dados são gerados na fonte.mergelê os dados de entrada várias vezes, o que multiplica as métricas. Para evitar a multiplicação de métricas, armazene em cache o DataFrame lotes antes demergee remova-o do cache depois demerge.A taxa de dados de entrada está disponível através de
StreamingQueryProgresse no gráfico de taxa de transmissão do Notebook. Veja consultas de monitoramento transmissão estruturada no Databricks.
Por exemplo, você pode usar instruções SQL MERGE dentro de foreachBatch:
- Scala
- Python
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
// Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
// Use the view name to apply MERGE
// NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
microBatchOutputDF.sparkSession.sql(s"""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
# Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
# Use the view name to apply MERGE
# NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
# In Databricks Runtime 10.5 and below, you must use the following:
# microBatchOutputDF._jdf.sparkSession().sql("""
microBatchOutputDF.sparkSession.sql("""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
Você também pode usar as APIs Delta Lake para upserts de transmissões:
- Scala
- Python
import io.delta.tables.*
val deltaTable = DeltaTable.forName(spark, "table_name")
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
deltaTable.as("t")
.merge(
microBatchOutputDF.as("s"),
"s.key = t.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "table_name")
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
(deltaTable.alias("t").merge(
microBatchOutputDF.alias("s"),
"s.key = t.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
Defina a versão inicial da tabela para processar as alterações.
Por default, as transmissões começam com a versão mais recente disponível da tabela Delta . Isso inclui um instantâneo completo da tabela naquele momento e todas as alterações futuras. Databricks recomenda que você use a versão inicial default da tabela para a maioria das cargas de trabalho.
Opcionalmente, você pode usar as seguintes opções para especificar o ponto de partida da fonte de transmissão Delta Lake sem processar a tabela inteira.
-
startingVersion: A versão da tabela Delta a partir da qual iniciar a leitura. Todas as alterações de tabela confirmadas na versão especificada ou posteriormente são lidas pela transmissão. Caso a versão especificada não esteja disponível, a transmissão não será iniciada.Para encontrar as versões commit disponíveis, execute
DESCRIBE HISTORYe verifique oversion. Para retornar apenas as alterações mais recentes, especifiquelatest. Para obter informações sobre versões da tabela Delta , consulte Trabalhar com tabela história. -
startingTimestamp: O carimbo de data/hora a partir do qual a leitura deve começar. Todas as alterações de tabela confirmadas no carimbo de data/hora especificado ou posteriormente são lidas pela transmissão. Se o carimbo de data/hora fornecido for anterior a todos os commits da tabela, a leitura da transmissão começará com o carimbo de data/hora mais antigo disponível. Defina uma das opções:- Uma sequência de carimbo de data/hora. Por exemplo,
"2019-01-01T00:00:00.000Z". - Uma string de datas. Por exemplo,
"2019-01-01".
- Uma sequência de carimbo de data/hora. Por exemplo,
Você não pode definir startingVersion e startingTimestamp ao mesmo tempo. Essas configurações se aplicam somente a novas consultas de transmissão. Se uma consulta de transmissão tiver sido iniciada e o progresso tiver sido registrado em seu ponto de verificação, essas configurações serão ignoradas.
Embora seja possível iniciar a fonte de transmissão a partir de uma versão ou carimbo de data/hora específico, o esquema da fonte de transmissão é sempre o esquema mais recente da tabela Delta . Você deve garantir que não haja nenhuma alteração de esquema incompatível na tabela Delta após a versão ou data/hora especificada. Caso contrário, a fonte de transmissão poderá retornar resultados incorretos ao ler os dados com um esquema incorreto.
Exemplo
Por exemplo, suponha que você tenha uma tabela user_events. Se quiser ler as alterações desde a versão 5, use:
spark.readStream
.option("startingVersion", "5")
.table("user_events")
Se quiser ler as alterações desde 18-10-2018, use:
spark.readStream
.option("startingTimestamp", "2018-10-18")
.table("user_events")
Processar o instantâneo inicial sem descartar dados
Esse recurso está disponível em Databricks Runtime 11.3 LTS e acima.
Em uma consulta de transmissão com estado e marca d'água definida, o processamento de arquivos por data de modificação pode processar registros na ordem incorreta. Isso pode fazer com que a marca d'água marque incorretamente os registros como eventos tardios e os exclua. Isso só pode ocorrer quando o instantâneo Delta inicial for processado na ordem default .
Para transmissões com uma tabela de origem Delta , a consulta primeiro processa todos os dados presentes na tabela e cria uma versão chamada Snapshot inicial . Por default, os arquivos de dados da tabela Delta são processados com base no arquivo que foi modificado por último. No entanto, a data da última modificação não representa necessariamente a ordem cronológica dos eventos registrados.
Para evitar perda de dados durante o processamento inicial do Snapshot, habilite a opção withEventTimeOrder . withEventTimeOrder divide o intervalo de tempo do evento dos dados do Snapshot inicial em intervalos de tempo. Cada microlote processa um conjunto de dados filtrando-os dentro do intervalo de tempo especificado. As opções maxFilesPerTrigger e maxBytesPerTrigger ainda são aplicáveis para controlar o tamanho dos microlotes, mas apenas aproximadamente devido à abordagem de processamento.
O diagrama a seguir ilustra esse processo:

Restrições
- Você não pode alterar
withEventTimeOrderse a consulta de transmissão tiver começado e o Snapshot inicial estiver sendo processado ativamente. Para reiniciar comwithEventTimeOrderalterado, você deve excluir o ponto de verificação. - Se
withEventTimeOrderestiver habilitado, você não poderá fazer o downgrade de uma transmissão para uma versão Databricks Runtime que não suporte esse recurso até que o processamento inicial do Snapshot seja concluído. Para fazer o downgrade, aguarde a conclusão do Snapshot inicial ou exclua o ponto de verificação e reinicie a consulta. - Este recurso não é compatível com os seguintes cenários:
- A coluna de tempo do evento é uma coluna gerada e há transformações sem projeção entre a fonte Delta e a marca d'água.
- Há uma marca d'água que tem mais de uma fonte Delta na consulta de fluxo.
desempenho
Se withEventTimeOrder estiver ativado, o desempenho do processamento inicial do Snapshot poderá ser mais lento. Cada microlote examina o Snapshot inicial para filtrar os dados dentro do intervalo de tempo do evento correspondente. Para melhorar o desempenho da filtragem:
- Utilize uma coluna de origem Delta como o horário do evento para que a omissão de dados possa ser aplicada. Consulte Ignorando dados.
- Divida a tabela ao longo da coluna de tempo do evento.
Use a Spark UI para ver quantos arquivos Delta foram verificados para um microlote específico.
Exemplo
Suponha que você tenha uma tabela user_events com uma coluna event_time. Sua consulta de transmissão é uma consulta de agregação. Se quiser garantir que nenhum dado seja perdido durante o processamento inicial do snapshot, você pode usar:
spark.readStream
.option("withEventTimeOrder", "true")
.table("user_events")
.withWatermark("event_time", "10 seconds")
Você pode definir withEventTimeOrder com uma configuração Spark no cluster para aplicá-lo a todas as consultas de transmissão: spark.databricks.delta.withEventTimeOrder.enabled true.
Limitar a taxa de entrada para melhorar o desempenho do processamento
Por default, a transmissão estruturada processa o maior número possível de arquivos em cada microlote. Para limitar a quantidade de dados processados por lote e gerenciar o uso de memória, estabilizar a latência ou reduzir os custos de armazenamento cloud , utilize as seguintes opções:
maxFilesPerTrigger: O número de novos arquivos a serem considerados em cada microlote. O default é 1000.maxBytesPerTriggerA quantidade de dados que é processada em cada microlote. Esta opção define um "máximo flexível", o que significa que um lote processa aproximadamente esta quantidade de dados e pode processar mais do que o limite para que a consulta de transmissão avance nos casos em que a menor unidade de entrada seja maior do que esse limite. Esta opção não está definida por default.
Se você usar maxBytesPerTrigger e maxFilesPerTrigger, o microlote processa os dados até que o limite maxFilesPerTrigger ou maxBytesPerTrigger seja atingido.
Por default, se logRetentionDuration limpar as transações na tabela de origem e a consulta de transmissão tentar processar essas versões, a consulta falhará para evitar a perda de dados. Você pode definir a opção failOnDataLoss para false para ignorar dados perdidos e continuar o processamento. Consulte Configurar retenção de dados para consultas de viagem do tempo.
Controle os custos de armazenamento cloud
As consultas de transmissão têm vários modos de gatilho disponíveis que permitem equilibrar custo e latência, incluindo processingTime, availableNow e realTime. Consulte Controlar o custo do armazenamento cloud.