SimpleDataSourceStreamReader
A base class for simplified streaming data source readers.
Compared to DataSourceStreamReader, SimpleDataSourceStreamReader doesn't require planning data partitions. The read() method reads data and determines the end offset of the batch in a single call.
Because SimpleDataSourceStreamReader reads records in the Spark driver to determine the end offset of each batch without partitioning, it is only suited for lightweight use cases where input rate and batch size are small. Use DataSourceStreamReader when read throughput is high and can't be handled by a single process.
Added in Databricks Runtime 15.3.
Syntax
from pyspark.sql.datasource import SimpleDataSourceStreamReader

class MyStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        return {"offset": 0}

    def read(self, start):
        ...

    def readBetweenOffsets(self, start, end):
        ...
Methods
| Method | Description |
|---|---|
| `initialOffset()` | Returns the initial offset of the streaming data source. A new streaming query starts reading from this offset. |
| `read(start)` | Reads all available data from the start offset and returns a tuple of an iterator of records and the end offset for the next read attempt. |
| `readBetweenOffsets(start, end)` | Reads all available data between specific start and end offsets. Invoked during failure recovery to re-read a batch deterministically. |
| `commit(end)` | Informs the source that Spark has completed processing all data for offsets less than or equal to `end`. |
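The methods above form a simple contract: read() consumes from a start offset and returns both the records and the next end offset, while readBetweenOffsets() must replay a known range deterministically. The following pyspark-free sketch illustrates that contract with a hypothetical counter source (the class name and offset layout are assumptions for illustration; a real reader must subclass SimpleDataSourceStreamReader):

```python
class CounterReader:
    # Illustrative stand-in for the reader contract; not a real
    # SimpleDataSourceStreamReader subclass.
    def initialOffset(self):
        # Offsets are plain dicts chosen by the source.
        return {"offset": 0}

    def read(self, start):
        # Read the next three values and compute the end offset in one step.
        lo = start["offset"]
        end = {"offset": lo + 3}
        return iter([(i,) for i in range(lo, lo + 3)]), end

    def readBetweenOffsets(self, start, end):
        # Deterministically replay a previously read range.
        return iter([(i,) for i in range(start["offset"], end["offset"])])

    def commit(self, end):
        pass  # e.g. release any records buffered at or below `end`

reader = CounterReader()
start = reader.initialOffset()
batch, end = reader.read(start)       # batch yields (0,), (1,), (2,)
```

Because read() returns the end offset alongside the records, no separate partition-planning step is needed; Spark can later call readBetweenOffsets(start, end) to reproduce the same batch.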
Examples
Define a custom simplified streaming data source reader:
from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader

class MyStreamingDataSource(DataSource):
    @classmethod
    def name(cls):
        return "my_streaming_source"

    def schema(self):
        return "value STRING"

    def simpleStreamReader(self, schema):
        return MySimpleStreamReader()

class MySimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        return {"partition-1": {"index": 0}}

    def read(self, start):
        end = {"partition-1": {"index": start["partition-1"]["index"] + 1}}
        def records():
            yield ("hello",)
        return records(), end

    def readBetweenOffsets(self, start, end):
        def records():
            yield ("hello",)
        return records()

    def commit(self, end):
        pass
spark.dataSource.register(MyStreamingDataSource)
df = spark.readStream.format("my_streaming_source").load()
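If the query fails between batches, Spark recovers by calling readBetweenOffsets() with the offsets of the last incomplete batch, so that method must return the same records as the original read() call. This pyspark-free check (a hypothetical stand-in class repeating the example's logic) demonstrates the expected equivalence:

```python
# Hypothetical stand-in mirroring MySimpleStreamReader's logic without pyspark,
# used only to show that recovery replays the same records.
class ReplayCheck:
    def initialOffset(self):
        return {"partition-1": {"index": 0}}

    def read(self, start):
        # Advance the index by one and emit a single record.
        end = {"partition-1": {"index": start["partition-1"]["index"] + 1}}
        return iter([("hello",)]), end

    def readBetweenOffsets(self, start, end):
        # Replay the range [start, end) deterministically.
        return iter([("hello",)])

r = ReplayCheck()
start = r.initialOffset()
first_batch, end = r.read(start)
replayed = list(r.readBetweenOffsets(start, end))
```

Here `replayed` matches the records of `first_batch`, which is exactly the property failure recovery relies on.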