
Use foreachBatch to write to arbitrary data sinks

This page shows how to use foreachBatch with Structured Streaming to write the output of a streaming query to data sources that do not have an existing streaming sink.

The code pattern streamingDF.writeStream.foreachBatch(...) allows you to apply batch functions to the output data of every micro-batch of the streaming query. Functions used with foreachBatch take two parameters:

  • A DataFrame that has the output data of a micro-batch.
  • The unique ID of the micro-batch.

You must use foreachBatch for Delta Lake merge operations in Structured Streaming. See Upsert from streaming queries using foreachBatch.

Apply additional DataFrame operations

Many DataFrame and Dataset operations are not supported in streaming DataFrames because Spark does not support generating incremental plans in those cases. Using foreachBatch() you can apply some of these operations on each micro-batch output. For example, you can use foreachBatch() and the SQL MERGE INTO operation to write the output of streaming aggregations into a Delta table in update mode. See more details in MERGE INTO.
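The merge pattern described above can be sketched as follows. This is a minimal illustration, assuming the delta-spark package is installed; the table path and the key column are placeholders, not names from this page.

```python
# Sketch: upsert each micro-batch into a Delta table with MERGE.
# Assumes delta-spark is available; the path and key column are illustrative.

def merge_condition(key_cols):
    """Build the MERGE join condition for the given key columns."""
    return " AND ".join(f"t.{c} = s.{c}" for c in key_cols)

def upsert_to_delta(micro_batch_df, batch_id):
    from delta.tables import DeltaTable  # requires the delta-spark package

    target = DeltaTable.forPath(micro_batch_df.sparkSession, "/tmp/delta/target")
    (target.alias("t")
        .merge(micro_batch_df.alias("s"), merge_condition(["key"]))
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

def start_upsert_query(streaming_df, checkpoint_dir):
    """Wire the upsert function into a streaming write in update mode."""
    return (streaming_df.writeStream
        .foreachBatch(upsert_to_delta)
        .outputMode("update")
        .option("checkpointLocation", checkpoint_dir)
        .start())
```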

important
  • foreachBatch() provides only at-least-once write guarantees. However, you can use the batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee. In either case, you must reason about the end-to-end semantics yourself.
  • foreachBatch() does not work with the continuous processing mode as it fundamentally relies on the micro-batch execution of a streaming query. If you write data in continuous mode, use foreach() instead.
  • When using foreachBatch with a stateful operator, it is important to completely consume each batch before processing is complete. See Completely consume each batch DataFrame.
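The batchId-based deduplication mentioned above can be sketched with Delta Lake's idempotent write options, binding txnVersion to the batch ID. The application ID and output path here are illustrative.

```python
# Sketch: exactly-once Delta writes by binding txnVersion to batchId.
# If a batch is retried, Delta skips the write because the (txnAppId,
# txnVersion) pair was already committed. Names are illustrative.

APP_ID = "my-streaming-app"  # any stable string that is unique per query

def txn_options(app_id, batch_id):
    """Write options that make a Delta write idempotent per (app, batch)."""
    return {"txnAppId": app_id, "txnVersion": batch_id}

def write_idempotent(micro_batch_df, batch_id):
    writer = micro_batch_df.write.format("delta").mode("append")
    for key, value in txn_options(APP_ID, batch_id).items():
        writer = writer.option(key, value)
    writer.save("/tmp/delta/output")
```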

Handle empty DataFrames

foreachBatch() might receive an empty DataFrame, and your code must handle this scenario. Otherwise, your query might fail.

For example, when Delta Lake is the streaming source, these scenarios might pass an empty DataFrame to foreachBatch():

  • OPTIMIZE with no files to process: When an OPTIMIZE operation runs on the Delta Lake source table but there are no files to process, Structured Streaming writes an offset log entry to increment the table version. This produces an empty micro-batch on the sink even though no files are read.
  • File pruning at the physical plan level: If predicate pushdown or file pruning eliminates all records at the physical plan level, the result is an empty commit to the sink.

User code must handle empty DataFrames to operate correctly. See the following example:

Python
def process_batch(output_df, batch_id):
    # Process valid DataFrames only
    if not output_df.isEmpty():
        # business logic
        pass

streamingDF.writeStream.foreachBatch(process_batch).start()

Behavior changes for foreachBatch in Databricks Runtime 14.0

In Databricks Runtime 14.0 and above on compute configured with standard access mode, the following behavior changes apply:

  • print() commands write output to the driver logs.
  • You cannot access the dbutils.widgets submodule inside the function.
  • Any files, modules, or objects referenced in the function must be serializable and available on Spark.

Reuse existing batch data sources

Using foreachBatch(), you can use existing batch data writers for data sinks that might not have Structured Streaming support. Many batch data sources can be used this way. See Connect to data sources and external services.
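For example, Spark's batch JDBC writer can be reused inside foreachBatch. This is a sketch; the URL, table name, and credentials are illustrative placeholders, and real credentials should come from a secret manager.

```python
# Sketch: reuse Spark's batch JDBC writer inside foreachBatch.
# The URL, table, user, and password are illustrative placeholders.

JDBC_URL = "jdbc:postgresql://db-host:5432/analytics"

def write_to_postgres(micro_batch_df, batch_id):
    (micro_batch_df.write
        .format("jdbc")
        .option("url", JDBC_URL)
        .option("dbtable", "events")
        .option("user", "writer")
        .option("password", "REDACTED")  # use a secret manager in practice
        .mode("append")
        .save())

def start_jdbc_query(streaming_df, checkpoint_dir):
    return (streaming_df.writeStream
        .foreachBatch(write_to_postgres)
        .option("checkpointLocation", checkpoint_dir)
        .start())
```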

Write to multiple locations

If you need to write the output of a streaming query to multiple locations, Databricks recommends using multiple Structured Streaming writers for best parallelization and throughput.

Using foreachBatch to write to multiple sinks serializes the execution of streaming writes, which can increase latency for each micro-batch.

If you do use foreachBatch to write to multiple Delta tables, see Idempotent table writes in foreachBatch.
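If you nevertheless write one micro-batch to two locations from a single foreachBatch function, caching the batch avoids recomputing it for each write. A minimal sketch, with illustrative Delta paths:

```python
# Sketch: write one micro-batch to two sinks sequentially, caching the
# DataFrame so it is not recomputed for the second write. Paths are
# illustrative; writes inside one batch run serially.

def write_twice(micro_batch_df, batch_id):
    micro_batch_df.persist()
    try:
        micro_batch_df.write.format("delta").mode("append").save("/tmp/delta/loc1")
        micro_batch_df.write.format("delta").mode("append").save("/tmp/delta/loc2")
    finally:
        micro_batch_df.unpersist()
```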

Completely consume each batch DataFrame

When you are using stateful operators (for example, using dropDuplicatesWithinWatermark), each batch iteration must consume the entire DataFrame or restart the query. If you do not consume the entire DataFrame, the streaming query will fail with the next batch.

This can happen in several cases. The following examples show how to fix queries that do not correctly consume a DataFrame.

Intentionally using a subset of the batch

If you only care about a subset of the batch, you could have code such as the following.

Python
from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def partial_func(batch_df, batch_id):
    batch_df.show(2)

q = streamWithWatermark.writeStream \
    .foreachBatch(partial_func) \
    .option("checkpointLocation", checkpoint_dir) \
    .trigger(processingTime='2 seconds') \
    .start()

In this case, batch_df.show(2) handles only the first two rows of the batch, which is expected, but any remaining rows must also be consumed. The following code consumes the full DataFrame.

Python
from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

# function to do nothing with a row
def do_nothing(row):
    pass

def partial_func(batch_df, batch_id):
    batch_df.show(2)
    batch_df.foreach(do_nothing) # silently consume the rest of the batch

q = streamWithWatermark.writeStream \
    .foreachBatch(partial_func) \
    .option("checkpointLocation", checkpoint_dir) \
    .trigger(processingTime='2 seconds') \
    .start()

Here, the do_nothing function silently ignores the rest of the DataFrame.

Handling an error in a batch

For error handling in foreachBatch, Databricks recommends that you allow the streaming query to fail fast and instead rely on the orchestration layer, such as Lakeflow Jobs or Apache Airflow, to manage the retry logic. This is much safer than building complex retry loops in your code, where data loss may occur.

Here are guidelines based on your write target:

| Target | Examples | Guidance |
| --- | --- | --- |
| DataFrame operations | Delta Lake tables | You must use the txnAppId and txnVersion write options, binding txnVersion to batchId, to guarantee idempotency and protect data correctness on retries. Do not catch and retry exceptions locally. Instead, Databricks recommends that you allow errors to propagate so that Spark metrics stay accurate, data does not duplicate, and the orchestrator can cleanly retry the full batch. |
| Custom code and external destinations | .collect(), OLTP databases, message queues, APIs | Implement your own idempotency. You must assume that any operation can and will be retried across batches. If the batchId stays the same, the result of your operation must remain the same. You may retry purely transient errors such as brief connection timeouts, but take extreme care to avoid partial or duplicate writes if the retry ultimately fails. The safest approach is to let errors propagate and allow the orchestrator to retry the entire batch. |
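For a custom external destination, idempotency can be sketched by recording which batch IDs have already been committed and skipping repeats. The marker store below is an in-memory set purely for illustration; a real implementation would use a durable, transactional store, and send_to_external_system is a hypothetical stand-in for your sink.

```python
# Sketch: skip batches already committed to an external sink.
# `committed` stands in for a durable, transactional marker store;
# `send_to_external_system` is a hypothetical placeholder for your sink.

committed = set()

def already_committed(batch_id, store):
    """Return True if this batch was already written to the sink."""
    return batch_id in store

def send_to_external_system(rows):
    """Hypothetical: replace with your API call, queue, or OLTP write."""
    pass

def write_external(micro_batch_df, batch_id):
    if already_committed(batch_id, committed):
        return  # retry of a batch we already wrote: do nothing
    rows = micro_batch_df.collect()  # suitable for small batches only
    send_to_external_system(rows)
    committed.add(batch_id)  # record the marker only after the write succeeds
```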

Here are some examples of exception types and recommendations for how to handle them in foreachBatch:

| Exception type | Examples | Recommended action |
| --- | --- | --- |
| Transient sink errors | SQLTransientConnectionException, HTTP 429, timeouts | Catch: retry, or send to a dead-letter queue |
| Duplicate or key constraint violations when the sink is idempotent | SQLIntegrityConstraintViolationException | Catch: log and suppress |
| Custom retryable errors | Wrapped socket exceptions, retriable database errors | Catch: increment metrics and allow controlled continuation |
| Logic or schema errors | NullPointerException, AttributeError, schema mismatch | Propagate: let Spark fail the query |
| Non-retryable sink errors or uncaught logic bugs | ValueError, PermissionError | Propagate: let Spark fail the query |
| Critical failures | OutOfMemoryError, corrupted state, data integrity violations | Propagate: let Spark fail the query |
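This guidance can be sketched as a small classifier that decides whether to retry, suppress, or propagate an exception. The exception groupings below are illustrative, and DuplicateKeyError is a hypothetical stand-in for a sink-specific duplicate-key error; tune both to your actual sink.

```python
# Sketch: classify exceptions raised inside foreachBatch per the guidance
# above. Groupings are illustrative; DuplicateKeyError is a hypothetical
# stand-in for a sink-specific error type.

class DuplicateKeyError(Exception):
    """Illustrative stand-in for a sink's duplicate-key violation."""

TRANSIENT = (TimeoutError, ConnectionError)  # retry or dead-letter
SUPPRESS = (DuplicateKeyError,)              # safe to ignore on an idempotent sink

def handle_batch_error(exc):
    """Return the recommended action for an exception raised in a batch."""
    if isinstance(exc, TRANSIENT):
        return "retry"
    if isinstance(exc, SUPPRESS):
        return "suppress"
    return "propagate"  # logic, schema, and critical errors fail the query

def run_with_policy(batch_df, batch_id, process_row):
    """Apply process_row to the batch, dispatching errors per the policy."""
    try:
        batch_df.foreach(process_row)
    except Exception as e:
        if handle_batch_error(e) == "propagate":
            raise
        # otherwise: retry, suppress, or dead-letter according to your policy
```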

Code examples: exception handling

The following examples intentionally raise an error in foreach to show different approaches to handle the error:

Python
from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
    # handle the row; in this sample, it just raises an error:
    raise Exception('error')

def partial_func(batch_df, batch_id):
    try:
        batch_df.foreach(foreach_func)
    except Exception as e:
        print(e) # or whatever error handling you want to have

q = streamWithWatermark.writeStream \
    .foreachBatch(partial_func) \
    .option("checkpointLocation", checkpoint_dir) \
    .trigger(processingTime='2 seconds') \
    .start()

The above code handles and silently suppresses the error, and may not consume the rest of the batch. There are two options for handling this situation.

First, you can re-raise the error, which passes it up to your orchestration layer to retry the batch. This can resolve the error if it is a transient problem, or surface it for your operations team to fix manually. To do this, change the partial_func code to look like this:

Python
def partial_func(batch_df, batch_id):
    try:
        batch_df.foreach(foreach_func)
    except Exception as e:
        print(e) # or whatever error handling you want to have
        raise e # re-raise the issue

Second, if you want to catch the exception and ignore the rest of the batch, you can change the code to use the do_nothing function to silently ignore the rest of the batch.

Python
from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
    # handle the row; in this sample, it just raises an error:
    raise Exception('error')

# function to do nothing with a row
def do_nothing(row):
    pass

def partial_func(batch_df, batch_id):
    try:
        batch_df.foreach(foreach_func)
    except Exception as e:
        print(e) # or whatever error handling you want to have
        batch_df.foreach(do_nothing) # silently consume the remainder of the batch

q = streamWithWatermark.writeStream \
    .foreachBatch(partial_func) \
    .option("checkpointLocation", checkpoint_dir) \
    .trigger(processingTime='2 seconds') \
    .start()