Pular para o conteúdo principal

Leitor de fonte de dados

Uma classe base para leitores de fontes de dados.

leitores de fonte de dados são responsáveis pela saída de dados de uma fonte de dados. Implemente esta classe e retorne uma instância de DataSource.reader() para tornar uma fonte de dados legível.

Sintaxe

Python
from pyspark.sql.datasource import DataSourceReader

class MyDataSourceReader(DataSourceReader):
def read(self, partition):
...

Métodos

Método

Descrição

pushFilters(filters)

Chamada com a lista de filtros que podem ser enviados para a fonte de dados. Retorna um iterável de filtros que ainda precisam ser avaliados pelo Spark. Por default, retorna todos os filtros, indicando que nenhum filtro foi enviado para as camadas inferiores. pushFilters() pode modificar self. O objeto deve permanecer descritível após a modificação. As alterações em self são visíveis em partitions() e read().

partitions()

Retorna uma sequência de objetos InputPartition que dividem a leitura de dados em tarefas paralelas. Por default, retorna uma única partição. Substitua essa configuração para obter melhor desempenho na leitura de grandes conjuntos de dados. Todos os valores de partição retornados por partitions() devem ser objetos serializáveis.

read(partition)

Gera dados para uma determinada partição e retorna um iterador de tuplas, linhas ou objetos PyArrow RecordBatch . Cada tupla ou linha é convertida em uma linha no DataFrame final. Este método é abstrato e precisa ser implementado.

Exemplos

Implemente um leitor básico que retorne linhas de uma lista de partições:

Python
from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition

class MyDataSourceReader(DataSourceReader):
def partitions(self):
return [InputPartition(1), InputPartition(2), InputPartition(3)]

def read(self, partition):
yield (partition.value, 0)
yield (partition.value, 1)

Retorna linhas usando PyArrow RecordBatch:

Python
class MyDataSourceReader(DataSourceReader):
def read(self, partition):
import pyarrow as pa
data = {
"partition": [partition.value] * 2,
"value": [0, 1]
}
table = pa.Table.from_pydict(data)
for batch in table.to_batches():
yield batch

Implementar o pushdown de filtros para suportar filtros EqualTo :

Python
from pyspark.sql.datasource import DataSourceReader, EqualTo

class MyDataSourceReader(DataSourceReader):
def __init__(self):
self.filters = []

def pushFilters(self, filters):
for f in filters:
if isinstance(f, EqualTo):
self.filters.append(f)
else:
yield f

def read(self, partition):
...