PySpark fonte de dados personalizada

Visualização

PySpark As fontes de dados personalizadas estão em Public Preview em Databricks Runtime 15.2 e acima. O suporte à transmissão está disponível em Databricks Runtime 15.3 e acima.

Um DataSource PySpark é criado pelo DataSourcePython (PySpark) API, que permite a leitura de fontes de dados personalizadas e a gravação em coletores de dados personalizados em Apache Spark usando Python. O senhor pode usar a fonte de dados personalizada PySpark para definir conexões personalizadas com sistemas de dados e implementar funcionalidades adicionais, para criar fontes de dados reutilizáveis.

Classe DataSource

O PySpark DataSource é uma classe base que fornece métodos para criar leitores e gravadores de dados.

Implementar a subclasse da fonte de dados

Dependendo do seu caso de uso, os itens a seguir devem ser implementados por qualquer subclasse para tornar uma fonte de dados legível, gravável ou ambos:

Propriedade ou método

Descrição

name

Necessário. O nome da fonte de dados

schema

Necessário. O esquema da fonte de dados a ser lida ou gravada

reader()

Deve retornar um DataSourceReader para tornar a fonte de dados legível (lotes)

writer()

Deve retornar um DataSourceWriter para tornar o coletor de dados gravável (lotes)

streamReader() ou simpleStreamReader()

Deve retornar um DataSourceStreamReader para tornar a transmissão de dados legível (transmissão)

streamWriter()

Deve retornar um DataSourceStreamWriter para tornar a transmissão de dados gravável (transmissão)

Observação

Os DataSource, DataSourceReader, DataSourceWriter, DataSourceStreamReader, DataSourceStreamWriter definidos pelo usuário e seus métodos devem poder ser serializados. Em outras palavras, eles devem ser um dicionário ou um dicionário aninhado que contenha um tipo primitivo.

registro da fonte de dados

Depois de implementar a interface, o senhor deve registrá-la e, em seguida, pode carregá-la ou usá-la de outra forma, conforme mostrado no exemplo a seguir:

# Register the data source
spark.dataSource.register(MyDataSourceClass)

# Read from a custom data source
spark.read.format("my_datasource_name").load().show()

Exemplo 1: Criar um DataSource PySpark para a consulta lotes

Para demonstrar os recursos de leitura do PySpark DataSource, crie uma fonte de dados que gere o pacote faker Python. Para obter mais informações sobre faker, consulte a documentação do Faker.

Instale o pacote faker usando o seguinte comando:

%pip install faker

o passo 1: Definir o DataSource de exemplo

Primeiro, defina seu novo PySpark DataSource como uma subclasse de DataSource com um nome, um esquema e um leitor. O método reader() deve ser definido para ler de uma fonte de dados em uma consulta de lotes.

from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
    """
    An example data source for batch query using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "name string, date string, zipcode string, state string"

    def reader(self, schema: StructType):
        return FakeDataSourceReader(schema, self.options)

o passo 2: Implementar o leitor para uma consulta de lotes

Em seguida, implemente a lógica do leitor para gerar dados de exemplo. Use a biblioteca faker instalada para preencher cada campo do esquema.

class FakeDataSourceReader(DataSourceReader):

    def __init__(self, schema, options):
        self.schema: StructType = schema
        self.options = options

    def read(self, partition):
        # Library imports must be within the method.
        from faker import Faker
        fake = Faker()

        # Every value in this `self.options` dictionary is a string.
        num_rows = int(self.options.get("numRows", 3))
        for _ in range(num_rows):
            row = []
            for field in self.schema.fields:
                value = getattr(fake, field.name)()
                row.append(value)
            yield tuple(row)

o passo 3: registro e uso do exemplo fonte de dados

Para usar a fonte de dados, registre-a. Em default, o FakeDataSource tem três linhas e o esquema inclui esses campos string: name, date, zipcode, state. O exemplo a seguir registra, carrega e gera a fonte de dados de exemplo com o padrão:

spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
|             name|      date|zipcode|     state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24|  79766|  Colorado|
|       Shelby Cox|2011-08-05|  24596|   Florida|
|  Amanda Robinson|2019-01-06|  57395|Washington|
+-----------------+----------+-------+----------+

Somente os campos string são suportados, mas o senhor pode especificar um esquema com quaisquer campos que correspondam aos campos dos provedores de pacotes faker para gerar dados aleatórios para teste e desenvolvimento. O exemplo a seguir carrega a fonte de dados com os campos name e company:

spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name                 |company       |
+---------------------+--------------+
|Tanner Brennan       |Adams Group   |
|Leslie Maxwell       |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc   |
+---------------------+--------------+

Para carregar a fonte de dados com um número personalizado de linhas, especifique a opção numRows. O exemplo a seguir especifica 5 linhas:

spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
|          name|      date|zipcode|       state|
+--------------+----------+-------+------------+
|  Pam Mitchell|1988-10-20|  23788|   Tennessee|
|Melissa Turner|1996-06-14|  30851|      Nevada|
|  Brian Ramsey|2021-08-21|  55277|  Washington|
|  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
| Douglas James|2007-01-18|  46226|     Alabama|
+--------------+----------+-------+------------+

Exemplo 2: Criar PySpark DataSource para leitura e gravação de transmissão

Para demonstrar os recursos de leitura e gravação da transmissão do PySpark DataSource, crie um exemplo de fonte de dados que gere duas linhas em cada microlote usando o pacote faker Python. Para obter mais informações sobre faker, consulte a documentação do Faker.

Instale o pacote faker usando o seguinte comando:

%pip install faker

o passo 1: Definir o DataSource de exemplo

Primeiro, defina seu novo PySpark DataSource como uma subclasse de DataSource com um nome, um esquema e métodos streamReader() e streamWriter().

from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType

class FakeStreamDataSource(DataSource):
    """
    An example data source for streaming read and write using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fakestream"

    def schema(self):
        return "name string, state string"

    def streamReader(self, schema: StructType):
        return FakeStreamReader(schema, self.options)

    # If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
    # def simpleStreamReader(self, schema: StructType):
    #    return SimpleStreamReader()

    def streamWriter(self, schema: StructType, overwrite: bool):
        return FakeStreamWriter(self.options)

o passo 2: Implementar o leitor de transmissão

Em seguida, implemente o exemplo de leitor de dados de transmissão que gera duas linhas em cada microbatch. O senhor pode implementar DataSourceStreamReader ou, se a fonte de dados tiver uma taxa de transferência baixa e não exigir particionamento, pode implementar SimpleDataSourceStreamReader. simpleStreamReader() ou streamReader() devem ser implementados, e simpleStreamReader() só é invocado quando streamReader() não é implementado.

Implementação do DataSourceStreamReader

A instância streamReader tem um deslocamento inteiro que aumenta em 2 a cada microbatch, implementado com a interface DataSourceStreamReader.

class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end

class FakeStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.current = 0

    def initialOffset(self) -> dict:
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def latestOffset(self) -> dict:
        """
        Returns the current latest offset that the next microbatch will read to.
        """
        self.current += 2
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict):
        """
        Plans the partitioning of the current microbatch defined by start and end offset. It
        needs to return a sequence of :class:`InputPartition` objects.
        """
        return [RangePartition(start["offset"], end["offset"])]

    def commit(self, end: dict):
        """
        This is invoked when the query has finished processing data before end offset. This
        can be used to clean up the resource.
        """
        pass

    def read(self, partition) -> Iterator[Tuple]:
        """
        Takes a partition as an input and reads an iterator of tuples from the data source.
        """
        start, end = partition.start, partition.end
        for i in range(start, end):
            yield (i, str(i))

Implementação do SimpleDataSourceStreamReader

A instância SimpleStreamReader é igual à instância FakeStreamReader que gera duas linhas em cada lote, mas implementada com a interface SimpleDataSourceStreamReader sem particionamento.

class SimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def read(self, start: dict) -> (Iterator[Tuple], dict):
        """
        Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
        """
        start_idx = start["offset"]
        it = iter([(i,) for i in range(start_idx, start_idx + 2)])
        return (it, {"offset": start_idx + 2})

    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
        """
        Takes start and end offset as inputs, then reads an iterator of data deterministically.
        This is called when the query replays batches during restart or after a failure.
        """
        start_idx = start["offset"]
        end_idx = end["offset"]
        return iter([(i,) for i in range(start_idx, end_idx)])

    def commit(self, end):
        """
        This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
        """
        pass

o passo 3: Implementar o script de transmissão

Agora, implemente o escritor de transmissão. Esse gravador de dados de transmissão grava as informações de metadados de cada microbatch em um caminho local.

class SimpleCommitMessage(WriterCommitMessage):
   partition_id: int
   count: int

class FakeStreamWriter(DataSourceStreamWriter):
   def __init__(self, options):
       self.options = options
       self.path = self.options.get("path")
       assert self.path is not None

   def write(self, iterator):
       """
       Writes the data, then returns the commit message of that partition. Library imports must be within the method.
       """
       from pyspark import TaskContext
       context = TaskContext.get()
       partition_id = context.partitionId()
       cnt = 0
       for row in iterator:
           cnt += 1
       return SimpleCommitMessage(partition_id=partition_id, count=cnt)

   def commit(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
       In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
       """
       status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
       with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
           file.write(json.dumps(status) + "\n")

   def abort(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
       In this FakeStreamWriter, a failure message is written into a text file inside abort().
       """
       with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
           file.write(f"failed in batch {batchId}")

o passo 4: registro e uso do exemplo fonte de dados

Para usar a fonte de dados, registre-a. Depois de registrado, o senhor pode usá-lo em consultas de transmissão como fonte ou coletor, passando um nome curto ou nome completo para format(). O exemplo a seguir registra a fonte de dados e, em seguida, inicia uma consulta que lê a fonte de dados do exemplo e gera uma saída para o console:

spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()

Como alternativa, o exemplo a seguir usa a transmissão de exemplo como um coletor e especifica um caminho de saída:

query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")

Solução de problemas

Se o resultado for o seguinte erro, o site compute não é compatível com a fonte de dados personalizada PySpark. O senhor deve usar o site Databricks Runtime 15.2 ou o acima.

Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000