Use ForEachBatch para gravar em coletores de dados arbitrários
Este artigo discute o uso do site foreachBatch
com transmissão estruturada para gravar a saída de uma consulta de transmissão em uma fonte de dados que não tenha um coletor de transmissão existente.
O padrão de código streamingDF.writeStream.foreachBatch(...)
permite que o senhor aplique muitas funções aos dados de saída de cada microlote da consulta de transmissão. As funções usadas com foreachBatch
usam dois parâmetros:
- Um DataFrame que tem os dados de saída de um micro-lote.
- O ID exclusivo dos microlotes.
O senhor deve usar foreachBatch
para Delta Lake merge operações in transmissão estructurada. Consulte Upsert de consultas de transmissão usando foreachBatch
.
Aplicar operações adicionais do DataFrame
Muitas operações do DataFrame e do conjunto de dados não são compatíveis com a transmissão DataFrames porque o Spark não oferece suporte à geração de planos incrementais nesses casos. Usando o site foreachBatch()
, o senhor pode aplicar algumas dessas operações em cada saída de micro-lotes. Por exemplo, o senhor pode usar foreachBatch()
e as operações SQL MERGE INTO
para gravar a saída das agregações de transmissão em uma tabela Delta no modo de atualização. Veja mais detalhes em MERGE INTO.
foreachBatch()
fornece garantias de gravação de pelo menos uma vez. No entanto, você pode usar obatchId
fornecido à função como forma de desduplicar a saída e obter uma garantia de exatamente uma vez. Em ambos os casos, você mesmo terá que raciocinar sobre a semântica de ponta a ponta.foreachBatch()
não funciona com o modo de processamento contínuo, pois depende fundamentalmente da execução de micro-lotes de uma consulta de transmissão. Se você gravar dados no modo contínuo, useforeach()
em vez disso.- Ao usar o site
foreachBatch
com um operador com estado, é importante consumir completamente cada lote antes da conclusão do processamento. Ver consumir completamente cada lote DataFrame
Um dataframe vazio pode ser invocado com foreachBatch()
e o código do usuário precisa ser resiliente para permitir operações adequadas. Um exemplo é mostrado aqui:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Mudanças de comportamento para foreachBatch
no Databricks Runtime 14.0
Em Databricks Runtime 14.0 e acima em compute configurado com o modo de acesso padrão, aplicam-se as seguintes alterações de comportamento:
print()
comando write output to the driver logs.- Você não pode acessar o submódulo
dbutils.widgets
dentro da função. - Todos os arquivos, módulos ou objetos referenciados na função devem ser serializáveis e estar disponíveis no Spark.
Reutilização de lotes existentes fonte de dados
Usando o site foreachBatch()
, o senhor pode usar os gravadores de dados de lotes existentes para os coletores de dados que talvez não tenham suporte para transmissão estruturada. Aqui estão alguns exemplos:
Muitos outros lotes de fontes de dados podem ser usados em foreachBatch()
. Consulte Conectar à fonte de dados e ao serviço externo.
Escreva em vários locais
Se o senhor precisar gravar a saída de uma consulta de transmissão em vários locais, o site Databricks recomenda o uso de vários gravadores de transmissão estruturada para melhor paralelização e taxa de transferência.
O uso do site foreachBatch
para gravar em vários sinks serializa a execução de gravações de transmissão, o que pode aumentar a latência de cada micro-lote.
Se o senhor usar foreachBatch
para gravar em várias tabelas Delta, consulte Gravações em tabelas idempotentes em foreachBatch
.
Consumir completamente cada lote DataFrame
Quando o senhor estiver usando operadores com estado (por exemplo, usando dropDuplicatesWithinWatermark
), cada iteração de lotes deverá consumir todo o DataFrame ou reiniciar a consulta. Se o senhor não consumir todo o DataFrame, a consulta de transmissão falhará com os próximos lotes.
Isso pode acontecer em vários casos. Os exemplos a seguir mostram como corrigir consultas que não consomem corretamente um DataFrame.
Uso intencional de um subconjunto dos lotes
Se o senhor se preocupar apenas com um subconjunto dos lotes, poderá ter um código como o seguinte.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def partial_func(batch_df, batch_id):
batch_df.show(2)
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Nesse caso, o site batch_df.show(2)
só lida com os dois primeiros itens do lote, o que é esperado, mas se houver mais itens, eles deverão ser consumidos. O código a seguir consome o DataFrame completo.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
# function to do nothing with a row
def do_nothing(row)
pass
def partial_func(batch_df, batch_id):
batch_df.show(2)
batch_df.foreach(do_nothing) # silently consume the rest of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Aqui, a função do_nothing
ignora silenciosamente o restante do DataFrame.
Tratamento de um erro em um lote
Pode haver um erro ao executar um processo foreachBatch
. Você pode ter um código como o seguinte (nesse caso, a amostra gera intencionalmente um erro para mostrar o problema).
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Ao manipular (e engolir silenciosamente) o erro, o restante dos lotes pode não ser consumido. Há duas opções para lidar com essa situação.
Primeiro, o senhor pode aumentar novamente o erro, o que passa para a sua camada de orquestração tentar novamente o lote. Isso pode solucionar o erro, se for um problema transitório, ou enviá-lo para a equipe de operações tentar corrigir manualmente. Para fazer isso, altere o código partial_func
para ficar assim:
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
raise e # re-raise the issue
A segunda opção, se o senhor quiser capturar a exceção e ignorar o restante dos lotes, é alterar o código para o seguinte.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
# function to do nothing with a row
def do_nothing(row)
pass
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
batch_df.foreach(do_nothing) # silently consume the remainder of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Esse código usa a função do_nothing
para ignorar silenciosamente o restante dos lotes.