Use foreachBatch para gravar em coletores de dados arbitrários

Este artigo discute o uso de foreachBatch com transmissão estruturada para gravar a saída de uma query de transmissão em uma fonte de dados que não possui um coletor de transmissão existente.

O padrão de código streamingDF.writeStream.foreachBatch(...) permite aplicar funções de lotes aos dados de saída de cada microlote da query de transmissão. As funções usadas com foreachBatch usam dois parâmetros:

  • Um DataFrame que contém os dados de saída de um microlote.

  • O ID exclusivo dos microlotes.

Você deve usar foreachBatch para operações merge Delta Lake em transmissão estruturada. Consulte Upsert da query de transmissão usando foreachBatch.

Aplicar operações DataFrame adicionais

Muitas operações de DataFrame e dataset não são suportadas em DataFrames de transmissão porque o Spark não suporta a geração de planos incrementais nesses casos. Usando foreachBatch() você pode aplicar algumas dessas operações em cada saída de microlotes. Por exemplo, você pode usar foreachBath() e as operações SQL MERGE INTO para gravar a saída das agregações de transmissão em uma tabela Delta em modo de atualização. Veja mais detalhes em MERGE INTO.

Importante

  • foreachBatch() fornece apenas garantias de gravação de pelo menos uma vez. No entanto, você pode usar o batchId fornecido para a função como forma de desduplicar a saída e obter uma garantia exatamente uma vez. Em ambos os casos, você mesmo terá que raciocinar sobre a semântica de ponta a ponta.

  • foreachBatch() não funciona com o modo de processamento contínuo , pois depende fundamentalmente da execução de micro-lotes de uma query transmitida. Se você gravar dados no modo contínuo, use foreach() .

Um dataframe vazio pode ser invocado com foreachBatch() e o código do usuário precisa ser resiliente para permitir operações apropriadas. Um exemplo é mostrado aqui:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Mudanças de comportamento para foreachBatch no Databricks Runtime 14.0

No Databricks Runtime 14.0 e acima em compute configurado com modo de acesso compartilhado, forEachBatch execução em um processo Python isolado separado no Apache Spark, em vez de no ambiente REPL. Ele é serializado e enviado ao Spark e não tem acesso aos objetos globais spark durante a duração da sessão.

Em todas as outras configurações do compute, o foreachBatch é executado no mesmo REPL do Python que executa o restante do seu código. Como resultado, a função não é serializada.

Ao usar o Databricks Runtime 14.0 e acima em compute configurado com o modo de acesso compartilhado, o senhor deve usar a variável sparkSession com escopo para o DataFrame local ao usar foreachBatch em Python, como no exemplo de código a seguir:

def example_function(df, batch_id):
  df.sparkSession.sql("<query>")

As seguintes alterações de comportamento se aplicam:

  • O senhor não pode acessar nenhuma variável global do Python a partir da sua função.

  • print() comando write output to the driver logs.

  • Todos os arquivos, módulos ou objetos referenciados na função devem ser serializáveis e estar disponíveis no Spark.

Reutilizar lotes existentes fonte de dados

Usando foreachBatch(), você pode usar gravadores de dados de lotes existentes para coletores de dados que podem não ter suporte para transmissão estruturada. Aqui estão alguns exemplos:

Muitos outros lotes fonte de dados podem ser usados em foreachBatch(). Consulte Conectar-se à fonte de dados.

Escreva em vários locais

Se precisar de escrever a saída de uma query de transmissão para vários locais, a Databricks recomenda a utilização de vários escritores de transmissão estruturada para melhor paralelização e taxa de transferência.

Usar foreachBatch para gravar em múltiplos sinks serializa a execução de gravações de transmissão, o que pode aumentar a latência para cada microlote.

Se você usar foreachBatch para gravar em várias tabelas Delta, consulte Gravações de tabelas idempotentes em foreachBatch.