Pular para o conteúdo principal

DataSourceStreamReader

Uma classe base para leitores de transmissão de fonte de dados.

Os leitores de fonte de dados transmissão são responsáveis pela saída de dados de uma transmissão fonte de dados. Implemente esta classe e retorne uma instância de DataSource.streamReader() para tornar uma fonte de dados legível como uma fonte de transmissão.

Sintaxe

Python
from pyspark.sql.datasource import DataSourceStreamReader

class MyDataSourceStreamReader(DataSourceStreamReader):
def initialOffset(self):
...

def partitions(self, start, end):
...

def read(self, partition):
...

Métodos

Método

Descrição

initialOffset()

Retorna o deslocamento inicial da transmissão fonte de dados como um dict. Uma nova consulta de transmissão começa a ler este deslocamento. As consultas reiniciadas são retomadas a partir do ponto de parada salvo.

partitions(start, end)

Retorna uma sequência de objetos InputPartition representando os dados entre os deslocamentos start e end . Retorna uma sequência vazia se start for igual a end.

read(partition)

Gera dados para uma determinada partição e retorna um iterador de tuplas, linhas ou objetos PyArrow RecordBatch . Cada tupla ou linha é convertida em uma linha no DataFrame final. Este método é abstrato e precisa ser implementado.

commit(end)

Informa a fonte que o Spark concluiu o processamento de todos os dados para deslocamentos menores ou iguais a end. O Spark solicitará apenas deslocamentos maiores que end no futuro.

stop()

Interrompe a origem e libera qualquer recurso alocado por ela. Invocado quando a consulta de transmissão termina.

Notas

  • read() é estático e sem estado. Não acesse membros de classe mutáveis ou mantenha estado na memória entre diferentes invocações de read().
  • Todos os valores de partição retornados por partitions() devem ser objetos serializáveis.
  • Os deslocamentos são representados como um dicionário ou dicionário recursivo, cujas chaves e valores são tipos primitivos: inteiros, strings ou booleanos.

Exemplos

Implemente um leitor de transmissão que leia uma sequência de registros indexados:

Python
from pyspark.sql.datasource import (
DataSource,
DataSourceStreamReader,
InputPartition,
)

class MyDataSourceStreamReader(DataSourceStreamReader):
def initialOffset(self):
return {"index": 0}

def latestOffset(self, start, limit):
return {"index": start["index"] + 10}

def partitions(self, start, end):
return [
InputPartition(i)
for i in range(start["index"], end["index"])
]

def read(self, partition):
yield (partition.value, f"record-{partition.value}")

def commit(self, end):
print(f"Committed up to offset {end}")

def stop(self):
print("Stopping stream reader")