Use ForEachBatch para gravar em destinos de dados arbitrários no pipeline.
Visualização
A API foreach_batch_sink está em Pré-visualização Pública.
O coletor ForEachBatch permite processar uma transmissão como uma série de microlotes. Cada lote pode ser processado em Python com lógica personalizada semelhante à transmissão estruturada do Apache Spark foreachBatch. Com o coletor ForEachBatch do pipeline declarativo Spark (SDP) LakeFlow , você pode transformar, merge ou gravar dados de transmissão em um ou mais destinos que não oferecem suporte nativo a gravações de transmissão. Esta página orienta você na configuração de um coletor ForEachBatch, fornece exemplos e discute considerações key .
O coletor ForEachBatch oferece a seguinte funcionalidade:
- Lógica personalizada para cada microlote : ForEachBatch é um coletor de transmissão flexível. Você pode aplicar ações arbitrárias (como mesclar em uma tabela externa, gravar em vários destinos ou realizar upserts) com código Python.
- Suporte completo refresh : o pipeline gerencia pontos de verificação por fluxo, de forma que os pontos de verificação sejam redefinidos automaticamente quando você realiza uma refresh completa do seu pipeline. Com o coletor ForEachBatch, você é responsável por gerenciar a redefinição dos dados subsequentes quando isso ocorrer.
- SuporteUnity Catalog : o coletor ForEachBatch oferece suporte a todos os recursos Unity Catalog , como leitura e gravação em volumes ou tabelas Unity Catalog .
- Limpeza limitada : O pipeline não rastreia quais dados são gravados a partir de um coletor ForEachBatch, portanto, não consegue limpar esses dados. Você é responsável por toda a gestão de dados subsequentes.
- log de eventos : O log de eventos pipeline registra a criação e o uso de cada coletor ForEachBatch. Se a sua função Python não for serializável, você verá uma entrada de aviso no log de eventos com sugestões adicionais.
- O coletor ForEachBatch foi projetado para consultas de transmissão, como
append_flow. Não se destina a pipelines somente de lotes ou à semânticaAutoCDC. - O coletor ForEachBatch descrito nesta página é para pipeline. Apache Spark transmissão estruturada também suporta
foreachBatch. Para informações sobre a transmissão estruturadaforeachBatch, consulte Use foreachBatch para gravar em coletores de dados arbitrários.
Quando usar um coletor ForEachBatch
Use um coletor ForEachBatch sempre que seu pipeline exigir funcionalidade que não esteja disponível por meio de um formato de coletor integrado, como delta ou kafka. Os casos de uso típicos incluem:
- Fusão ou upsert em uma tabela Delta Lake : execução de lógica merge personalizada para cada microlote (por exemplo, tratamento de registros atualizados).
- Escrita em múltiplos destinos ou destinos não suportados : Escrever a saída de cada lote em múltiplas tabelas ou sistemas de armazenamento externos que não suportam gravações de transmissão (como certos coletores JDBC ).
- Aplicando lógica ou transformações personalizadas : Manipule dados diretamente em Python (por exemplo, usando biblioteca especializada ou transformações avançadas).
Para obter informações sobre os sinks integrados ou sobre como criar sinks personalizados com Python, consulte Sinks no pipeline declarativo LakeFlow Spark.
Sintaxe
Use a decoração @dp.foreach_batch_sink() para gerar um coletor ForEachBatch. Você pode então referenciar isso como um target em sua definição de fluxo, por exemplo em @dp.append_flow.
from pyspark import pipelines as dp
@dp.foreach_batch_sink(name="<name>")
def batch_handler(df, batch_id):
"""
Required:
- `df`: a Spark DataFrame representing the rows of this micro-batch.
- `batch_id`: unique integer ID for each micro-batch in the query.
"""
# Your custom write or transformation logic here
# Example:
# df.write.format("some-target-system").save("...")
#
# To access the sparkSession inside the batch handler, use df.sparkSession.
Parâmetro | Descrição |
|---|---|
name | Opcional. Um nome único para identificar o ponto de coleta dentro da pipeline. Quando não incluída, a UDF assume o nome padrão. |
manipulador_de_lotes | Esta é a função definida pelo usuário (UDF) que será chamada para cada microlote. |
df | DataFrame Spark contendo dados para os microlotes atuais. |
id_do_lote | O ID inteiro dos microlotes. O Spark incrementa esse ID para cada intervalo de disparo. Um |
refreshcompleta
Como o ForEachBatch usa uma consulta de transmissão, o pipeline rastreia o diretório de checkpoint para cada fluxo. Após refreshcompleta :
- O diretório de pontos de verificação é Reset.
- Sua função de coleta ( UDF
foreach_batch_sink) vê um novo ciclobatch_idcomeçando de 0. - Os dados no seu sistema de destino não são limpos automaticamente pelo pipeline (porque o pipeline não sabe onde seus dados estão gravados). Se você precisar de um cenário totalmente novo, deverá excluir ou truncar manualmente as tabelas ou locais externos que o seu coletor ForEachBatch preenche.
Usando o recurso Unity Catalog
Todas as funcionalidades existentes Unity Catalog no Spark transmissão estruturada foreach_batch_sink permanecem disponíveis.
Isso inclui a escrita em tabelas gerenciais ou tabelas externas Unity Catalog . Você pode escrever microlotes no Unity Catalog ou em tabelas externas exatamente como faria em qualquer Job estruturado Apache Spark .
log eventos
Ao criar um coletor ForEachBatch, um evento SinkDefinition , com "format": "foreachBatch" é adicionado ao log de eventos do pipeline.
Isso permite rastrear o uso dos sinks ForEachBatch e visualizar avisos sobre o seu sink.
Utilizando com o Databricks Connect
Se a função que você fornecer não for serializável (um requisito importante para o Databricks Connect), o log de eventos incluirá uma entrada WARN recomendando que você simplifique ou refatore seu código se o suporte Databricks Connect for necessário.
Por exemplo, se você usar dbutils para obter parâmetros dentro de uma UDF ForEachBatch, você
Em vez disso, você pode obter o argumento antes de usá-lo na UDF:
# Instead of accessing parameters within the UDF...
def foreach_batch(df, batchId):
value = dbutils.widgets.get ("X") + str (i)
# ...get the parameters first, and use them within the UDF:
argX = dbutils.widgets.get ("X")
def foreach_batch(df, batchId):
value = argX + str (i)
Melhores práticas
- Mantenha sua função ForEachBatch concisa : evite multithreading, dependências pesadas de bibliotecas ou grandes manipulações de dados na memória. Lógicas complexas ou com estado podem levar a erros de serialização ou gargalos de desempenho.
- Monitore sua pasta de checkpoints : Para consultas de transmissão, o SDP gerencia checkpoints por fluxo, não por destino. Se você tiver vários fluxos em seu pipeline, cada fluxo terá seu próprio diretório de pontos de verificação.
- Validar dependências externas : Se você depende de sistemas ou bibliotecas externas, verifique se eles estão instalados em todos os nós cluster ou no seu contêiner.
- Esteja atento ao Databricks Connect : Se o seu ambiente puder migrar para o Databricks Connect no futuro, verifique se o seu código é serializável e não depende de
dbutilsdentro do UDFforeach_batch_sink.
Limitações
- Sem limpeza para ForEachBatch : Como seu código Python personalizado pode gravar dados em qualquer lugar, o pipeline não pode limpar ou rastrear esses dados. Você deve gerenciar seus próprios dados ou políticas de retenção para os destinos para os quais você envia dados.
- métricas em micro-lotes : pipeline coleta métricas de distribuição, mas alguns cenários podem causar métricas incompletas ou incomuns ao usar ForEachBatch. Isso se deve à flexibilidade inerente do ForEachBatch, que dificulta o acompanhamento do fluxo de dados e das linhas pelo sistema.
- Suporte para gravação em múltiplos destinos sem múltiplas leituras : Alguns clientes podem usar o ForEachBatch para ler de uma origem uma única vez e, em seguida, gravar em vários destinos. Para conseguir isso, você deve incluir
df.persistoudf.cachedentro da sua função ForEachBatch. Ao utilizar essas opções, o Databricks tentará preparar os dados apenas uma única vez. Sem essas opções, sua consulta resultará em múltiplas leituras. Isso não está incluído nos exemplos de código a seguir. - Usando com Databricks Connect : Se a execução do seu pipeline for no Databricks Connect,
foreachBatchfunções definidas pelo usuário (UDF) devem ser serializáveis e não podem usardbutils. O pipeline emite avisos se detectar uma UDF não serializável, mas não interrompe a execução do pipeline. - Lógica não serializável : O código que referencia objetos locais, classes ou recursos não serializáveis pode apresentar erros em contextos Databricks Connect . Use módulos Python puros e certifique-se de que referências (por exemplo,
dbutils) não sejam usadas se o Databricks Connect for um requisito.
Exemplos
Exemplo de sintaxe básica
from pyspark import pipelines as dp
# Create a ForEachBatch sink
@dp.foreach_batch_sink(name = "my_foreachbatch_sink")
def feb_sink(df, batch_id):
# Custom logic here. You can perform merges,
# write to multiple destinations, etc.
return
# Create source data for example:
@dp.table()
def example_source_data():
return spark.range(5)
# Add sink to an append flow:
@dp.append_flow(
target="my_foreachbatch_sink",
)
def my_flow():
return spark.readStream.format("delta").table("example_source_data")
Utilizando dados de exemplo para um pipeline simples
Este exemplo utiliza a amostra de táxi de Nova Iorque. Pressupõe-se que o administrador do seu workspace tenha ativado o catálogo de conjuntos de dados públicos Databricks . Para o coletor, altere my_catalog.my_schema para um catálogo e esquema aos quais você tenha acesso.
from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp
# Create foreachBatch sink
@dp.foreach_batch_sink(name = "my_foreach_sink")
def my_foreach_sink(df, batch_id):
# Custom logic here. You can perform merges,
# write to multiple destinations, etc.
# For this example, we are adding a timestamp column.
enriched = df.withColumn("processed_timestamp", current_timestamp())
# Write to a Delta location
enriched.write \
.format("delta") \
.mode("append") \
.saveAsTable("my_catalog.my_schema.trips_sink_delta")
# Return is optional here, but generally not used for the sink
return
# Create an append flow that reads sample data,
# and sends it to the ForEachBatch sink
@dp.append_flow(
target="my_foreach_sink",
)
def taxi_source():
df = spark.readStream.table("samples.nyctaxi.trips")
return df
Escrever para vários destinos
Este exemplo escreve para vários destinos. Isso demonstra o uso de txnVersion e txnAppId para tornar as gravações em tabelas do Delta Lake idempotentes. Para mais detalhes, consulte Escritas de tabela idempotente em foreachBatch.
Suponha que estamos escrevendo em duas tabelas, table_a e table_b, e suponha que dentro de um lote, a escrita em table_a seja bem-sucedida enquanto a escrita em table_b falhe. Quando os lotes são reexecucionados, o par (txnVersion, txnAppId) permitirá que Delta ignore a escrita duplicada em table_a e escreva apenas os lotes em table_b.
from pyspark import pipelines as dp
app_id = "my-app-name" # different applications that write to the same table should have unique txnAppId
# Create the ForEachBatch sink
@dp.foreach_batch_sink(name="user_events_feb")
def user_events_handler(df, batch_id):
# Optionally do transformations, logging, or merging logic
# ...
# Write to a Delta table
df.write \
.format("delta") \
.mode("append") \
.option("txnVersion", batch_id) \
.option("txnAppId", app_id) \
.saveAsTable("my_catalog.my_schema.example_table_1")
# Also write to a JSON file location
df.write \
.format("json") \
.mode("append") \
.option("txnVersion", batch_id) \
.option("txnAppId", app_id) \
.save("/tmp/json_target")
return
# Create source data for example
@dp.table()
def example_source():
return spark.range(5)
# Create the append flow, and target the ForEachBatch sink
@dp.append_flow(target="user_events_feb", name="user_events_flow")
def read_user_events():
return spark.readStream.format("delta").table("example_source")
Usando spark.sql()
Você pode usar spark.sql() em seu coletor ForEachBatch, como no exemplo a seguir.
from pyspark import pipelines as dp
from pyspark.sql import Row
@dp.foreach_batch_sink(name = "example_sink")
def feb_sink(df, batch_id):
df.createOrReplaceTempView("df_view")
df.sparkSession.sql("MERGE INTO target_table AS tgt " +
"USING df_view AS src ON tgt.id = src.id " +
"WHEN MATCHED THEN UPDATE SET tgt.id = src.id * 10 " +
"WHEN NOT MATCHED THEN INSERT (id) VALUES (id)"
)
return
# Create target delta table
spark.range(5).write.format("delta").mode("overwrite").saveAsTable("target_table")
# Create source table
@dp.table()
def src_table():
return spark.range(5)
@dp.append_flow(
target="example_sink",
)
def example_flow():
return spark.readStream.format("delta").table("source_table")
Perguntas frequentes (FAQ)
Posso usar dbutils no meu coletor ForEachBatch?
Se você planeja executar seu pipeline em um ambiente que não sejaDatabricks Connect, dbutils pode funcionar. No entanto, se você usar o Databricks Connect, dbutils não estará acessível dentro da sua função foreachBatch . O pipeline pode gerar avisos se detectar o uso de dbutils para ajudar você a evitar interrupções.
Posso usar vários fluxos com um único coletor ForEachBatch?
Sim. Você pode definir vários fluxos (com @dp.append_flow) que têm como alvo o mesmo nome de destino, mas cada um mantém seus próprios pontos de verificação.
O pipeline lida com a retenção ou limpeza de dados para o meu destino?
Não. Como o coletor ForEachBatch pode gravar em qualquer local ou sistema arbitrário, o pipeline não pode gerenciar ou excluir dados automaticamente nesse destino. Você deve lidar com essas operações como parte do seu código personalizado ou de processos externos.
Como posso solucionar erros ou falhas de serialização na minha função ForEachBatch?
Consulte os logs do driver de cluster ou os logs de eventos do pipeline. Para problemas de serialização relacionados ao Spark Connect, verifique se sua função depende apenas de objetos Python serializáveis e não referencia objetos não permitidos (como identificadores de arquivos abertos ou dbutils).