メインコンテンツまでスキップ

データソースリーダー

データソースリーダーの基本クラス。

データソースリーダーは、データソースからデータを出力する責任があります。 このクラスを実装し、 DataSource.reader()からインスタンスを返してデータ ソースを読み取り可能にします。

構文

Python
from pyspark.sql.datasource import DataSourceReader

class MyDataSourceReader(DataSourceReader):
def read(self, partition):
...

方法

手法

説明

pushFilters(filters)

データソースにプッシュダウンできるフィルターのリストを指定して呼び出されます。 Spark によってまだ評価される必要があるフィルターの反復可能オブジェクトを返します。デフォルトでは、すべてのフィルターを返します。これは、フィルターがプッシュダウンされていないことを示します。pushFilters() selfを変更することができます。オブジェクトは変更後も pickle 化可能なままである必要があります。selfへの変更はpartitions()read()に表示されます。

partitions()

データの読み取りを並列タスクに分割するInputPartitionオブジェクトのシーケンスを返します。デフォルトでは、単一のパーティションを返します。大規模なデータセットを読み取るときにパフォーマンスを向上させるためにオーバーライドします。partitions()によって返されるすべてのパーティション値は、pickle 化可能なオブジェクトである必要があります。

read(partition)

指定されたパーティションのデータを生成し、タプル、行、または PyArrow RecordBatchオブジェクトの反復子を返します。各タプルまたは行は、最終的な DataFrame の行に変換されます。このメソッドは抽象的であり、実装する必要があります。

パーティションのリストから行を返す基本的なリーダーを実装します。

Python
from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition

class MyDataSourceReader(DataSourceReader):
def partitions(self):
return [InputPartition(1), InputPartition(2), InputPartition(3)]

def read(self, partition):
yield (partition.value, 0)
yield (partition.value, 1)

PyArrow RecordBatchを使用して行を返します:

Python
class MyDataSourceReader(DataSourceReader):
def read(self, partition):
import pyarrow as pa
data = {
"partition": [partition.value] * 2,
"value": [0, 1]
}
table = pa.Table.from_pydict(data)
for batch in table.to_batches():
yield batch

EqualToフィルターをサポートするためにフィルター プッシュダウンを実装します:

Python
from pyspark.sql.datasource import DataSourceReader, EqualTo

class MyDataSourceReader(DataSourceReader):
def __init__(self):
self.filters = []

def pushFilters(self, filters):
for f in filters:
if isinstance(f, EqualTo):
self.filters.append(f)
else:
yield f

def read(self, partition):
...