DataSource

A base class for data sources.

This class represents a custom data source that can be read from and/or written to. It provides methods to create readers and writers for the data. At least one of the methods reader() or writer() must be implemented by a subclass to make the data source readable or writable (or both).

After implementing this interface, you can load your data source using spark.read.format(...).load() and save data using df.write.format(...).save().

Syntax

Python
from pyspark.sql.datasource import DataSource

class MyDataSource(DataSource):
    @classmethod
    def name(cls):
        return "my_data_source"

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| options | dict | A case-insensitive dictionary representing the options for this data source. |

Methods

| Method | Description |
| --- | --- |
| name() | Returns a string representing the format name of this data source. By default, returns the class name. Override to provide a customized short name. |
| schema() | Returns the schema of the data source as a StructType or DDL string. If not implemented and no schema is provided by the user, an exception is thrown. |
| reader(schema) | Returns a DataSourceReader instance for reading data. Required for readable data sources. |
| writer(schema, overwrite) | Returns a DataSourceWriter instance for writing data. Required for writable data sources. |
| streamWriter(schema, overwrite) | Returns a DataSourceStreamWriter instance for writing data into a streaming sink. Required for writable streaming data sources. |
| simpleStreamReader(schema) | Returns a SimpleDataSourceStreamReader instance for reading streaming data. Used only when streamReader() is not implemented. |
| streamReader(schema) | Returns a DataSourceStreamReader instance for reading streaming data. Takes priority over simpleStreamReader(). |

Examples

Define and register a custom readable data source:

Python
from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition

class MyDataSource(DataSource):
    @classmethod
    def name(cls):
        return "my_data_source"

    def schema(self):
        return "a INT, b STRING"

    def reader(self, schema):
        return MyDataSourceReader(schema)

class MyDataSourceReader(DataSourceReader):
    def __init__(self, schema):
        self.schema = schema

    def read(self, partition):
        yield (1, "hello")
        yield (2, "world")

spark.dataSource.register(MyDataSource)
df = spark.read.format("my_data_source").load()
df.show()

Define a data source with a StructType schema:

Python
from pyspark.sql.datasource import DataSource
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

class MyDataSource(DataSource):
    def schema(self):
        return StructType().add("a", "int").add("b", "string")