PySpark fonte de dados personalizada
Fontes de dados personalizadas PySpark são criadas usando a APIDataSourcePython (PySpark), que permite a leitura de fontes de dados personalizadas e a gravação em coletores de dados personalizados no Apache Spark usando Python. Você pode usar fontes de dados personalizadas PySpark para definir conexões personalizadas com sistemas de dados e implementar funcionalidades adicionais para construir fontes de dados reutilizáveis.
PySpark A fonte de dados personalizada requer Databricks Runtime 15.4 LTS e acima, ou serverless environment version 2.
Classe DataSource
O PySpark DataSource é uma classe base que fornece métodos para criar leitores e gravadores de dados.
Implementar a subclasse da fonte de dados
Dependendo do seu caso de uso, os itens a seguir devem ser implementados por qualquer subclasse para tornar uma fonte de dados legível, gravável ou ambos:
Propriedade ou método | Descrição |
|---|---|
| Obrigatório. O nome da fonte de dados |
| Obrigatório. O esquema da fonte de dados a ser lida ou gravada |
| Deve retornar um |
| Deve retornar um |
| Deve retornar um |
| Deve retornar um |
Os DataSource, DataSourceReader, DataSourceWriter, DataSourceStreamReader, DataSourceStreamWriter definidos pelo usuário e seus métodos devem ser serializáveis. Em outras palavras, devem ser um dicionário ou dicionário aninhado que contenha um tipo primitivo.
registro da fonte de dados
Depois de implementar a interface, o senhor deve registrá-la e, em seguida, pode carregá-la ou usá-la de outra forma, conforme mostrado no exemplo a seguir:
# Register the data source
spark.dataSource.register(MyDataSourceClass)
# Read from a custom data source
spark.read.format("my_datasource_name").load().show()
Exemplo 1: Criar um DataSource PySpark para a consulta lotes
Para demonstrar os recursos de leitura do PySpark DataSource, crie uma fonte de dados que gere o pacote faker Python. Para obter mais informações sobre faker, consulte a documentação do Faker.
Instale o pacote faker usando o seguinte comando:
%pip install faker
o passo 1: Implementar o leitor para uma consulta de lotes
Primeiro, implemente a lógica de leitura para gerar dados de exemplo. Use a biblioteca faker instalada para preencher cada campo no esquema.
class FakeDataSourceReader(DataSourceReader):
def __init__(self, schema, options):
self.schema: StructType = schema
self.options = options
def read(self, partition):
# Library imports must be within the method.
from faker import Faker
fake = Faker()
# Every value in this `self.options` dictionary is a string.
num_rows = int(self.options.get("numRows", 3))
for _ in range(num_rows):
row = []
for field in self.schema.fields:
value = getattr(fake, field.name)()
row.append(value)
yield tuple(row)
o passo 2: Defina a fonte de dados de exemplo
Em seguida, defina seu novo PySpark DataSource como uma subclasse de DataSource com um nome, esquema e leitor. O método reader() deve ser definido para ler de uma fonte de dados em uma consulta lotes.
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType
class FakeDataSource(DataSource):
"""
An example data source for batch query using the `faker` library.
"""
@classmethod
def name(cls):
return "fake"
def schema(self):
return "name string, date string, zipcode string, state string"
def reader(self, schema: StructType):
return FakeDataSourceReader(schema, self.options)
Etapa 3: registrar e usar a fonte de dados de exemplo
Para usar a fonte de dados, registre-a. Em default, o FakeDataSource tem três linhas e o esquema inclui esses campos string: name, date, zipcode, state. O exemplo a seguir registra, carrega e gera a fonte de dados de exemplo com o padrão:
spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
| name| date|zipcode| state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24| 79766| Colorado|
| Shelby Cox|2011-08-05| 24596| Florida|
| Amanda Robinson|2019-01-06| 57395|Washington|
+-----------------+----------+-------+----------+
Somente os campos string são compatíveis, mas o senhor pode especificar um esquema com quaisquer campos que correspondam aos campos dos provedores de pacotes faker para gerar dados aleatórios para teste e desenvolvimento. O exemplo a seguir carrega a fonte de dados com os campos name e company:
spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name |company |
+---------------------+--------------+
|Tanner Brennan |Adams Group |
|Leslie Maxwell |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc |
+---------------------+--------------+
Para carregar a fonte de dados com um número personalizado de linhas, especifique a opção numRows. O exemplo a seguir especifica 5 linhas:
spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
| name| date|zipcode| state|
+--------------+----------+-------+------------+
| Pam Mitchell|1988-10-20| 23788| Tennessee|
|Melissa Turner|1996-06-14| 30851| Nevada|
| Brian Ramsey|2021-08-21| 55277| Washington|
| Caitlin Reed|1983-06-22| 89813|Pennsylvania|
| Douglas James|2007-01-18| 46226| Alabama|
+--------------+----------+-------+------------+
Exemplo 2: Criar um PySpark GitHub DataSource usando variantes
Para demonstrar o uso de variantes em um DataSource PySpark, este exemplo cria uma fonte de dados que lê solicitações pull de GitHub.
As variantes são compatíveis com a fonte de dados personalizada PySpark em Databricks Runtime 17.1 e acima.
Para obter informações sobre variantes, consulte Consultar dados de variantes.
o passo 1: Implemente o leitor para recuperar solicitações de pull
Primeiro, implemente a lógica de leitura para recuperar solicitações de pull do repositório GitHub especificado.
class GithubVariantPullRequestReader(DataSourceReader):
def __init__(self, options):
self.token = options.get("token")
self.repo = options.get("path")
if self.repo is None:
raise Exception(f"Must specify a repo in `.load()` method.")
# Every value in this `self.options` dictionary is a string.
self.num_rows = int(options.get("numRows", 10))
def read(self, partition):
header = {
"Accept": "application/vnd.github+json",
}
if self.token is not None:
header["Authorization"] = f"Bearer {self.token}"
url = f"https://api.github.com/repos/{self.repo}/pulls"
response = requests.get(url, headers=header)
response.raise_for_status()
prs = response.json()
for pr in prs[:self.num_rows]:
yield Row(
id = pr.get("number"),
title = pr.get("title"),
user = VariantVal.parseJson(json.dumps(pr.get("user"))),
created_at = pr.get("created_at"),
updated_at = pr.get("updated_at")
)
o passo 2: Defina a fonte de dados GitHub
Em seguida, defina seu novo PySpark GitHub DataSource como uma subclasse de DataSource com um nome, esquema e método reader(). O esquema inclui estes campos: id, title, user, created_at, updated_at. O campo user é definido como uma variante.
import json
import requests
from pyspark.sql import Row
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import VariantVal
class GithubVariantDataSource(DataSource):
@classmethod
def name(self):
return "githubVariant"
def schema(self):
return "id int, title string, user variant, created_at string, updated_at string"
def reader(self, schema):
return GithubVariantPullRequestReader(self.options)
Etapa 3: registrar e usar a fonte de dados
Para usar a fonte de dados, registre-a. O exemplo a seguir registra e, em seguida, carrega a fonte de dados e gera três linhas de dados do repositório GitHub PR :
spark.dataSource.register(GithubVariantDataSource)
spark.read.format("githubVariant").option("numRows", 3).load("apache/spark").display()
+---------+-----------------------------------------------------+---------------------+----------------------+----------------------+
| id | title | user | created_at | updated_at |
+---------+---------------------------------------------------- +---------------------+----------------------+----------------------+
| 51293 |[SPARK-52586][SQL] Introduce AnyTimeType | {"avatar_url":...} | 2025-06-26T09:20:59Z | 2025-06-26T15:22:39Z |
| 51292 |[WIP][PYTHON] Arrow UDF for aggregation | {"avatar_url":...} | 2025-06-26T07:52:27Z | 2025-06-26T07:52:37Z |
| 51290 |[SPARK-50686][SQL] Hash to sort aggregation fallback | {"avatar_url":...} | 2025-06-26T06:19:58Z | 2025-06-26T06:20:07Z |
+---------+-----------------------------------------------------+---------------------+----------------------+----------------------+
Exemplo 3: Criar PySpark DataSource para leitura e gravação de transmissão
Para demonstrar os recursos de leitura e gravação da transmissão do PySpark DataSource, crie um exemplo de fonte de dados que gere duas linhas em cada micro-lote usando o pacote faker Python. Para obter mais informações sobre faker, consulte a documentação do Faker.
Instale o pacote faker usando o seguinte comando:
%pip install faker
o passo 1: Implementar o leitor de transmissão
Primeiro, implemente o leitor de dados de transmissão de exemplo que gera duas linhas em cada microlote. Você pode implementar DataSourceStreamReader, ou se a fonte de dados tiver baixa Taxa de transferência e não exigir particionamento, você pode implementar SimpleDataSourceStreamReader em vez disso. Ou simpleStreamReader() ou streamReader() deve ser implementado, e simpleStreamReader() só é invocado quando streamReader() não é implementado.
Implementação do DataSourceStreamReader
A instância streamReader tem um deslocamento inteiro que aumenta em 2 em cada microlote, implementado com a interface DataSourceStreamReader.
from pyspark.sql.datasource import InputPartition
from typing import Iterator, Tuple
import os
import json
class RangePartition(InputPartition):
def __init__(self, start, end):
self.start = start
self.end = end
class FakeStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.current = 0
def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def latestOffset(self) -> dict:
"""
Returns the current latest offset that the next microbatch will read to.
"""
self.current += 2
return {"offset": self.current}
def partitions(self, start: dict, end: dict):
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
return [RangePartition(start["offset"], end["offset"])]
def commit(self, end: dict):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass
def read(self, partition) -> Iterator[Tuple]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
start, end = partition.start, partition.end
for i in range(start, end):
yield (i, str(i))
Implementação simples do DataSourceStreamReader
A instância SimpleStreamReader é igual à instância FakeStreamReader que gera duas linhas em cada lote, mas implementada com a interface SimpleDataSourceStreamReader sem particionamento.
class SimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def read(self, start: dict) -> (Iterator[Tuple], dict):
"""
Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
"""
start_idx = start["offset"]
it = iter([(i,) for i in range(start_idx, start_idx + 2)])
return (it, {"offset": start_idx + 2})
def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
"""
Takes start and end offset as inputs, then reads an iterator of data deterministically.
This is called when the query replays batches during restart or after a failure.
"""
start_idx = start["offset"]
end_idx = end["offset"]
return iter([(i,) for i in range(start_idx, end_idx)])
def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
"""
pass
o passo 2: Implementar o escritor de transmissão
Em seguida, implemente o gravador de transmissão. Este gravador de dados de transmissão grava as informações de metadados de cada microlote em um caminho local.
from pyspark.sql.datasource import DataSourceStreamWriter, WriterCommitMessage
class SimpleCommitMessage(WriterCommitMessage):
def __init__(self, partition_id: int, count: int):
self.partition_id = partition_id
self.count = count
class FakeStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.options = options
self.path = self.options.get("path")
assert self.path is not None
def write(self, iterator):
"""
Writes the data and then returns the commit message for that partition. Library imports must be within the method.
"""
from pyspark import TaskContext
context = TaskContext.get()
partition_id = context.partitionId()
cnt = 0
for row in iterator:
cnt += 1
return SimpleCommitMessage(partition_id=partition_id, count=cnt)
def commit(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
"""
status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
file.write(json.dumps(status) + "\n")
def abort(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
In this FakeStreamWriter, a failure message is written into a text file inside abort().
"""
with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
file.write(f"failed in batch {batchId}")
o passo 3: Defina a fonte de dados de exemplo
Agora defina seu novo PySpark DataSource como uma subclasse de DataSource com um nome, esquema e métodos streamReader() e streamWriter().
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType
class FakeStreamDataSource(DataSource):
"""
An example data source for streaming read and write using the `faker` library.
"""
@classmethod
def name(cls):
return "fakestream"
def schema(self):
return "name string, state string"
def streamReader(self, schema: StructType):
return FakeStreamReader(schema, self.options)
# If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
# def simpleStreamReader(self, schema: StructType):
# return SimpleStreamReader()
def streamWriter(self, schema: StructType, overwrite: bool):
return FakeStreamWriter(self.options)
Etapa 4: registrar e usar a fonte de dados de exemplo
Para usar a fonte de dados, registre-a. Após o registro, você pode usá-lo em consultas de transmissão como fonte ou destino, passando um nome curto ou nome completo para format(). O exemplo a seguir registra a fonte de dados e, em seguida, inicia uma consulta que lê o exemplo de fonte de dados e envia para o console:
spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()
Alternativamente, o código a seguir usa o exemplo de transmissão como um receptor e especifica um caminho de saída:
spark.dataSource.register(FakeStreamDataSource)
# Make sure the output directory exists and is writable
output_path = "/output_path"
dbutils.fs.mkdirs(output_path)
checkpoint_path = "/output_path/checkpoint"
query = (
spark.readStream
.format("fakestream")
.load()
.writeStream
.format("fakestream")
.option("path", output_path)
.option("checkpointLocation", checkpoint_path)
.start()
)
Exemplo 4: Crie um conector de transmissão do Google BigQuery
O exemplo a seguir demonstra como criar um conector de transmissão personalizado para o Google BigQuery (BQ) usando um DataSource PySpark . Databricks fornece um conectorSpark para ingestão de lotes BigQuery , e o Lakehouse Federation também pode se conectar remotamente a qualquer conjunto de dados BigQuery e extrair dados por meio da criação de catálogos externos, mas nenhum dos dois oferece suporte completo à transmissão incremental ou contínua de fluxo de trabalho. Este conector permite a migração incremental de dados por fases e a migração em tempo real quase simultânea de tabelas BigQuery alimentadas por fontes de transmissão com pontos de verificação persistentes.
Este conector personalizado possui o seguinte recurso:
- Compatível com transmissão estruturada e pipeline Declarativo LakeFlow Spark .
- Suporta envio incremental de registros e ingestão de transmissão contínua, e segue a semântica de transmissão estruturada.
- Utiliza a API de armazenamento do BigQuery com um protocolo baseado em RPC para transmissão de dados mais rápida e econômica.
- Grava as tabelas migradas diretamente no Unity Catalog.
- Gerencie pontos de verificação automaticamente usando um campo incremental baseado em data ou carimbo de data/hora.
- Suporta ingestão de lotes com
Trigger.AvailableNow(). - Não requer armazenamento intermediário cloud .
- Serializa a transmissão de dados do BigQuery usando o formato Arrow ou Avro.
- Gerencia o paralelismo automático e distribui o trabalho entre os workers Spark com base no volume de dados.
- Adequado para migração das camadas Raw e Bronze do BigQuery, com suporte para migrações das camadas Silver e Ouro usando padrões SCD Tipo 1 ou Tipo 2.
Pré-requisitos
Antes de implementar o conector personalizado, instale o pacote necessário:
%pip install faker google.cloud google.cloud.bigquery google.cloud.bigquery_storage
o passo 1: Implementar o leitor de transmissão
Primeiro, implemente o leitor de dados de transmissão. A subclasse DataSourceStreamReader deve implementar os seguintes métodos:
initialOffset(self) -> dictlatestOffset(self) -> dictpartitions(self, start: dict, end: dict) -> Sequence[InputPartition]read(self, partition: InputPartition) -> Union[Iterator[Tuple], Iterator[Row]]commit(self, end: dict) -> Nonestop(self) -> None
Para obter detalhes sobre cada método, consulte Métodos.
import os
from pyspark.sql.datasource import DataSourceStreamReader, InputPartition
from pyspark.sql.datasource import DataSourceStreamWriter
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.datasource import DataSource
from pathlib import Path
from pyarrow.lib import TimestampScalar
from datetime import datetime
from typing import Iterator, Tuple, Any, Dict, List, Sequence
from google.cloud.bigquery_storage import BigQueryReadClient, ReadSession
from google.cloud import bigquery_storage
import pandas
import datetime
import uuid
import time, logging
start_time = time.time()
class RangePartition(InputPartition):
def __init__(self, session: ReadSession, stream_idx: int):
self.session = session
self.stream_idx = stream_idx
class BQStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.project_id = options.get("project_id")
self.dataset = options.get("dataset")
self.table = options.get("table")
self.json_auth_file = "/home/"+options.get("service_auth_json_file_name")
self.max_parallel_conn = options.get("max_parallel_conn", 1000)
self.incremental_checkpoint_field = options.get("incremental_checkpoint_field", "")
self.last_offset = None
def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
from datetime import datetime
logging.info("Inside initialOffset!!!!!")
# self.increment_latest_vals.append(datetime.strptime('1900-01-01 23:57:12', "%Y-%m-%d %H:%M:%S"))
self.last_offset = '1900-01-01 23:57:12'
return {"offset": str(self.last_offset)}
def latestOffset(self):
"""
Returns the current latest offset that the next microbatch will read to.
"""
from datetime import datetime
from google.cloud import bigquery
if (self.last_offset is None):
self.last_offset = '1900-01-01 23:57:12'
client = bigquery.Client.from_service_account_json(self.json_auth_file)
# max_offset=start["offset"]
logging.info(f"************************last_offset: {self.last_offset}***********************")
f_sql_str = ''
for x_str in self.incremental_checkpoint_field.strip().split(","):
f_sql_str += f"{x_str}>'{self.last_offset}' or "
f_sql_str = f_sql_str[:-3]
job_query = client.query(
f"select max({self.incremental_checkpoint_field}) from {self.project_id}.{self.dataset}.{self.table} where {f_sql_str}")
for query in job_query.result():
max_res = query[0]
if (str(max_res).lower() != 'none'):
return {"offset": str(max_res)}
return {"offset": str(self.last_offset)}
def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]:
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
if (self.last_offset is None):
self.last_offset = end['offset']
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.json_auth_file
# project_id = self.auth_project_id
client = BigQueryReadClient()
# This example reads baby name data from the public datasets.
table = "projects/{}/datasets/{}/tables/{}".format(
self.project_id, self.dataset, self.table
)
requested_session = bigquery_storage.ReadSession()
requested_session.table = table
if (self.incremental_checkpoint_field != ''):
start_offset = start["offset"]
end_offset = end["offset"]
f_sql_str = ''
for x_str in self.incremental_checkpoint_field.strip().split(","):
f_sql_str += f"({x_str}>'{start_offset}' and {x_str}<='{end_offset}') or "
f_sql_str = f_sql_str[:-3]
requested_session.read_options.row_restriction = f"{f_sql_str}"
# This example leverages Apache Avro.
requested_session.data_format = bigquery_storage.DataFormat.AVRO
parent = "projects/{}".format(self.project_id)
session = client.create_read_session(
request={
"parent": parent,
"read_session": requested_session,
"max_stream_count": int(self.max_parallel_conn),
},
)
self.last_offset = end['offset']
return [RangePartition(session, i) for i in range(len(session.streams))]
def read(self, partition) -> Iterator[List]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
from datetime import datetime
session = partition.session
stream_idx = partition.stream_idx
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.json_auth_file
client_1 = BigQueryReadClient()
# requested_session.read_options.selected_fields = ["census_tract", "clearance_date", "clearance_status"]
reader = client_1.read_rows(session.streams[stream_idx].name)
reader_iter = []
for message in reader.rows():
reader_iter_in = []
for k, v in message.items():
reader_iter_in.append(v)
# yield(reader_iter)
reader_iter.append(reader_iter_in)
# yield (message['hash'], message['size'], message['virtual_size'], message['version'])
# self.increment_latest_vals.append(max_incr_val)
return iter(reader_iter)
def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass
o passo 2: Definir o DataSource
A seguir, defina a fonte de dados personalizada. A subclasse DataSource deve implementar os seguintes métodos:
name(cls) -> strschema(self) -> Union[StructType, str]
Para obter detalhes sobre cada método, consulte Métodos.
from pyspark.sql.datasource import DataSource
from pyspark.sql.types import StructType
from google.cloud import bigquery
class BQStreamDataSource(DataSource):
"""
An example data source for streaming data from a public API containing users' comments.
"""
@classmethod
def name(cls):
return "bigquery-streaming"
def schema(self):
type_map = {'integer': 'long', 'float': 'double', 'record': 'string'}
json_auth_file = "/home/" + self.options.get("service_auth_json_file_name")
client = bigquery.Client.from_service_account_json(json_auth_file)
table_ref = self.options.get("project_id") + '.' + self.options.get("dataset") + '.' + self.options.get("table")
table = client.get_table(table_ref)
original_schema = table.schema
result = []
for schema in original_schema:
col_attr_name = schema.name
if (schema.mode != 'REPEATED'):
col_attr_type = type_map.get(schema.field_type.lower(), schema.field_type.lower())
else:
col_attr_type = f"array<{type_map.get(schema.field_type.lower(), schema.field_type.lower())}>"
result.append(col_attr_name + " " + col_attr_type)
return ",".join(result)
# return "census_tract double,clearance_date string,clearance_status string"
def streamReader(self, schema: StructType):
return BQStreamReader(schema, self.options)
o passo 3: Configurar e iniciar a consulta de transmissão
Por fim, registre o conector, configure e inicie a consulta de transmissão:
spark.dataSource.register(BQStreamDataSource)
# Ingests table data incrementally using the provided timestamp-based field.
# The latest value is checkpointed using offset semantics.
# Without the incremental input field, full table ingestion is performed.
# Service account JSON files must be available to every Spark executor worker
# in the /home folder using --files /home/<file_name>.json or an init script.
query = (
spark.readStream.format("bigquery-streaming")
.option("project_id", <bq_project_id>)
.option("incremental_checkpoint_field", <table_incremental_ts_based_col>)
.option("dataset", <bq_dataset_name>)
.option("table", <bq_table_name>)
.option("service_auth_json_file_name", <service_account_json_file_name>)
.option("max_parallel_conn", <max_parallel_threads_to_pull_data>) # defaults to max 1000
.load()
)
(
query.writeStream.trigger(processingTime="30 seconds")
.option("checkpointLocation", "checkpoint_path")
.foreachBatch(writeToTable) # your target table write function
.start()
)
Ordem de execução
A ordem de execução da função de transmissão personalizada está descrita abaixo.
Para carregar o DataFrame de transmissão Spark :
name(cls)
schema()
Para microbatch (n) de uma nova consulta começar ou ao reiniciar uma consulta existente (ponto de verificação novo ou existente):
partitions(end_offset, end_offset) # loads the last saved offset from the checkpoint at query restart
latestOffset()
partitions(start_offset, end_offset) # plans partitions and distributes to Python workers
read() # user’s source read definition, runs on each Python worker
commit()
Para o próximo microlote (n+1) de uma consulta em execução em um ponto de verificação existente:
latestOffset()
partitions(start_offset, end_offset)
read()
commit()
A função latestOffset orquestra o checkpoint. Compartilhe uma variável de ponto de verificação de um tipo primitivo entre funções e retorne-a como um dicionário. Por exemplo: return {"offset": str(self.last_offset)}
Solução de problemas
Se o resultado for o seguinte erro, o site compute não é compatível com a fonte de dados personalizada PySpark. O senhor deve usar o site Databricks Runtime 15.2 ou o acima.
Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000