Pular para o conteúdo principal

Use ForEachBatch para gravar em coletores de dados arbitrários

Este artigo discute o uso do site foreachBatch com transmissão estruturada para gravar a saída de uma consulta de transmissão em uma fonte de dados que não tenha um coletor de transmissão existente.

O padrão de código streamingDF.writeStream.foreachBatch(...) permite que o senhor aplique muitas funções aos dados de saída de cada microlote da consulta de transmissão. As funções usadas com foreachBatch usam dois parâmetros:

  • Um DataFrame que tem os dados de saída de um micro-lote.
  • O ID exclusivo dos microlotes.

O senhor deve usar foreachBatch para Delta Lake merge operações in transmissão estructurada. Consulte Upsert de consultas de transmissão usando foreachBatch.

Aplicar operações adicionais do DataFrame

Muitas operações do DataFrame e do conjunto de dados não são compatíveis com a transmissão DataFrames porque o Spark não oferece suporte à geração de planos incrementais nesses casos. Usando o site foreachBatch(), o senhor pode aplicar algumas dessas operações em cada saída de micro-lotes. Por exemplo, o senhor pode usar foreachBatch() e as operações SQL MERGE INTO para gravar a saída das agregações de transmissão em uma tabela Delta no modo de atualização. Veja mais detalhes em MERGE INTO.

importante
  • foreachBatch() fornece garantias de gravação de pelo menos uma vez. No entanto, você pode usar o batchId fornecido à função como forma de desduplicar a saída e obter uma garantia de exatamente uma vez. Em ambos os casos, você mesmo terá que raciocinar sobre a semântica de ponta a ponta.
  • foreachBatch() não funciona com o modo de processamento contínuo, pois depende fundamentalmente da execução de micro-lotes de uma consulta de transmissão. Se você gravar dados no modo contínuo, use foreach() em vez disso.
  • Ao usar o site foreachBatch com um operador com estado, é importante consumir completamente cada lote antes da conclusão do processamento. Ver consumir completamente cada lote DataFrame

Um dataframe vazio pode ser invocado com foreachBatch() e o código do usuário precisa ser resiliente para permitir operações adequadas. Um exemplo é mostrado aqui:

Scala
  .foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()

Mudanças de comportamento para foreachBatch no Databricks Runtime 14.0

Em Databricks Runtime 14.0 e acima em compute configurado com o modo de acesso padrão, aplicam-se as seguintes alterações de comportamento:

  • print() comando write output to the driver logs.
  • Você não pode acessar o submódulo dbutils.widgets dentro da função.
  • Todos os arquivos, módulos ou objetos referenciados na função devem ser serializáveis e estar disponíveis no Spark.

Reutilização de lotes existentes fonte de dados

Usando o site foreachBatch(), o senhor pode usar os gravadores de dados de lotes existentes para os coletores de dados que talvez não tenham suporte para transmissão estruturada. Aqui estão alguns exemplos:

Muitos outros lotes de fontes de dados podem ser usados em foreachBatch(). Consulte Conectar à fonte de dados e ao serviço externo.

Escreva em vários locais

Se o senhor precisar gravar a saída de uma consulta de transmissão em vários locais, o site Databricks recomenda o uso de vários gravadores de transmissão estruturada para melhor paralelização e taxa de transferência.

O uso do site foreachBatch para gravar em vários sinks serializa a execução de gravações de transmissão, o que pode aumentar a latência de cada micro-lote.

Se o senhor usar foreachBatch para gravar em várias tabelas Delta, consulte Gravações em tabelas idempotentes em foreachBatch.

Consumir completamente cada lote DataFrame

Quando o senhor estiver usando operadores com estado (por exemplo, usando dropDuplicatesWithinWatermark), cada iteração de lotes deverá consumir todo o DataFrame ou reiniciar a consulta. Se o senhor não consumir todo o DataFrame, a consulta de transmissão falhará com os próximos lotes.

Isso pode acontecer em vários casos. Os exemplos a seguir mostram como corrigir consultas que não consomem corretamente um DataFrame.

Uso intencional de um subconjunto dos lotes

Se o senhor se preocupar apenas com um subconjunto dos lotes, poderá ter um código como o seguinte.

Python
from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def partial_func(batch_df, batch_id):
batch_df.show(2)

q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()

Nesse caso, o site batch_df.show(2) só lida com os dois primeiros itens do lote, o que é esperado, mas se houver mais itens, eles deverão ser consumidos. O código a seguir consome o DataFrame completo.

Python
from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

# function to do nothing with a row
def do_nothing(row):
pass

def partial_func(batch_df, batch_id):
batch_df.show(2)
batch_df.foreach(do_nothing) # silently consume the rest of the batch

q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()

Aqui, a função do_nothing ignora silenciosamente o restante do DataFrame.

Tratamento de um erro em um lote

Para tratamento de erros em foreachBatch, Databricks recomenda que você permita que a consulta de transmissão falhe rapidamente e, em vez disso, confie na camada de orquestração, como LakeFlow Jobs ou Apache Airflow, para gerenciar a lógica de repetição. Isso é muito mais seguro do que criar loops de repetição complexos em seu código, onde pode ocorrer perda de dados.

Aqui estão algumas diretrizes com base no seu objetivo de escrita:

Destino

Exemplos

Orientação

Operações DataFrame

Tabelas Delta Lake

Você deve usar as opções de escrita txnAppId e txnVersion , vinculando txnVersion a batchId, para garantir a idempotência e proteger a correção dos dados em novas tentativas. Não capture e tente novamente exceções localmente. Em vez disso, Databricks recomenda que você permita que os erros se propaguem para que as métricas Spark permaneçam precisas, os dados não sejam duplicados e o orquestrador possa tentar novamente os lotes completos sem problemas.

Código personalizado e destinos externos

.collect()Bancos de dados OLTP, filas de mensagens, APIs

Implemente sua própria idempotência. Você deve assumir que quaisquer operações podem e serão repetidas em lotes diferentes. Se o batchId permanecer o mesmo, o resultado de suas operações deverá permanecer o mesmo. Você pode tentar novamente erros puramente transitórios, como breves timeouts de conexão, mas tome extremo cuidado para evitar gravações parciais ou duplicadas caso a nova tentativa falhe. A abordagem mais segura é deixar os erros se propagarem e permitir que o orquestrador tente novamente todos os lotes.

Aqui estão alguns exemplos de tipos de exceção e recomendações sobre como lidar com eles em foreachBatch:

Tipo de exceção

Exemplos

Ação recomendada

erros transitórios de coleta de lixo

SQLTransientConnectionException, HTTP 429, tempos limite

Catch : tente novamente ou envie para a fila de mensagens não entregues.

Violações de restrições key ou duplicadas quando o coletor é idempotente.

SQLIntegrityConstraintViolationException

Capturar : log e suprimir

Erros personalizados que podem ser repetidos

Exceções de socket encapsuladas, erros de banco de dados recuperáveis

Captura : incrementar métricas e permitir continuação controlada.

Erros de lógica ou de esquema

NullPointerException, AttributeError, incompatibilidade de esquema

Propagar : permitir que o Spark falhe na consulta.

Erros de destino não recuperáveis ou bugs lógicos não detectados

ValueError, PermissionError

Propagar : permitir que o Spark falhe na consulta.

falhas críticas

OutOfMemoryError, estado corrompido, violações de integridade de dados

Propagar : permitir que o Spark falhe na consulta.

Exemplos de código: tratamento de exceções

Os exemplos a seguir geram intencionalmente um erro em foreach para mostrar diferentes abordagens para lidar com o erro:

Python
from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')

def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have


q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()

O código acima trata e suprime silenciosamente o erro, podendo não consumir o restante dos lotes. Existem duas opções para lidar com essa situação.

Primeiro, você pode relançar o erro, que o passa para a sua camada de orquestração para tentar novamente os lotes. Isso pode resolver o erro, caso seja um problema temporário, ou alertá-lo para que sua equipe de operações tente corrigi-lo manualmente. Para fazer isso, altere o código partial_func para que fique assim:

Python
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
raise e # re-raise the issue

Em segundo lugar, se você quiser capturar a exceção e ignorar o resto dos lotes, você pode alterar o código para usar a função do_nothing para ignorar silenciosamente o resto dos lotes.

Python
from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')

# function to do nothing with a row
def do_nothing(row):
pass

def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
batch_df.foreach(do_nothing) # silently consume the remainder of the batch

q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()