foreach_batch_sink
The @dp.foreach_batch_sink() decorator defines a ForEachBatch sink, which processes a stream as a series of micro-batches that you handle in Python with custom logic. You reference the sink as a target in an append flow to write the transformed data. For conceptual guidance, considerations, and examples, see Use ForEachBatch to write to arbitrary data sinks in pipelines.
Syntax
from pyspark import pipelines as dp
@dp.foreach_batch_sink(name="<name>")
def batch_handler(df, batch_id):
"""
Required:
- `df`: a Spark DataFrame representing the rows of this micro-batch.
- `batch_id`: unique integer ID for each micro-batch in the query.
"""
# Your custom write or transformation logic here
# Example:
# df.write.format("some-target-system").save("...")
#
# To access the sparkSession inside the batch handler, use df.sparkSession.
Parameters
Parameter | Description |
|---|---|
name | Optional. A unique name to identify the sink within the pipeline. Defaults to the name of the UDF, when not included. |
batch_handler | This is the user-defined function (UDF) that will be called for each micro-batch. |
df | Spark DataFrame containing data for the current micro-batch. |
batch_id | The integer ID of the micro-batch. Spark increments this ID for each trigger interval. A |