DataSourceStreamReader
Uma classe base para leitores de transmissão de fonte de dados.
Os leitores de fonte de dados transmissão são responsáveis pela saída de dados de uma transmissão fonte de dados. Implemente esta classe e retorne uma instância de DataSource.streamReader() para tornar uma fonte de dados legível como uma fonte de transmissão.
Sintaxe
from pyspark.sql.datasource import DataSourceStreamReader
class MyDataSourceStreamReader(DataSourceStreamReader):
def initialOffset(self):
...
def partitions(self, start, end):
...
def read(self, partition):
...
Métodos
Método | Descrição |
|---|---|
Retorna o deslocamento inicial da transmissão fonte de dados como um dict. Uma nova consulta de transmissão começa a ler este deslocamento. As consultas reiniciadas são retomadas a partir do ponto de parada salvo. | |
Retorna uma sequência de objetos | |
Gera dados para uma determinada partição e retorna um iterador de tuplas, linhas ou objetos PyArrow | |
Informa a fonte que o Spark concluiu o processamento de todos os dados para deslocamentos menores ou iguais a | |
Interrompe a origem e libera qualquer recurso alocado por ela. Invocado quando a consulta de transmissão termina. |
Notas
read()é estático e sem estado. Não acesse membros de classe mutáveis ou mantenha estado na memória entre diferentes invocações deread().- Todos os valores de partição retornados por
partitions()devem ser objetos serializáveis. - Os deslocamentos são representados como um dicionário ou dicionário recursivo, cujas chaves e valores são tipos primitivos: inteiros, strings ou booleanos.
Exemplos
Implemente um leitor de transmissão que leia uma sequência de registros indexados:
from pyspark.sql.datasource import (
DataSource,
DataSourceStreamReader,
InputPartition,
)
class MyDataSourceStreamReader(DataSourceStreamReader):
def initialOffset(self):
return {"index": 0}
def latestOffset(self, start, limit):
return {"index": start["index"] + 10}
def partitions(self, start, end):
return [
InputPartition(i)
for i in range(start["index"], end["index"])
]
def read(self, partition):
yield (partition.value, f"record-{partition.value}")
def commit(self, end):
print(f"Committed up to offset {end}")
def stop(self):
print("Stopping stream reader")