Use ForEachBatch para gravar em coletores de dados arbitrários
Esta página mostra como usar foreachBatch com transmissão estruturada para gravar a saída de uma consulta de transmissão em uma fonte de dados que não possui um coletor de transmissão existente.
O padrão de código streamingDF.writeStream.foreachBatch(...) permite que o senhor aplique muitas funções aos dados de saída de cada microlote da consulta de transmissão. As funções usadas com foreachBatch usam dois parâmetros:
- Um DataFrame que tem os dados de saída de um micro-lote.
- O ID exclusivo dos microlotes.
O senhor deve usar foreachBatch para Delta Lake merge operações in transmissão estructurada. Consulte Upsert de consultas de transmissão usando foreachBatch.
Aplicar operações adicionais do DataFrame
Muitas operações do DataFrame e do conjunto de dados não são compatíveis com a transmissão DataFrames porque o Spark não oferece suporte à geração de planos incrementais nesses casos. Usando o site foreachBatch(), o senhor pode aplicar algumas dessas operações em cada saída de micro-lotes. Por exemplo, o senhor pode usar foreachBatch() e as operações SQL MERGE INTO para gravar a saída das agregações de transmissão em uma tabela Delta no modo de atualização. Veja mais detalhes em MERGE INTO.
foreachBatch()fornece garantias de gravação de pelo menos uma vez. No entanto, você pode usar obatchIdfornecido à função como forma de desduplicar a saída e obter uma garantia de exatamente uma vez. Em ambos os casos, você mesmo terá que raciocinar sobre a semântica de ponta a ponta.foreachBatch()não funciona com o modo de processamento contínuo, pois depende fundamentalmente da execução de micro-lotes de uma consulta de transmissão. Se você gravar dados no modo contínuo, useforeach()em vez disso.- Ao usar o site
foreachBatchcom um operador com estado, é importante consumir completamente cada lote antes da conclusão do processamento. Ver consumir completamente cada lote DataFrame
Lidar com DataFrames vazios
foreachBatch() Você pode receber um DataFrame vazio, e seu código deve lidar com esse cenário. Caso contrário, sua consulta poderá falhar.
Por exemplo, quando Delta Lake é a fonte de transmissão, esses cenários podem passar um DataFrame vazio para foreachBatch():
OPTIMIZEsem arquivos para processar: Quando umaOPTIMIZEoperações de execução na tabela de origem Delta Lake , mas não há arquivos para processar, a transmissão estruturada grava uma entrada log de deslocamento para incrementar a versão da tabela. Isso gera um microlote vazio na pia, mesmo que nenhum arquivo seja lido.- Eliminação de arquivos no nível do plano físico: Se o pushdown de predicados ou a eliminação de arquivos eliminar todos os registros no nível do plano físico, o resultado será um commit vazio no coletor.
O código do usuário deve lidar com DataFrames vazios para permitir o funcionamento adequado. Veja os exemplos abaixo:
- Python
- Scala
def process_batch(output_df, batch_id):
# Process valid DataFrames only
if not output_df.isEmpty():
# business logic
pass
streamingDF.writeStream.foreachBatch(process_batch).start()
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid DataFrames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Mudanças de comportamento para foreachBatch no Databricks Runtime 14.0
Em Databricks Runtime 14.0 e acima em compute configurado com o modo de acesso padrão, aplicam-se as seguintes alterações de comportamento:
print()comando write output to the driver logs.- Você não pode acessar o submódulo
dbutils.widgetsdentro da função. - Todos os arquivos, módulos ou objetos referenciados na função devem ser serializáveis e estar disponíveis no Spark.
Reutilização de lotes existentes fonte de dados
Usando o site foreachBatch(), o senhor pode usar os gravadores de dados de lotes existentes para os coletores de dados que talvez não tenham suporte para transmissão estruturada. Aqui estão alguns exemplos:
Muitos outros lotes de fontes de dados podem ser usados em foreachBatch(). Consulte Conectar à fonte de dados e ao serviço externo.
Escreva em vários locais
Se o senhor precisar gravar a saída de uma consulta de transmissão em vários locais, o site Databricks recomenda o uso de vários gravadores de transmissão estruturada para melhor paralelização e taxa de transferência.
O uso do site foreachBatch para gravar em vários sinks serializa a execução de gravações de transmissão, o que pode aumentar a latência de cada micro-lote.
Se você usar foreachBatch para gravar em várias tabelas Delta, consulte Usar foreachBatch para gravações de tabela idempotentes.
Consumir completamente cada lote DataFrame
Quando o senhor estiver usando operadores com estado (por exemplo, usando dropDuplicatesWithinWatermark), cada iteração de lotes deverá consumir todo o DataFrame ou reiniciar a consulta. Se o senhor não consumir todo o DataFrame, a consulta de transmissão falhará com os próximos lotes.
Isso pode acontecer em vários casos. Os exemplos a seguir mostram como corrigir consultas que não consomem corretamente um DataFrame.
Uso intencional de um subconjunto dos lotes
Se o senhor se preocupar apenas com um subconjunto dos lotes, poderá ter um código como o seguinte.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def partial_func(batch_df, batch_id):
batch_df.show(2)
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Nesse caso, o site batch_df.show(2) só lida com os dois primeiros itens do lote, o que é esperado, mas se houver mais itens, eles deverão ser consumidos. O código a seguir consome o DataFrame completo.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
# function to do nothing with a row
def do_nothing(row):
pass
def partial_func(batch_df, batch_id):
batch_df.show(2)
batch_df.foreach(do_nothing) # silently consume the rest of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Aqui, a função do_nothing ignora silenciosamente o restante do DataFrame.
Tratamento de um erro em um lote
Para tratamento de erros em foreachBatch, Databricks recomenda que você permita que a consulta de transmissão falhe rapidamente e, em vez disso, confie na camada de orquestração, como LakeFlow Jobs ou Apache Airflow, para gerenciar a lógica de repetição. Isso é muito mais seguro do que criar loops de repetição complexos em seu código, onde pode ocorrer perda de dados.
Aqui estão algumas diretrizes com base no seu objetivo de escrita:
Destino | Exemplos | Orientação |
|---|---|---|
Operações DataFrame | Tabelas Delta Lake | Você deve usar as opções de escrita |
Código personalizado e destinos externos |
| Implemente sua própria idempotência. Você deve assumir que quaisquer operações podem e serão repetidas em lotes diferentes. Se o |
Aqui estão alguns exemplos de tipos de exceção e recomendações sobre como lidar com eles em foreachBatch:
Tipo de exceção | Exemplos | Ação recomendada |
|---|---|---|
erros transitórios de coleta de lixo |
| Catch : tente novamente ou envie para a fila de mensagens não entregues. |
Violações de restrições key ou duplicadas quando o coletor é idempotente. |
| Capturar : log e suprimir |
Erros personalizados que podem ser repetidos | Exceções de socket encapsuladas, erros de banco de dados recuperáveis | Captura : incrementar métricas e permitir continuação controlada. |
Erros de lógica ou de esquema |
| Propagar : permitir que o Spark falhe na consulta. |
Erros de destino não recuperáveis ou bugs lógicos não detectados |
| Propagar : permitir que o Spark falhe na consulta. |
falhas críticas |
| Propagar : permitir que o Spark falhe na consulta. |
Exemplos de código: tratamento de exceções
Os exemplos a seguir geram intencionalmente um erro em foreach para mostrar diferentes abordagens para lidar com o erro:
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
O código acima trata e suprime silenciosamente o erro, podendo não consumir o restante dos lotes. Existem duas opções para lidar com essa situação.
Primeiro, você pode relançar o erro, que o passa para a sua camada de orquestração para tentar novamente os lotes. Isso pode resolver o erro, caso seja um problema temporário, ou alertá-lo para que sua equipe de operações tente corrigi-lo manualmente. Para fazer isso, altere o código partial_func para que fique assim:
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
raise e # re-raise the issue
Em segundo lugar, se você quiser capturar a exceção e ignorar o resto dos lotes, você pode alterar o código para usar a função do_nothing para ignorar silenciosamente o resto dos lotes.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
# function to do nothing with a row
def do_nothing(row):
pass
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
batch_df.foreach(do_nothing) # silently consume the remainder of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()