Use ForEachBatch para gravar em coletores de dados arbitrários
Este artigo discute o uso do site foreachBatch
com transmissão estruturada para gravar a saída de uma consulta de transmissão em uma fonte de dados que não tenha um coletor de transmissão existente.
O padrão de código streamingDF.writeStream.foreachBatch(...)
permite que o senhor aplique muitas funções aos dados de saída de cada microlote da consulta de transmissão. As funções usadas com foreachBatch
usam dois parâmetros:
- Um DataFrame que tem os dados de saída de um micro-lote.
- O ID exclusivo dos microlotes.
O senhor deve usar foreachBatch
para Delta Lake merge operações in transmissão estructurada. Consulte Upsert de consultas de transmissão usando foreachBatch
.
Aplicar operações adicionais do DataFrame
Muitas operações do DataFrame e do conjunto de dados não são compatíveis com a transmissão DataFrames porque o Spark não oferece suporte à geração de planos incrementais nesses casos. Usando o site foreachBatch()
, o senhor pode aplicar algumas dessas operações em cada saída de micro-lotes. Por exemplo, o senhor pode usar foreachBatch()
e as operações SQL MERGE INTO
para gravar a saída das agregações de transmissão em uma tabela Delta no modo de atualização. Veja mais detalhes em MERGE INTO.
foreachBatch()
fornece garantias de gravação de pelo menos uma vez. No entanto, você pode usar obatchId
fornecido à função como forma de desduplicar a saída e obter uma garantia de exatamente uma vez. Em ambos os casos, você mesmo terá que raciocinar sobre a semântica de ponta a ponta.foreachBatch()
não funciona com o modo de processamento contínuo, pois depende fundamentalmente da execução de micro-lotes de uma consulta de transmissão. Se você gravar dados no modo contínuo, useforeach()
em vez disso.
Um dataframe vazio pode ser invocado com foreachBatch()
e o código do usuário precisa ser resiliente para permitir operações adequadas. Um exemplo é mostrado aqui:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Mudanças de comportamento para foreachBatch
no Databricks Runtime 14.0
Em Databricks Runtime 14.0 e acima em compute configurado com o modo de acesso padrão, aplicam-se as seguintes alterações de comportamento:
print()
comando write output to the driver logs.- Você não pode acessar o submódulo
dbutils.widgets
dentro da função. - Todos os arquivos, módulos ou objetos referenciados na função devem ser serializáveis e estar disponíveis no Spark.
Reutilização de lotes existentes fonte de dados
Usando o site foreachBatch()
, o senhor pode usar os gravadores de dados de lotes existentes para os coletores de dados que talvez não tenham suporte para transmissão estruturada. Aqui estão alguns exemplos:
Muitos outros lotes de fontes de dados podem ser usados em foreachBatch()
. Consulte Conectar-se à fonte de dados.
Escreva em vários locais
Se o senhor precisar gravar a saída de uma consulta de transmissão em vários locais, o site Databricks recomenda o uso de vários gravadores de transmissão estruturada para melhor paralelização e taxa de transferência.
O uso do site foreachBatch
para gravar em vários sinks serializa a execução de gravações de transmissão, o que pode aumentar a latência de cada micro-lote.
Se o senhor usar foreachBatch
para gravar em várias tabelas Delta, consulte Gravações em tabelas idempotentes em foreachBatch
.