Pular para o conteúdo principal

Use ForEachBatch para gravar em coletores de dados arbitrários

Este artigo discute o uso do site foreachBatch com transmissão estruturada para gravar a saída de uma consulta de transmissão em uma fonte de dados que não tenha um coletor de transmissão existente.

O padrão de código streamingDF.writeStream.foreachBatch(...) permite que o senhor aplique muitas funções aos dados de saída de cada microlote da consulta de transmissão. As funções usadas com foreachBatch usam dois parâmetros:

  • Um DataFrame que tem os dados de saída de um micro-lote.
  • O ID exclusivo dos microlotes.

O senhor deve usar foreachBatch para Delta Lake merge operações in transmissão estructurada. Consulte Upsert de consultas de transmissão usando foreachBatch.

Aplicar operações adicionais do DataFrame

Muitas operações do DataFrame e do conjunto de dados não são compatíveis com a transmissão DataFrames porque o Spark não oferece suporte à geração de planos incrementais nesses casos. Usando o site foreachBatch(), o senhor pode aplicar algumas dessas operações em cada saída de micro-lotes. Por exemplo, o senhor pode usar foreachBatch() e as operações SQL MERGE INTO para gravar a saída das agregações de transmissão em uma tabela Delta no modo de atualização. Veja mais detalhes em MERGE INTO.

important
  • foreachBatch() fornece garantias de gravação de pelo menos uma vez. No entanto, você pode usar o batchId fornecido à função como forma de desduplicar a saída e obter uma garantia de exatamente uma vez. Em ambos os casos, você mesmo terá que raciocinar sobre a semântica de ponta a ponta.
  • foreachBatch() não funciona com o modo de processamento contínuo, pois depende fundamentalmente da execução de micro-lotes de uma consulta de transmissão. Se você gravar dados no modo contínuo, use foreach() em vez disso.

Um dataframe vazio pode ser invocado com foreachBatch() e o código do usuário precisa ser resiliente para permitir operações adequadas. Um exemplo é mostrado aqui:

Scala
  .foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()

Mudanças de comportamento para foreachBatch no Databricks Runtime 14.0

Em Databricks Runtime 14.0 e acima em compute configurado com o modo de acesso padrão, aplicam-se as seguintes alterações de comportamento:

  • print() comando write output to the driver logs.
  • Você não pode acessar o submódulo dbutils.widgets dentro da função.
  • Todos os arquivos, módulos ou objetos referenciados na função devem ser serializáveis e estar disponíveis no Spark.

Reutilização de lotes existentes fonte de dados

Usando o site foreachBatch(), o senhor pode usar os gravadores de dados de lotes existentes para os coletores de dados que talvez não tenham suporte para transmissão estruturada. Aqui estão alguns exemplos:

Muitos outros lotes de fontes de dados podem ser usados em foreachBatch(). Consulte Conectar-se à fonte de dados.

Escreva em vários locais

Se o senhor precisar gravar a saída de uma consulta de transmissão em vários locais, o site Databricks recomenda o uso de vários gravadores de transmissão estruturada para melhor paralelização e taxa de transferência.

O uso do site foreachBatch para gravar em vários sinks serializa a execução de gravações de transmissão, o que pode aumentar a latência de cada micro-lote.

Se o senhor usar foreachBatch para gravar em várias tabelas Delta, consulte Gravações em tabelas idempotentes em foreachBatch.