Use ForEachBatch to write to arbitrary data sinks in pipelines
The foreach_batch_sink API is in Public Preview.
The ForEachBatch sink allows you to process a stream as a series of micro-batches. Each batch can be processed in Python with custom logic similar to Apache Spark Structured Streaming's foreachBatch. With the Lakeflow Spark Declarative Pipelines (SDP) ForEachBatch sink, you can transform, merge, or write streaming data to one or more targets that do not natively support streaming writes. This page walks you through setting up a ForEachBatch sink, provides examples, and discusses key considerations.
ForEachBatch sink provides the following functionality:
- Custom logic for each micro-batch: ForEachBatch is a flexible streaming sink. You can apply arbitrary actions (such as merging into an external table, writing to multiple destinations, or performing upserts) with Python code.
- Full refresh support: Pipelines manage checkpoints on a per-flow basis, so checkpoints reset automatically when you perform a full refresh of your pipeline. With ForEachBatch sink, you are responsible for managing the downstream data reset when this happens.
- Unity Catalog support: ForEachBatch sink supports all Unity Catalog features, such as reading from or writing to Unity Catalog volumes or tables.
- Limited housekeeping: The pipeline does not track what data is written from a ForEachBatch sink, so cannot clean up that data. You are responsible for any downstream data management.
- Event log entries: The pipeline event log records the creation and usage of each ForEachBatch sink. If your Python function is not serializable, you see a warning entry in the event log with additional suggestions.
- The ForEachBatch sink is designed for streaming queries, such as append_flow. It is not intended for batch-only pipelines or for AutoCDC semantics.
- The ForEachBatch sink described on this page is for pipelines. Apache Spark Structured Streaming also supports foreachBatch. For information about the Structured Streaming foreachBatch, see Use foreachBatch to write to arbitrary data sinks.
When to use a ForEachBatch sink
Use a ForEachBatch sink whenever your pipeline requires functionality that is not available through a built-in sink format such as delta or kafka. Typical use cases include:
- Merging or upserting into a Delta Lake table: Run custom merge logic for each micro-batch (for example, handling updated records).
- Writing to multiple or unsupported destinations: Write the output of each batch to multiple tables or external storage systems that do not support streaming writes (like certain JDBC sinks).
- Applying custom logic or transformations: Manipulate data in Python directly (for example, using specialized libraries or advanced transformations).
For information about the built-in sinks, or creating custom sinks with Python, see Sinks in Lakeflow Spark Declarative Pipelines.
Syntax
Use the @dp.foreach_batch_sink() decorator to generate a ForEachBatch sink. You can then reference it as a target in your flow definition, for example in @dp.append_flow.
from pyspark import pipelines as dp

@dp.foreach_batch_sink(name="<name>")
def batch_handler(df, batch_id):
    """
    Required:
    - `df`: a Spark DataFrame representing the rows of this micro-batch.
    - `batch_id`: unique integer ID for each micro-batch in the query.
    """
    # Your custom write or transformation logic here
    # Example:
    # df.write.format("some-target-system").save("...")
    #
    # To access the SparkSession inside the batch handler, use df.sparkSession.
| Parameter | Description |
|---|---|
| name | Optional. A unique name to identify the sink within the pipeline. Defaults to the name of the UDF when not included. |
| batch_handler | The user-defined function (UDF) that is called for each micro-batch. |
| df | A Spark DataFrame containing the data for the current micro-batch. |
| batch_id | The integer ID of the micro-batch. Spark increments this ID for each trigger interval. |
Full refresh
Because ForEachBatch uses a streaming query, the pipeline tracks the checkpoint directory for each flow. On full refresh:
- The checkpoint directory is reset.
- Your sink function (the foreach_batch_sink UDF) sees a brand-new batch_id cycle starting from 0.
- Data in your target system is not automatically cleaned up by the pipeline (because the pipeline does not know where your data is written). If you require a clean-slate scenario, you must manually drop or truncate the external tables or locations that your ForEachBatch sink populates.
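For example, before you trigger a full refresh, you can reset the downstream target yourself. A minimal sketch, assuming the target is the Delta table used in the sample pipeline later on this page (adjust the table name to your own target):

# Run outside the pipeline (for example, in a notebook) before the full refresh,
# so the target is empty when the checkpoint and batch_id cycle reset.
spark.sql("TRUNCATE TABLE my_catalog.my_schema.trips_sink_delta")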
Using Unity Catalog features
All Unity Catalog capabilities available in Spark Structured Streaming foreachBatch remain available with the foreach_batch_sink. This includes writing to Unity Catalog managed or external tables: you can write micro-batches into these tables exactly as you would in any Apache Spark Structured Streaming job.
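For example, a micro-batch handler can append each batch as files in a Unity Catalog volume. A minimal sketch; the catalog, schema, volume, and sink names below are placeholders:

from pyspark import pipelines as dp

@dp.foreach_batch_sink(name="uc_volume_sink")
def uc_volume_sink(df, batch_id):
    # Append this micro-batch as JSON files in a Unity Catalog volume path.
    df.write.mode("append").format("json").save(
        "/Volumes/my_catalog/my_schema/my_volume/foreachbatch_output/"
    )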
Event log entries
When you create a ForEachBatch sink, a SinkDefinition event with "format": "foreachBatch" is added to the pipeline's event log. This lets you track usage of ForEachBatch sinks and see warnings about your sink.
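For example, you can query the event log for these entries. A hedged sketch using the event_log table-valued function; the pipeline ID is a placeholder, and the LIKE filter is a loose match on the recorded format:

# Query the pipeline event log and keep entries that mention the foreachBatch format.
events = spark.sql("""
    SELECT timestamp, event_type, level, message, details
    FROM event_log('<pipeline-id>')
    WHERE details LIKE '%foreachBatch%'
    ORDER BY timestamp DESC
""")
events.show(truncate=False)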
Using with Databricks Connect
If the function you supply is not serializable (an important requirement for Databricks Connect), the event log includes a WARN entry recommending that you simplify or refactor your code if Databricks Connect support is required.
For example, if you use dbutils to get parameters within a ForEachBatch UDF, you can instead retrieve the parameter value before using it in the UDF:
# Instead of accessing parameters within the UDF...
def foreach_batch(df, batchId):
    value = dbutils.widgets.get("X") + str(batchId)

# ...get the parameters first, and use them within the UDF:
argX = dbutils.widgets.get("X")
def foreach_batch(df, batchId):
    value = argX + str(batchId)
Best practices
- Keep your ForEachBatch function concise: Avoid threading, heavy library dependencies, or large in-memory data manipulations. Complex or stateful logic can lead to serialization errors or performance bottlenecks.
- Monitor your checkpoint folder: For streaming queries, SDP manages checkpoints by flow, not by sink. If you have multiple flows in your pipeline, each flow has its own checkpoint directory.
- Validate external dependencies: If you rely on external systems or libraries, check that they are installed on all cluster nodes or in your container.
- Be mindful of Databricks Connect: If your environment may move to Databricks Connect in the future, check that your code is serializable and does not rely on dbutils within the foreach_batch_sink UDF.
Limitations
- No housekeeping for ForEachBatch: Because your custom Python code may write data anywhere, the pipeline cannot clean up or track that data. You must handle your own data management or retention policies for the destinations you write to.
- Metrics in micro-batches: Pipelines collect streaming metrics, but some scenarios can produce incomplete or unusual metrics when you use ForEachBatch. This is because the flexibility of ForEachBatch makes it difficult for the system to track data flow and row counts.
- Writing to multiple destinations without multiple reads: You might use ForEachBatch to read from a source once and then write to multiple destinations. To accomplish this, you must call df.persist or df.cache inside your ForEachBatch function. With these options, Databricks attempts to read the data only a single time; without them, your query results in multiple reads. This pattern is not included in the full examples on this page, but a minimal sketch follows this list.
- Using with Databricks Connect: If your pipeline runs on Databricks Connect, foreachBatch user-defined functions (UDFs) must be serializable and cannot use dbutils. The pipeline raises warnings if it detects a non-serializable UDF, but does not fail the pipeline.
- Non-serializable logic: Code referencing local objects, classes, or unpickleable resources can break in Databricks Connect contexts. Use pure Python modules and confirm that references (for example, dbutils) are not used if Databricks Connect is a requirement.
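A minimal sketch of that persist pattern; the sink name, table name, and output path below are hypothetical:

from pyspark import pipelines as dp

@dp.foreach_batch_sink(name="multi_destination_sink")
def multi_destination_sink(df, batch_id):
    # Cache the micro-batch so both writes reuse the same data
    # instead of triggering a second read of the source.
    df.persist()
    try:
        df.write.format("delta").mode("append").saveAsTable("my_catalog.my_schema.destination_a")
        df.write.format("json").mode("append").save("/tmp/destination_b_json")
    finally:
        df.unpersist()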
Examples
Basic syntax example
from pyspark import pipelines as dp

# Create a ForEachBatch sink
@dp.foreach_batch_sink(name = "my_foreachbatch_sink")
def feb_sink(df, batch_id):
    # Custom logic here. You can perform merges,
    # write to multiple destinations, etc.
    return

# Create source data for example:
@dp.table()
def example_source_data():
    return spark.range(5)

# Add sink to an append flow:
@dp.append_flow(
    target="my_foreachbatch_sink",
)
def my_flow():
    return spark.readStream.format("delta").table("example_source_data")
Using sample data for a simple pipeline
This example uses the NYC Taxi sample. It assumes that your workspace admin has enabled the Databricks Public Datasets catalog. For the sink, change my_catalog.my_schema to a catalog and schema that you have access to.
from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp

# Create foreachBatch sink
@dp.foreach_batch_sink(name = "my_foreach_sink")
def my_foreach_sink(df, batch_id):
    # Custom logic here. You can perform merges,
    # write to multiple destinations, etc.
    # For this example, we are adding a timestamp column.
    enriched = df.withColumn("processed_timestamp", current_timestamp())

    # Write to a Delta location
    enriched.write \
        .format("delta") \
        .mode("append") \
        .saveAsTable("my_catalog.my_schema.trips_sink_delta")

    # Return is optional here, but generally not used for the sink
    return

# Create an append flow that reads sample data,
# and sends it to the ForEachBatch sink
@dp.append_flow(
    target="my_foreach_sink",
)
def taxi_source():
    df = spark.readStream.table("samples.nyctaxi.trips")
    return df
Writing to multiple destinations
This example writes to multiple destinations. It demonstrates the use of txnVersion and txnAppId to make writes to Delta Lake tables idempotent. For details, see Idempotent table writes in foreachBatch.
Suppose we are writing to two tables, table_a and table_b, and suppose that within a batch, the write to table_a succeeds while the write to table_b fails. When the batch is re-run, the (txnVersion, txnAppId) pair will allow Delta to ignore the duplicate write to table_a, and only write the batch to table_b.
from pyspark import pipelines as dp

app_id = "my-app-name"  # different applications that write to the same table should have unique txnAppId

# Create the ForEachBatch sink
@dp.foreach_batch_sink(name="user_events_feb")
def user_events_handler(df, batch_id):
    # Optionally do transformations, logging, or merging logic
    # ...

    # Write to a Delta table
    df.write \
        .format("delta") \
        .mode("append") \
        .option("txnVersion", batch_id) \
        .option("txnAppId", app_id) \
        .saveAsTable("my_catalog.my_schema.example_table_1")

    # Also write to a JSON file location
    df.write \
        .format("json") \
        .mode("append") \
        .option("txnVersion", batch_id) \
        .option("txnAppId", app_id) \
        .save("/tmp/json_target")

    return

# Create source data for example
@dp.table()
def example_source():
    return spark.range(5)

# Create the append flow, and target the ForEachBatch sink
@dp.append_flow(target="user_events_feb", name="user_events_flow")
def read_user_events():
    return spark.readStream.format("delta").table("example_source")
Using spark.sql()
You can use spark.sql() in your ForEachBatch sink, as in the following example.
from pyspark import pipelines as dp

@dp.foreach_batch_sink(name = "example_sink")
def feb_sink(df, batch_id):
    df.createOrReplaceTempView("df_view")
    df.sparkSession.sql("MERGE INTO target_table AS tgt " +
        "USING df_view AS src ON tgt.id = src.id " +
        "WHEN MATCHED THEN UPDATE SET tgt.id = src.id * 10 " +
        "WHEN NOT MATCHED THEN INSERT (id) VALUES (src.id)"
    )
    return

# Create target delta table
spark.range(5).write.format("delta").mode("overwrite").saveAsTable("target_table")

# Create source table
@dp.table()
def src_table():
    return spark.range(5)

@dp.append_flow(
    target="example_sink",
)
def example_flow():
    return spark.readStream.format("delta").table("src_table")
Frequently asked questions (FAQ)
Can I use dbutils in my ForEachBatch sink?
If you plan to run your pipeline in a non-Databricks Connect environment, dbutils may work. However, if you use Databricks Connect, dbutils is not accessible within your foreachBatch function. The pipeline may raise warnings if it detects dbutils usage to help you avoid breakages.
Can I use multiple flows with a single ForEachBatch sink?
Yes. You can define multiple flows (with @dp.append_flow) that all target the same sink name, but they each maintain their own checkpoints.
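For example, a minimal sketch with two flows feeding one sink; the source table names are hypothetical:

from pyspark import pipelines as dp

@dp.foreach_batch_sink(name="shared_sink")
def shared_sink(df, batch_id):
    df.write.format("delta").mode("append").saveAsTable("my_catalog.my_schema.combined_output")

# Each flow targets the same sink but keeps its own checkpoint.
@dp.append_flow(target="shared_sink", name="flow_a")
def flow_a():
    return spark.readStream.table("my_catalog.my_schema.source_a")

@dp.append_flow(target="shared_sink", name="flow_b")
def flow_b():
    return spark.readStream.table("my_catalog.my_schema.source_b")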
Does the pipeline handle data retention or cleanup for my target?
No. Because the ForEachBatch sink can write to any arbitrary location or system, the pipeline cannot automatically manage or delete data in that target. You must handle these operations as part of your custom code or external processes.
How do I troubleshoot serialization errors or failures in my ForEachBatch function?
Look at your cluster driver logs or pipeline event logs. For Spark Connect–related serialization issues, check that your function depends only on serializable Python objects and does not reference disallowed objects (like open file handles or dbutils).
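One rough local check, assuming the cloudpickle module bundled with PySpark, is to confirm that your handler function can be serialized before you deploy it; the handler below is a placeholder:

from pyspark import cloudpickle

def my_handler(df, batch_id):
    # Placeholder handler; substitute your own logic.
    df.write.format("delta").mode("append").saveAsTable("my_catalog.my_schema.target")

# Raises an exception if the function captures objects that cannot be serialized
# (for example, open connections or notebook-only utilities such as dbutils).
cloudpickle.dumps(my_handler)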