Skip to main content

foreach (DataStreamWriter)

Sets the output of the streaming query to be processed using the provided writer. The processing logic can be specified as a function that takes a row as input, or as an object with process(row) and optional open(partition_id, epoch_id) and close(error) methods.

Syntax

foreach(f)

Parameters

Parameter

Type

Description

f

callable or object

A function that takes a Row as input, or an object with a process(row) method and optional open and close methods.

Returns

DataStreamWriter

Notes

The provided object must be serializable. Any initialization for writing data (for example, opening a connection) should be done inside open(), not at construction time.

Examples

Python
import time
df = spark.readStream.format("rate").load()

Process each row using a function:

Python
def print_row(row):
print(row)

q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()

Process each row using an object with open, process, and close methods:

Python
class RowPrinter:
def open(self, partition_id, epoch_id):
print("Opened %d, %d" % (partition_id, epoch_id))
return True

def process(self, row):
print(row)

def close(self, error):
print("Closed with error: %s" % str(error))

q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()