DataSourceReader

A base class for data source readers.

Data source readers are responsible for outputting data from a data source. Implement this class and return an instance from DataSource.reader() to make a data source readable.

Syntax

Python
from pyspark.sql.datasource import DataSourceReader

class MyDataSourceReader(DataSourceReader):
    def read(self, partition):
        ...
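
To make this reader usable, a DataSource subclass returns an instance of it from reader(). A minimal sketch, reusing the MyDataSourceReader class defined above:

Python
from pyspark.sql.datasource import DataSource

class MyDataSource(DataSource):
    def reader(self, schema):
        # Return the reader implementation that produces this source's rows.
        return MyDataSourceReader()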

Methods

pushFilters(filters)
    Called with the list of filters that can be pushed down to the data source. Returns an iterable of filters that still need to be evaluated by Spark. By default, returns all filters, indicating no filters are pushed down. pushFilters() is allowed to modify self; the object must remain picklable after modification, and changes to self are visible to partitions() and read().

partitions()
    Returns a sequence of InputPartition objects that split data reading into parallel tasks. By default, returns a single partition. Override for better performance when reading large datasets. All partition values returned by partitions() must be picklable objects.

read(partition)
    Generates data for a given partition and returns an iterator of tuples, rows, or PyArrow RecordBatch objects. Each tuple or row is converted to a row in the final DataFrame. This method is abstract and must be implemented.

Examples

Implement a basic reader that returns rows from a list of partitions:

Python
from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition

class MyDataSourceReader(DataSourceReader):
    def partitions(self):
        return [InputPartition(1), InputPartition(2), InputPartition(3)]

    def read(self, partition):
        yield (partition.value, 0)
        yield (partition.value, 1)
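
To run this example end to end, the reader must be returned from a DataSource that is registered with the session. A minimal sketch, assuming a hypothetical format name "my_source", a DDL schema matching the tuples yielded above, and an active SparkSession named spark:

Python
from pyspark.sql.datasource import DataSource

class MyDataSource(DataSource):
    @classmethod
    def name(cls):
        # Hypothetical short name used with spark.read.format(...).
        return "my_source"

    def schema(self):
        # DDL schema for the (partition.value, counter) tuples yielded by read().
        return "partition int, value int"

    def reader(self, schema):
        return MyDataSourceReader()

spark.dataSource.register(MyDataSource)
spark.read.format("my_source").load().show()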

Return rows using PyArrow RecordBatch:

Python
class MyDataSourceReader(DataSourceReader):
    def read(self, partition):
        import pyarrow as pa

        data = {
            "partition": [partition.value] * 2,
            "value": [0, 1],
        }
        table = pa.Table.from_pydict(data)
        for batch in table.to_batches():
            yield batch
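
As noted under Methods, read() can also yield Row objects instead of plain tuples; each Row becomes one row of the resulting DataFrame. A minimal sketch:

Python
from pyspark.sql import Row
from pyspark.sql.datasource import DataSourceReader

class MyDataSourceReader(DataSourceReader):
    def read(self, partition):
        # Each Row is converted to a row in the final DataFrame.
        yield Row(partition=0, value=0)
        yield Row(partition=0, value=1)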

Implement filter pushdown to support EqualTo filters:

Python
from pyspark.sql.datasource import DataSourceReader, EqualTo

class MyDataSourceReader(DataSourceReader):
    def __init__(self):
        self.filters = []

    def pushFilters(self, filters):
        for f in filters:
            if isinstance(f, EqualTo):
                # Supported filter: keep it and apply it during read().
                self.filters.append(f)
            else:
                # Unsupported filter: hand it back for Spark to evaluate.
                yield f

    def read(self, partition):
        ...
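
For illustration, a read() that actually applies the retained filters might look like the sketch below. The in-memory rows and the column name "id" are hypothetical, and the sketch assumes EqualTo.attribute is the column name path as a tuple and EqualTo.value is the literal it is compared against:

Python
from pyspark.sql.datasource import DataSourceReader, EqualTo

class InMemoryFilteringReader(DataSourceReader):
    def __init__(self):
        self.filters = []
        # Hypothetical in-memory data: (id, value) pairs.
        self.rows = [(1, "a"), (2, "b"), (3, "c")]

    def pushFilters(self, filters):
        for f in filters:
            if isinstance(f, EqualTo) and f.attribute == ("id",):
                # This reader applies equality filters on "id" itself.
                self.filters.append(f)
            else:
                # Everything else is handed back for Spark to evaluate.
                yield f

    def read(self, partition):
        for id_, value in self.rows:
            # Emit only rows that satisfy every pushed-down filter.
            if all(id_ == f.value for f in self.filters):
                yield (id_, value)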