Pular para o conteúdo principal

Leitor de fluxo de fonte de dados simples

Uma classe base para leitores simplificados de transmissão de fontes de dados.

Em comparação com DataSourceStreamReader, SimpleDataSourceStreamReader não requer planejamento de partições de dados. O método read() permite ler dados e planejar o último deslocamento ao mesmo tempo.

Como SimpleDataSourceStreamReader lê registros no driver Spark para determinar o deslocamento final de cada lote sem particionamento, ele é adequado apenas para casos de uso leves onde a taxa de entrada e o tamanho dos lotes são pequenos. Use DataSourceStreamReader quando read Taxa de transferência é alta e não pode ser tratada por um único processo.

Adicionado no Databricks Runtime 15.3

Sintaxe

Python
from pyspark.sql.datasource import SimpleDataSourceStreamReader

class MyStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
return {"offset": 0}

def read(self, start):
...

def readBetweenOffsets(self, start, end):
...

Métodos

Método

Descrição

initialOffset()

Retorna o deslocamento inicial da transmissão fonte de dados. Uma nova consulta de transmissão começa a ler este deslocamento.

read(start)

Lê todos os dados disponíveis a partir do deslocamento inicial e retorna uma tupla contendo um iterador de registros e o deslocamento final para a próxima tentativa de leitura.

readBetweenOffsets(start, end)

Lê todos os dados disponíveis entre os deslocamentos inicial e final específicos. Invocado durante a recuperação de falhas para reler lotes de forma determinística.

commit(end)

Informa a fonte que o Spark concluiu o processamento de todos os dados para deslocamentos menores ou iguais a end.

Exemplos

Defina um leitor de fonte de dados de transmissão simplificado personalizado:

Python
from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader

class MyStreamingDataSource(DataSource):
@classmethod
def name(cls):
return "my_streaming_source"

def schema(self):
return "value STRING"

def simpleStreamReader(self, schema):
return MySimpleStreamReader()

class MySimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
return {"partition-1": {"index": 0}}

def read(self, start):
end = {"partition-1": {"index": start["partition-1"]["index"] + 1}}
def records():
yield ("hello",)
return records(), end

def readBetweenOffsets(self, start, end):
def records():
yield ("hello",)
return records()

def commit(self, end):
pass

spark.dataSource.register(MyStreamingDataSource)
df = spark.readStream.format("my_streaming_source").load()