Use ForEachBatch para gravar em coletores de dados arbitrários
Este artigo discute o uso do site foreachBatch com transmissão estruturada para gravar a saída de uma consulta de transmissão em uma fonte de dados que não tenha um coletor de transmissão existente.
O padrão de código streamingDF.writeStream.foreachBatch(...) permite que o senhor aplique muitas funções aos dados de saída de cada microlote da consulta de transmissão. As funções usadas com foreachBatch usam dois parâmetros:
- Um DataFrame que tem os dados de saída de um micro-lote.
- O ID exclusivo dos microlotes.
O senhor deve usar foreachBatch para Delta Lake merge operações in transmissão estructurada. Consulte Upsert de consultas de transmissão usando foreachBatch.
Aplicar operações adicionais do DataFrame
Muitas operações do DataFrame e do conjunto de dados não são compatíveis com a transmissão DataFrames porque o Spark não oferece suporte à geração de planos incrementais nesses casos. Usando o site foreachBatch(), o senhor pode aplicar algumas dessas operações em cada saída de micro-lotes. Por exemplo, o senhor pode usar foreachBatch() e as operações SQL MERGE INTO para gravar a saída das agregações de transmissão em uma tabela Delta no modo de atualização. Veja mais detalhes em MERGE INTO.
foreachBatch()fornece garantias de gravação de pelo menos uma vez. No entanto, você pode usar obatchIdfornecido à função como forma de desduplicar a saída e obter uma garantia de exatamente uma vez. Em ambos os casos, você mesmo terá que raciocinar sobre a semântica de ponta a ponta.foreachBatch()não funciona com o modo de processamento contínuo, pois depende fundamentalmente da execução de micro-lotes de uma consulta de transmissão. Se você gravar dados no modo contínuo, useforeach()em vez disso.- Ao usar o site
foreachBatchcom um operador com estado, é importante consumir completamente cada lote antes da conclusão do processamento. Ver consumir completamente cada lote DataFrame
Um dataframe vazio pode ser invocado com foreachBatch() e o código do usuário precisa ser resiliente para permitir operações adequadas. Um exemplo é mostrado aqui:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Mudanças de comportamento para foreachBatch no Databricks Runtime 14.0
Em Databricks Runtime 14.0 e acima em compute configurado com o modo de acesso padrão, aplicam-se as seguintes alterações de comportamento:
print()comando write output to the driver logs.- Você não pode acessar o submódulo
dbutils.widgetsdentro da função. - Todos os arquivos, módulos ou objetos referenciados na função devem ser serializáveis e estar disponíveis no Spark.
Reutilização de lotes existentes fonte de dados
Usando o site foreachBatch(), o senhor pode usar os gravadores de dados de lotes existentes para os coletores de dados que talvez não tenham suporte para transmissão estruturada. Aqui estão alguns exemplos:
Muitos outros lotes de fontes de dados podem ser usados em foreachBatch(). Consulte Conectar à fonte de dados e ao serviço externo.
Escreva em vários locais
Se o senhor precisar gravar a saída de uma consulta de transmissão em vários locais, o site Databricks recomenda o uso de vários gravadores de transmissão estruturada para melhor paralelização e taxa de transferência.
O uso do site foreachBatch para gravar em vários sinks serializa a execução de gravações de transmissão, o que pode aumentar a latência de cada micro-lote.
Se o senhor usar foreachBatch para gravar em várias tabelas Delta, consulte Gravações em tabelas idempotentes em foreachBatch.
Consumir completamente cada lote DataFrame
Quando o senhor estiver usando operadores com estado (por exemplo, usando dropDuplicatesWithinWatermark), cada iteração de lotes deverá consumir todo o DataFrame ou reiniciar a consulta. Se o senhor não consumir todo o DataFrame, a consulta de transmissão falhará com os próximos lotes.
Isso pode acontecer em vários casos. Os exemplos a seguir mostram como corrigir consultas que não consomem corretamente um DataFrame.
Uso intencional de um subconjunto dos lotes
Se o senhor se preocupar apenas com um subconjunto dos lotes, poderá ter um código como o seguinte.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def partial_func(batch_df, batch_id):
batch_df.show(2)
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Nesse caso, o site batch_df.show(2) só lida com os dois primeiros itens do lote, o que é esperado, mas se houver mais itens, eles deverão ser consumidos. O código a seguir consome o DataFrame completo.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
# function to do nothing with a row
def do_nothing(row):
pass
def partial_func(batch_df, batch_id):
batch_df.show(2)
batch_df.foreach(do_nothing) # silently consume the rest of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Aqui, a função do_nothing ignora silenciosamente o restante do DataFrame.
Tratamento de um erro em um lote
Para tratamento de erros em foreachBatch, Databricks recomenda que você permita que a consulta de transmissão falhe rapidamente e, em vez disso, confie na camada de orquestração, como LakeFlow Jobs ou Apache Airflow, para gerenciar a lógica de repetição. Isso é muito mais seguro do que criar loops de repetição complexos em seu código, onde pode ocorrer perda de dados.
Aqui estão algumas diretrizes com base no seu objetivo de escrita:
Destino | Exemplos | Orientação |
|---|---|---|
Operações DataFrame | Tabelas Delta Lake | Você deve usar as opções de escrita |
Código personalizado e destinos externos |
| Implemente sua própria idempotência. Você deve assumir que quaisquer operações podem e serão repetidas em lotes diferentes. Se o |
Aqui estão alguns exemplos de tipos de exceção e recomendações sobre como lidar com eles em foreachBatch:
Tipo de exceção | Exemplos | Ação recomendada |
|---|---|---|
erros transitórios de coleta de lixo |
| Catch : tente novamente ou envie para a fila de mensagens não entregues. |
Violações de restrições key ou duplicadas quando o coletor é idempotente. |
| Capturar : log e suprimir |
Erros personalizados que podem ser repetidos | Exceções de socket encapsuladas, erros de banco de dados recuperáveis | Captura : incrementar métricas e permitir continuação controlada. |
Erros de lógica ou de esquema |
| Propagar : permitir que o Spark falhe na consulta. |
Erros de destino não recuperáveis ou bugs lógicos não detectados |
| Propagar : permitir que o Spark falhe na consulta. |
falhas críticas |
| Propagar : permitir que o Spark falhe na consulta. |
Exemplos de código: tratamento de exceções
Os exemplos a seguir geram intencionalmente um erro em foreach para mostrar diferentes abordagens para lidar com o erro:
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
O código acima trata e suprime silenciosamente o erro, podendo não consumir o restante dos lotes. Existem duas opções para lidar com essa situação.
Primeiro, você pode relançar o erro, que o passa para a sua camada de orquestração para tentar novamente os lotes. Isso pode resolver o erro, caso seja um problema temporário, ou alertá-lo para que sua equipe de operações tente corrigi-lo manualmente. Para fazer isso, altere o código partial_func para que fique assim:
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
raise e # re-raise the issue
Em segundo lugar, se você quiser capturar a exceção e ignorar o resto dos lotes, você pode alterar o código para usar a função do_nothing para ignorar silenciosamente o resto dos lotes.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
# function to do nothing with a row
def do_nothing(row):
pass
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
batch_df.foreach(do_nothing) # silently consume the remainder of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()