DataSourceStreamReader

A base class for streaming data source readers.

Data source stream readers are responsible for outputting data from a streaming data source. Implement this class and return an instance from DataSource.streamReader() to make a data source readable as a streaming source.
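The wiring between DataSource.streamReader() and the reader can be sketched without a Spark installation. In this sketch the DataSource and DataSourceStreamReader classes are illustrative stand-ins for the real base classes in pyspark.sql.datasource (which define more hooks than shown here); only the call pattern is the point.

```python
# Illustrative stand-ins for the pyspark.sql.datasource base classes, so the
# wiring pattern runs without Spark; the real base classes define more hooks.
class DataSource:
    def __init__(self, options=None):
        self.options = options or {}

class DataSourceStreamReader:
    ...

class MyStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        return {"index": 0}

class MyDataSource(DataSource):
    @classmethod
    def name(cls):
        return "my_source"

    def streamReader(self, schema):
        # Spark calls this once per streaming query to obtain the reader.
        return MyStreamReader()

reader = MyDataSource().streamReader(schema=None)
print(reader.initialOffset())  # {'index': 0}
```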

Syntax

Python
from pyspark.sql.datasource import DataSourceStreamReader

class MyDataSourceStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        ...

    def partitions(self, start, end):
        ...

    def read(self, partition):
        ...

Methods

initialOffset()
    Returns the initial offset of the streaming data source as a dict. A new streaming query starts reading from this offset. Restarted queries resume from the checkpointed offset instead.

latestOffset()
    Returns the most recent offset currently available in the source as a dict. Spark plans the next microbatch over the range between the last processed offset and this offset.

partitions(start, end)
    Returns a sequence of InputPartition objects representing the data between the start and end offsets. Returns an empty sequence if start equals end.

read(partition)
    Generates data for a given partition and returns an iterator of tuples, rows, or PyArrow RecordBatch objects. Each tuple or row is converted to a row in the final DataFrame. This method is abstract and must be implemented.

commit(end)
    Informs the source that Spark has completed processing all data for offsets less than or equal to end. Spark will only request offsets greater than end in the future.

stop()
    Stops the source and frees any resources it has allocated. Invoked when the streaming query terminates.
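The methods above form a contract that Spark drives each microbatch: plan the offset range, split it into partitions, read each partition, then commit. A pyspark-free sketch of one cycle, using a stub partition class and a driver loop that stands in for the engine (the end offset here is picked by hand; in Spark the engine chooses it):

```python
# Pyspark-free sketch of the call sequence Spark drives for one microbatch.
class StubPartition:
    # Stand-in for pyspark.sql.datasource.InputPartition.
    def __init__(self, value):
        self.value = value

class CounterStreamReader:
    def initialOffset(self):
        return {"index": 0}

    def partitions(self, start, end):
        # One partition per index in [start, end).
        return [StubPartition(i) for i in range(start["index"], end["index"])]

    def read(self, partition):
        yield (partition.value,)

    def commit(self, end):
        self.committed = end

    def stop(self):
        pass

# Drive one microbatch the way the engine would.
reader = CounterStreamReader()
start = reader.initialOffset()  # {"index": 0}
end = {"index": 3}              # in Spark, chosen by the engine
rows = [row for p in reader.partitions(start, end) for row in reader.read(p)]
reader.commit(end)
reader.stop()
print(rows)  # [(0,), (1,), (2,)]
```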

Notes

  • read() must be stateless. It runs on executors, so do not access mutable class members or keep in-memory state between invocations of read().
  • All partition values returned by partitions() must be picklable objects.
  • Offsets are represented as a dict or recursive dict whose keys and values are primitive types: integer, string, or boolean.
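Because partition values are pickled before being shipped to executors, and primitive-valued offset dicts round-trip cleanly through JSON, both constraints can be sanity-checked locally with the standard library alone (the IndexPartition class below is a hypothetical stand-in, not part of the PySpark API):

```python
import json
import pickle

class IndexPartition:
    # Hypothetical partition carrying only a picklable int payload.
    def __init__(self, value):
        self.value = value

partition = IndexPartition(7)
offset = {"index": 7, "shard": "a", "done": False}

# Partition values must survive pickling to reach the executors.
restored = pickle.loads(pickle.dumps(partition))
assert restored.value == 7

# A dict of primitives round-trips through JSON unchanged.
assert json.loads(json.dumps(offset)) == offset
```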

Examples

Implement a streaming reader that reads from a sequence of indexed records:

Python
from pyspark.sql.datasource import (
    DataSource,
    DataSourceStreamReader,
    InputPartition,
)

class MyDataSourceStreamReader(DataSourceStreamReader):
    def __init__(self):
        self.current = 0

    def initialOffset(self):
        return {"index": 0}

    def latestOffset(self):
        # Advance the available range by 10 records per microbatch.
        self.current += 10
        return {"index": self.current}

    def partitions(self, start, end):
        return [
            InputPartition(i)
            for i in range(start["index"], end["index"])
        ]

    def read(self, partition):
        yield (partition.value, f"record-{partition.value}")

    def commit(self, end):
        print(f"Committed up to offset {end}")

    def stop(self):
        print("Stopping stream reader")