Leitor de fluxo de fonte de dados simples
Uma classe base para leitores simplificados de transmissão de fontes de dados.
Em comparação com DataSourceStreamReader, SimpleDataSourceStreamReader não requer planejamento de partições de dados. O método read() permite ler dados e planejar o último deslocamento ao mesmo tempo.
Como SimpleDataSourceStreamReader lê registros no driver Spark para determinar o deslocamento final de cada lote sem particionamento, ele é adequado apenas para casos de uso leves onde a taxa de entrada e o tamanho dos lotes são pequenos. Use DataSourceStreamReader quando read Taxa de transferência é alta e não pode ser tratada por um único processo.
Adicionado no Databricks Runtime 15.3
Sintaxe
from pyspark.sql.datasource import SimpleDataSourceStreamReader
class MyStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
return {"offset": 0}
def read(self, start):
...
def readBetweenOffsets(self, start, end):
...
Métodos
Método | Descrição |
|---|---|
Retorna o deslocamento inicial da transmissão fonte de dados. Uma nova consulta de transmissão começa a ler este deslocamento. | |
Lê todos os dados disponíveis a partir do deslocamento inicial e retorna uma tupla contendo um iterador de registros e o deslocamento final para a próxima tentativa de leitura. | |
Lê todos os dados disponíveis entre os deslocamentos inicial e final específicos. Invocado durante a recuperação de falhas para reler lotes de forma determinística. | |
Informa a fonte que o Spark concluiu o processamento de todos os dados para deslocamentos menores ou iguais a |
Exemplos
Defina um leitor de fonte de dados de transmissão simplificado personalizado:
from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader
class MyStreamingDataSource(DataSource):
@classmethod
def name(cls):
return "my_streaming_source"
def schema(self):
return "value STRING"
def simpleStreamReader(self, schema):
return MySimpleStreamReader()
class MySimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
return {"partition-1": {"index": 0}}
def read(self, start):
end = {"partition-1": {"index": start["partition-1"]["index"] + 1}}
def records():
yield ("hello",)
return records(), end
def readBetweenOffsets(self, start, end):
def records():
yield ("hello",)
return records()
def commit(self, end):
pass
spark.dataSource.register(MyStreamingDataSource)
df = spark.readStream.format("my_streaming_source").load()