foreach_batch_sink

The @dp.foreach_batch_sink() decorator defines a ForEachBatch sink, which processes a stream as a series of micro-batches that you handle in Python with custom logic. You reference the sink as a target in an append flow to write the transformed data. For conceptual guidance, considerations, and examples, see Use ForEachBatch to write to arbitrary data sinks in pipelines.

Syntax

Python
from pyspark import pipelines as dp

@dp.foreach_batch_sink(name="<name>")
def batch_handler(df, batch_id):
    """
    Required:
    - `df`: a Spark DataFrame representing the rows of this micro-batch.
    - `batch_id`: unique integer ID for each micro-batch in the query.
    """
    # Your custom write or transformation logic here
    # Example:
    # df.write.format("some-target-system").save("...")
    #
    # To access the sparkSession inside the batch handler, use df.sparkSession.

Parameters

| Parameter | Description |
| --- | --- |
| name | Optional. A unique name that identifies the sink within the pipeline. If omitted, defaults to the name of the UDF. |
| batch_handler | The user-defined function (UDF) that is called for each micro-batch. |
| df | Spark DataFrame containing data for the current micro-batch. |
| batch_id | The integer ID of the micro-batch. Spark increments this ID for each trigger interval. |

A batch_id of 0 represents the start of a stream, or the beginning of a full refresh. The foreach_batch_sink code should properly handle a full refresh for downstream data sources. For more information, see Full refresh.
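
One way to act on a batch_id of 0 is to clear the external target before writing the first micro-batch. The following is a minimal sketch of that idea, not a definitive implementation. It uses plain Python so that it runs without a pipeline: `make_refresh_aware_handler`, `reset_target`, and `write_batch` are hypothetical names introduced for illustration, and a list stands in for both the DataFrame and the target system.

```python
from typing import Any, Callable


def make_refresh_aware_handler(
    reset_target: Callable[[], None],
    write_batch: Callable[[Any, int], None],
) -> Callable[[Any, int], None]:
    """Build a batch handler that clears the target before batch 0.

    `reset_target` and `write_batch` are hypothetical callbacks that you
    supply; they are not part of the pipelines API.
    """

    def batch_handler(df: Any, batch_id: int) -> None:
        # batch_id 0 marks the start of a stream or of a full refresh, so
        # discard whatever an earlier run left in the external target.
        if batch_id == 0:
            reset_target()
        write_batch(df, batch_id)

    return batch_handler


# Stand-in target for illustration: a plain list instead of a real system.
target_rows: list = []
handler = make_refresh_aware_handler(
    reset_target=target_rows.clear,
    write_batch=lambda df, batch_id: target_rows.extend(df),
)

handler(["a", "b"], 0)  # first run starts at batch 0
handler(["c"], 1)
handler(["x"], 0)       # a full refresh restarts at batch 0
print(target_rows)      # only rows written since the refresh remain
```

In a pipeline, the same check would sit inside the function decorated with @dp.foreach_batch_sink, with `df` being a Spark DataFrame. Because a micro-batch can be retried, the reset step should be safe to run more than once.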
