Pular para o conteúdo principal

Use ForEachBatch para gravar em coletores de dados arbitrários

Este artigo discute o uso do site foreachBatch com transmissão estruturada para gravar a saída de uma consulta de transmissão em uma fonte de dados que não tenha um coletor de transmissão existente.

O padrão de código streamingDF.writeStream.foreachBatch(...) permite que o senhor aplique muitas funções aos dados de saída de cada microlote da consulta de transmissão. As funções usadas com foreachBatch usam dois parâmetros:

  • Um DataFrame que tem os dados de saída de um micro-lote.
  • O ID exclusivo dos microlotes.

O senhor deve usar foreachBatch para Delta Lake merge operações in transmissão estructurada. Consulte Upsert de consultas de transmissão usando foreachBatch.

Aplicar operações adicionais do DataFrame

Muitas operações do DataFrame e do conjunto de dados não são compatíveis com a transmissão DataFrames porque o Spark não oferece suporte à geração de planos incrementais nesses casos. Usando o site foreachBatch(), o senhor pode aplicar algumas dessas operações em cada saída de micro-lotes. Por exemplo, o senhor pode usar foreachBatch() e as operações SQL MERGE INTO para gravar a saída das agregações de transmissão em uma tabela Delta no modo de atualização. Veja mais detalhes em MERGE INTO.

important
  • foreachBatch() fornece garantias de gravação de pelo menos uma vez. No entanto, você pode usar o batchId fornecido à função como forma de desduplicar a saída e obter uma garantia de exatamente uma vez. Em ambos os casos, você mesmo terá que raciocinar sobre a semântica de ponta a ponta.
  • foreachBatch() não funciona com o modo de processamento contínuo, pois depende fundamentalmente da execução de micro-lotes de uma consulta de transmissão. Se você gravar dados no modo contínuo, use foreach() em vez disso.
  • Ao usar o site foreachBatch com um operador com estado, é importante consumir completamente cada lote antes da conclusão do processamento. Ver consumir completamente cada lote DataFrame

Um dataframe vazio pode ser invocado com foreachBatch() e o código do usuário precisa ser resiliente para permitir operações adequadas. Um exemplo é mostrado aqui:

Scala
  .foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()

Mudanças de comportamento para foreachBatch no Databricks Runtime 14.0

Em Databricks Runtime 14.0 e acima em compute configurado com o modo de acesso padrão, aplicam-se as seguintes alterações de comportamento:

  • print() comando write output to the driver logs.
  • Você não pode acessar o submódulo dbutils.widgets dentro da função.
  • Todos os arquivos, módulos ou objetos referenciados na função devem ser serializáveis e estar disponíveis no Spark.

Reutilização de lotes existentes fonte de dados

Usando o site foreachBatch(), o senhor pode usar os gravadores de dados de lotes existentes para os coletores de dados que talvez não tenham suporte para transmissão estruturada. Aqui estão alguns exemplos:

Muitos outros lotes de fontes de dados podem ser usados em foreachBatch(). Consulte Conectar à fonte de dados e ao serviço externo.

Escreva em vários locais

Se o senhor precisar gravar a saída de uma consulta de transmissão em vários locais, o site Databricks recomenda o uso de vários gravadores de transmissão estruturada para melhor paralelização e taxa de transferência.

O uso do site foreachBatch para gravar em vários sinks serializa a execução de gravações de transmissão, o que pode aumentar a latência de cada micro-lote.

Se o senhor usar foreachBatch para gravar em várias tabelas Delta, consulte Gravações em tabelas idempotentes em foreachBatch.

Consumir completamente cada lote DataFrame

Quando o senhor estiver usando operadores com estado (por exemplo, usando dropDuplicatesWithinWatermark), cada iteração de lotes deverá consumir todo o DataFrame ou reiniciar a consulta. Se o senhor não consumir todo o DataFrame, a consulta de transmissão falhará com os próximos lotes.

Isso pode acontecer em vários casos. Os exemplos a seguir mostram como corrigir consultas que não consomem corretamente um DataFrame.

Uso intencional de um subconjunto dos lotes

Se o senhor se preocupar apenas com um subconjunto dos lotes, poderá ter um código como o seguinte.

Python
from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def partial_func(batch_df, batch_id):
batch_df.show(2)

q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()

Nesse caso, o site batch_df.show(2) só lida com os dois primeiros itens do lote, o que é esperado, mas se houver mais itens, eles deverão ser consumidos. O código a seguir consome o DataFrame completo.

Python
from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

# function to do nothing with a row
def do_nothing(row)
pass

def partial_func(batch_df, batch_id):
batch_df.show(2)
batch_df.foreach(do_nothing) # silently consume the rest of the batch

q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()

Aqui, a função do_nothing ignora silenciosamente o restante do DataFrame.

Tratamento de um erro em um lote

Pode haver um erro ao executar um processo foreachBatch. Você pode ter um código como o seguinte (nesse caso, a amostra gera intencionalmente um erro para mostrar o problema).

Python
from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')

def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have


q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()

Ao manipular (e engolir silenciosamente) o erro, o restante dos lotes pode não ser consumido. Há duas opções para lidar com essa situação.

Primeiro, o senhor pode aumentar novamente o erro, o que passa para a sua camada de orquestração tentar novamente o lote. Isso pode solucionar o erro, se for um problema transitório, ou enviá-lo para a equipe de operações tentar corrigir manualmente. Para fazer isso, altere o código partial_func para ficar assim:

Python
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
raise e # re-raise the issue

A segunda opção, se o senhor quiser capturar a exceção e ignorar o restante dos lotes, é alterar o código para o seguinte.

Python
from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')

# function to do nothing with a row
def do_nothing(row)
pass

def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
batch_df.foreach(do_nothing) # silently consume the remainder of the batch

q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()

Esse código usa a função do_nothing para ignorar silenciosamente o restante dos lotes.