データソースリーダー
データソースリーダーの基本クラス。
データソースリーダーは、データソースからデータを出力する責任があります。 このクラスを実装し、 DataSource.reader()からインスタンスを返してデータ ソースを読み取り可能にします。
構文
from pyspark.sql.datasource import DataSourceReader
class MyDataSourceReader(DataSourceReader):
def read(self, partition):
...
方法
手法 | 説明 |
|---|---|
| データソースにプッシュダウンできるフィルターのリストを指定して呼び出されます。 Spark によってまだ評価される必要があるフィルターの反復可能オブジェクトを返します。デフォルトでは、すべてのフィルターを返します。これは、フィルターがプッシュダウンされていないことを示します。 |
データの読み取りを並列タスクに分割する | |
指定されたパーティションのデータを生成し、タプル、行、または PyArrow |
例
パーティションのリストから行を返す基本的なリーダーを実装します。
from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition
class MyDataSourceReader(DataSourceReader):
def partitions(self):
return [InputPartition(1), InputPartition(2), InputPartition(3)]
def read(self, partition):
yield (partition.value, 0)
yield (partition.value, 1)
PyArrow RecordBatchを使用して行を返します:
class MyDataSourceReader(DataSourceReader):
def read(self, partition):
import pyarrow as pa
data = {
"partition": [partition.value] * 2,
"value": [0, 1]
}
table = pa.Table.from_pydict(data)
for batch in table.to_batches():
yield batch
EqualToフィルターをサポートするためにフィルター プッシュダウンを実装します:
from pyspark.sql.datasource import DataSourceReader, EqualTo
class MyDataSourceReader(DataSourceReader):
def __init__(self):
self.filters = []
def pushFilters(self, filters):
for f in filters:
if isinstance(f, EqualTo):
self.filters.append(f)
else:
yield f
def read(self, partition):
...