Pular para o conteúdo principal

PySpark fonte de dados personalizada

Fontes de dados personalizadas PySpark são criadas usando a APIDataSourcePython (PySpark), que permite a leitura de fontes de dados personalizadas e a gravação em coletores de dados personalizados no Apache Spark usando Python. Você pode usar fontes de dados personalizadas PySpark para definir conexões personalizadas com sistemas de dados e implementar funcionalidades adicionais para construir fontes de dados reutilizáveis.

nota

PySpark A fonte de dados personalizada requer Databricks Runtime 15.4 LTS e acima, ou serverless environment version 2.

Classe DataSource

O PySpark DataSource é uma classe base que fornece métodos para criar leitores e gravadores de dados.

Implementar a subclasse da fonte de dados

Dependendo do seu caso de uso, os itens a seguir devem ser implementados por qualquer subclasse para tornar uma fonte de dados legível, gravável ou ambos:

Propriedade ou método

Descrição

name

Obrigatório. O nome da fonte de dados

schema

Obrigatório. O esquema da fonte de dados a ser lida ou gravada

reader()

Deve retornar um DataSourceReader para tornar a fonte de dados legível (lotes)

writer()

Deve retornar um DataSourceWriter para tornar o coletor de dados gravável (lotes)

streamReader() ou simpleStreamReader()

Deve retornar um DataSourceStreamReader para tornar a transmissão de dados legível (transmissão)

streamWriter()

Deve retornar um DataSourceStreamWriter para tornar a transmissão de dados gravável (transmissão)

nota

Os DataSource, DataSourceReader, DataSourceWriter, DataSourceStreamReader, DataSourceStreamWriter definidos pelo usuário e seus métodos devem ser serializáveis. Em outras palavras, devem ser um dicionário ou dicionário aninhado que contenha um tipo primitivo.

registro da fonte de dados

Depois de implementar a interface, o senhor deve registrá-la e, em seguida, pode carregá-la ou usá-la de outra forma, conforme mostrado no exemplo a seguir:

Python
# Register the data source
spark.dataSource.register(MyDataSourceClass)

# Read from a custom data source
spark.read.format("my_datasource_name").load().show()

Exemplo 1: Criar um DataSource PySpark para a consulta lotes

Para demonstrar os recursos de leitura do PySpark DataSource, crie uma fonte de dados que gere o pacote faker Python. Para obter mais informações sobre faker, consulte a documentação do Faker.

Instale o pacote faker usando o seguinte comando:

Python
%pip install faker

Etapa 1: definir o exemplo de fonte de dados

Primeiro, defina seu novo PySpark DataSource como uma subclasse de DataSource com um nome, um esquema e um leitor. O método reader() deve ser definido para ler de uma fonte de dados em uma consulta de lotes.

Python
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
"""
An example data source for batch query using the `faker` library.
"""

@classmethod
def name(cls):
return "fake"

def schema(self):
return "name string, date string, zipcode string, state string"

def reader(self, schema: StructType):
return FakeDataSourceReader(schema, self.options)

Etapa 2: Implementar o leitor para uma consulta de lotes

Em seguida, implemente a lógica do leitor para gerar dados de exemplo. Use a biblioteca faker instalada para preencher cada campo do esquema.

Python
class FakeDataSourceReader(DataSourceReader):

def __init__(self, schema, options):
self.schema: StructType = schema
self.options = options

def read(self, partition):
# Library imports must be within the method.
from faker import Faker
fake = Faker()

# Every value in this `self.options` dictionary is a string.
num_rows = int(self.options.get("numRows", 3))
for _ in range(num_rows):
row = []
for field in self.schema.fields:
value = getattr(fake, field.name)()
row.append(value)
yield tuple(row)

Etapa 3: registrar e usar a fonte de dados de exemplo

Para usar a fonte de dados, registre-a. Em default, o FakeDataSource tem três linhas e o esquema inclui esses campos string: name, date, zipcode, state. O exemplo a seguir registra, carrega e gera a fonte de dados de exemplo com o padrão:

Python
spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
Output
+-----------------+----------+-------+----------+
| name| date|zipcode| state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24| 79766| Colorado|
| Shelby Cox|2011-08-05| 24596| Florida|
| Amanda Robinson|2019-01-06| 57395|Washington|
+-----------------+----------+-------+----------+

Somente os campos string são compatíveis, mas o senhor pode especificar um esquema com quaisquer campos que correspondam aos campos dos provedores de pacotes faker para gerar dados aleatórios para teste e desenvolvimento. O exemplo a seguir carrega a fonte de dados com os campos name e company:

Python
spark.read.format("fake").schema("name string, company string").load().show()
Output
+---------------------+--------------+
|name |company |
+---------------------+--------------+
|Tanner Brennan |Adams Group |
|Leslie Maxwell |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc |
+---------------------+--------------+

Para carregar a fonte de dados com um número personalizado de linhas, especifique a opção numRows. O exemplo a seguir especifica 5 linhas:

Python
spark.read.format("fake").option("numRows", 5).load().show()
Output
+--------------+----------+-------+------------+
| name| date|zipcode| state|
+--------------+----------+-------+------------+
| Pam Mitchell|1988-10-20| 23788| Tennessee|
|Melissa Turner|1996-06-14| 30851| Nevada|
| Brian Ramsey|2021-08-21| 55277| Washington|
| Caitlin Reed|1983-06-22| 89813|Pennsylvania|
| Douglas James|2007-01-18| 46226| Alabama|
+--------------+----------+-------+------------+

Exemplo 2: Criar um PySpark GitHub DataSource usando variantes

Para demonstrar o uso de variantes em um DataSource PySpark, este exemplo cria uma fonte de dados que lê solicitações pull de GitHub.

nota

As variantes são compatíveis com a fonte de dados personalizada PySpark em Databricks Runtime 17.1 e acima.

Para obter informações sobre variantes, consulte Consultar dados de variantes.

Etapa 1: Definir o GitHub DataSource

Primeiro, defina seu novo PySpark GitHub DataSource como uma subclasse de DataSource com um nome, um esquema e um método reader(). O esquema inclui esses campos: id, title, user, created_at, updated_at. O campo user é definido como uma variante.

Python
import json
import requests

from pyspark.sql import Row
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import VariantVal

class GithubVariantDataSource(DataSource):
@classmethod
def name(self):
return "githubVariant"
def schema(self):
return "id int, title string, user variant, created_at string, updated_at string"
def reader(self, schema):
return GithubVariantPullRequestReader(self.options)

Etapa 2: Implementar o leitor para recuperar solicitações pull

Em seguida, implemente a lógica do leitor para recuperar pull requests do repositório GitHub especificado.

Python
class GithubVariantPullRequestReader(DataSourceReader):
def __init__(self, options):
self.token = options.get("token")
self.repo = options.get("path")
if self.repo is None:
raise Exception(f"Must specify a repo in `.load()` method.")

def read(self, partition):
header = {
"Accept": "application/vnd.github+json",
}
if self.token is not None:
header["Authorization"] = f"Bearer {self.token}"
url = f"https://api.github.com/repos/{self.repo}/pulls"
response = requests.get(url)
response.raise_for_status()
prs = response.json()
for pr in prs:
yield Row(
id = pr.get("number"),
title = pr.get("title"),
user = VariantVal.parseJson(json.dumps(pr.get("user"))),
created_at = pr.get("created_at"),
updated_at = pr.get("updated_at")
)

Etapa 3: registrar e usar a fonte de dados

Para usar a fonte de dados, registre-a. O exemplo a seguir registra e, em seguida, carrega a fonte de dados e gera três linhas de dados do repositório GitHub PR :

Python
spark.dataSource.register(GithubVariantDataSource)
spark.read.format("github_variant").option("numRows", 3).load("apache/spark").display()
Output
+---------+-----------------------------------------------------+---------------------+----------------------+----------------------+
| id | title | user | created_at | updated_at |
+---------+---------------------------------------------------- +---------------------+----------------------+----------------------+
| 51293 |[SPARK-52586][SQL] Introduce AnyTimeType | {"avatar_url":...} | 2025-06-26T09:20:59Z | 2025-06-26T15:22:39Z |
| 51292 |[WIP][PYTHON] Arrow UDF for aggregation | {"avatar_url":...} | 2025-06-26T07:52:27Z | 2025-06-26T07:52:37Z |
| 51290 |[SPARK-50686][SQL] Hash to sort aggregation fallback | {"avatar_url":...} | 2025-06-26T06:19:58Z | 2025-06-26T06:20:07Z |
+---------+-----------------------------------------------------+---------------------+----------------------+----------------------+

Exemplo 3: Criar PySpark DataSource para leitura e gravação de transmissão

Para demonstrar os recursos de leitura e gravação da transmissão do PySpark DataSource, crie um exemplo de fonte de dados que gere duas linhas em cada micro-lote usando o pacote faker Python. Para obter mais informações sobre faker, consulte a documentação do Faker.

Instale o pacote faker usando o seguinte comando:

Python
%pip install faker

Etapa 1: definir o exemplo de fonte de dados

Primeiro, defina seu novo PySpark DataSource como uma subclasse de DataSource com um nome, um esquema e métodos streamReader() e streamWriter().

Python
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType

class FakeStreamDataSource(DataSource):
"""
An example data source for streaming read and write using the `faker` library.
"""

@classmethod
def name(cls):
return "fakestream"

def schema(self):
return "name string, state string"

def streamReader(self, schema: StructType):
return FakeStreamReader(schema, self.options)

# If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
# def simpleStreamReader(self, schema: StructType):
# return SimpleStreamReader()

def streamWriter(self, schema: StructType, overwrite: bool):
return FakeStreamWriter(self.options)

Etapa 2: Implementar o leitor de transmissão

Em seguida, implemente o exemplo de leitor de dados de transmissão que gera duas linhas em cada microbatch. O senhor pode implementar DataSourceStreamReader ou, se a fonte de dados tiver baixa taxa de transferência e não exigir particionamento, pode implementar SimpleDataSourceStreamReader. simpleStreamReader() ou streamReader() devem ser implementados, e simpleStreamReader() só é invocado quando streamReader() não está implementado.

Implementação do DataSourceStreamReader

A instância streamReader tem um deslocamento inteiro que aumenta em 2 em cada microlote, implementado com a interface DataSourceStreamReader.

Python
from pyspark.sql.datasource import InputPartition
from typing import Iterator, Tuple
import os
import json

class RangePartition(InputPartition):
def __init__(self, start, end):
self.start = start
self.end = end

class FakeStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.current = 0

def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}

def latestOffset(self) -> dict:
"""
Returns the current latest offset that the next microbatch will read to.
"""
self.current += 2
return {"offset": self.current}

def partitions(self, start: dict, end: dict):
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
return [RangePartition(start["offset"], end["offset"])]

def commit(self, end: dict):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass

def read(self, partition) -> Iterator[Tuple]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
start, end = partition.start, partition.end
for i in range(start, end):
yield (i, str(i))

Implementação simples do DataSourceStreamReader

A instância SimpleStreamReader é igual à instância FakeStreamReader que gera duas linhas em cada lote, mas implementada com a interface SimpleDataSourceStreamReader sem particionamento.

Python
class SimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}

def read(self, start: dict) -> (Iterator[Tuple], dict):
"""
Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
"""
start_idx = start["offset"]
it = iter([(i,) for i in range(start_idx, start_idx + 2)])
return (it, {"offset": start_idx + 2})

def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
"""
Takes start and end offset as inputs, then reads an iterator of data deterministically.
This is called when the query replays batches during restart or after a failure.
"""
start_idx = start["offset"]
end_idx = end["offset"]
return iter([(i,) for i in range(start_idx, end_idx)])

def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
"""
pass

Etapa 3: Implementar o transmissor

Agora, implemente o escritor de transmissão. Esse gravador de dados de transmissão grava as informações de metadados de cada microbatch em um caminho local.

Python
from pyspark.sql.datasource import DataSourceStreamWriter, WriterCommitMessage

class SimpleCommitMessage(WriterCommitMessage):
def __init__(self, partition_id: int, count: int):
self.partition_id = partition_id
self.count = count

class FakeStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.options = options
self.path = self.options.get("path")
assert self.path is not None

def write(self, iterator):
"""
Writes the data and then returns the commit message for that partition. Library imports must be within the method.
"""
from pyspark import TaskContext
context = TaskContext.get()
partition_id = context.partitionId()
cnt = 0
for row in iterator:
cnt += 1
return SimpleCommitMessage(partition_id=partition_id, count=cnt)

def commit(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
"""
status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
file.write(json.dumps(status) + "\n")

def abort(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
In this FakeStreamWriter, a failure message is written into a text file inside abort().
"""
with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
file.write(f"failed in batch {batchId}")

Etapa 4: registrar e usar a fonte de dados de exemplo

Para usar a fonte de dados, registre-a. Após o registro, você pode usá-lo em consultas de transmissão como fonte ou destino, passando um nome curto ou nome completo para format(). O exemplo a seguir registra a fonte de dados e, em seguida, inicia uma consulta que lê o exemplo de fonte de dados e envia para o console:

Python
spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()

Como alternativa, o exemplo a seguir usa a transmissão de exemplo como um coletor e especifica um caminho de saída:

Python
query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")

Solução de problemas

Se o resultado for o seguinte erro, o site compute não é compatível com a fonte de dados personalizada PySpark. O senhor deve usar o site Databricks Runtime 15.2 ou o acima.

Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000