メインコンテンツまでスキップ

PySpark カスタムデータソース

PySparkカスタム データ ソースは、 Python ( PySpark ) DataSource APIを使用して作成されます。これにより、 Pythonを使用してApache Sparkでカスタム データ ソースからの読み取りとカスタム データ シンクへの書き込みが可能になります。 PySparkカスタム データ ソースを使用して、データ システムへのカスタム接続を定義し、追加機能を実装して再利用可能なデータ ソースを構築できます。

注記

カスタムデータソースPySparkには、Databricks Runtime 15.4 LTS以降、またはサーバレス環境バージョン 2 が必要です。

DataSource クラス

PySpark DataSource は、データ リーダーとライターを作成するためのメソッドを提供する基本クラスです。

データソースサブクラスを実装します

ユースケースに応じて、データソースを読み取り可能、書き込み可能、またはその両方にするために、任意のサブクラスで次のものを実装する必要があります。

プロパティまたはメソッド

説明

name

必須。 データソースの名前

schema

必須。 読み取りまたは書き込みを行うデータソースのスキーマ

reader()

データソースを読み取れるようにするための DataSourceReader を返す必要があります (バッチ)

writer()

データ シンクを書き込み可能にするための DataSourceWriter を返す必要があります (バッチ)

streamReader() または simpleStreamReader()

データ ストリームを読み取り可能にするために DataSourceStreamReader を返す必要があります (ストリーミング)

streamWriter()

データ ストリームを書き込み可能にするには、 DataSourceStreamWriter を返す必要があります (ストリーミング)

注記

ユーザー定義のDataSourceDataSourceReaderDataSourceWriterDataSourceStreamReaderDataSourceStreamWriter 、およびそれらのメソッドはシリアル化可能である必要があります。つまり、プリミティブ型を含む辞書またはネストされた辞書である必要があります。

データソースを登録する

インターフェイスを実装したら、登録する必要があり、次の例に示すように、インターフェイスをロードまたは使用できます。

Python
# Register the data source
spark.dataSource.register(MyDataSourceClass)

# Read from a custom data source
spark.read.format("my_datasource_name").load().show()

例1:バッチクエリ用のPySparkデータソースを作成する

PySpark DataSource リーダー機能を示すには、faker Python パッケージを使用してサンプル データを生成するデータソースを作成します。fakerの詳細については、Faker のドキュメントを参照してください。

次のコマンドを使用して、 faker パッケージをインストールします。

Python
%pip install faker

ステップ 1: バッチ クエリのリーダーを実装する

まず、サンプルデータを生成するためのリーダーロジックを実装します。インストール済みのfakerライブラリを使用して、スキーマ内の各フィールドに値を設定します。

Python
class FakeDataSourceReader(DataSourceReader):

def __init__(self, schema, options):
self.schema: StructType = schema
self.options = options

def read(self, partition):
# Library imports must be within the method.
from faker import Faker
fake = Faker()

# Every value in this `self.options` dictionary is a string.
num_rows = int(self.options.get("numRows", 3))
for _ in range(num_rows):
row = []
for field in self.schema.fields:
value = getattr(fake, field.name)()
row.append(value)
yield tuple(row)

ステップ 2: サンプルの DataSource を定義する

次に、新しい PySpark DataSource を、名前、スキーマ、およびリーダーを指定してDataSourceのサブクラスとして定義します。バッチクエリでデータソースから読み込むには、 reader()メソッドを定義する必要があります。

Python
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
"""
An example data source for batch query using the `faker` library.
"""

@classmethod
def name(cls):
return "fake"

def schema(self):
return "name string, date string, zipcode string, state string"

def reader(self, schema: StructType):
return FakeDataSourceReader(schema, self.options)

ステップ 3: 登録する サンプルデータソースを使用する

データソースを使用するには、登録する必要があります。デフォルトでは、 FakeDataSourceには 3 つの行があり、スキーマには次のstringフィールドが含まれます: namedatezipcodestate 。 次の例では、デフォルトを使用して例のデータソースを登録し、ロードして出力します。

Python
spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
Output
+-----------------+----------+-------+----------+
| name| date|zipcode| state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24| 79766| Colorado|
| Shelby Cox|2011-08-05| 24596| Florida|
| Amanda Robinson|2019-01-06| 57395|Washington|
+-----------------+----------+-------+----------+

stringフィールドのみがサポートされていますが、fakerパッケージプロバイダーのフィールドに対応する任意のフィールドを使用してスキーマを指定して、テストおよび開発用のランダムデータを生成できます。次の例では、 name フィールドと company フィールドを含むデータソースを読み込みます。

Python
spark.read.format("fake").schema("name string, company string").load().show()
Output
+---------------------+--------------+
|name |company |
+---------------------+--------------+
|Tanner Brennan |Adams Group |
|Leslie Maxwell |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc |
+---------------------+--------------+

カスタムの行数でデータソースをロードするには、 numRows オプションを指定します。 次の例では、5 つの行を指定しています。

Python
spark.read.format("fake").option("numRows", 5).load().show()
Output
+--------------+----------+-------+------------+
| name| date|zipcode| state|
+--------------+----------+-------+------------+
| Pam Mitchell|1988-10-20| 23788| Tennessee|
|Melissa Turner|1996-06-14| 30851| Nevada|
| Brian Ramsey|2021-08-21| 55277| Washington|
| Caitlin Reed|1983-06-22| 89813|Pennsylvania|
| Douglas James|2007-01-18| 46226| Alabama|
+--------------+----------+-------+------------+

例 2: バリアントを使用して PySpark GitHub データソースを作成する

PySpark データソースでのバリアントの使用を示すために、この例では、GitHub からプル リクエストを読み取るデータ ソースを作成します。

注記

バリアントは、Databricks Runtime 17.1以降のPySparkカスタムデータソースでサポートされています。

バリアントに関する情報については、「 バリアント データのクエリ」を参照してください。

ステップ 1: プル リクエストを取得するリーダーを実装する

まず、指定されたGitHubリポジトリからプル リクエストを取得するリーダー ロジックを実装します。

Python
class GithubVariantPullRequestReader(DataSourceReader):
def __init__(self, options):
self.token = options.get("token")
self.repo = options.get("path")
if self.repo is None:
raise Exception(f"Must specify a repo in `.load()` method.")
# Every value in this `self.options` dictionary is a string.
self.num_rows = int(options.get("numRows", 10))

def read(self, partition):
header = {
"Accept": "application/vnd.github+json",
}
if self.token is not None:
header["Authorization"] = f"Bearer {self.token}"
url = f"https://api.github.com/repos/{self.repo}/pulls"
response = requests.get(url, headers=header)
response.raise_for_status()
prs = response.json()
for pr in prs[:self.num_rows]:
yield Row(
id = pr.get("number"),
title = pr.get("title"),
user = VariantVal.parseJson(json.dumps(pr.get("user"))),
created_at = pr.get("created_at"),
updated_at = pr.get("updated_at")
)

ステップ 2: GitHubデータソースを定義する

次に、新しい PySpark GitHub DataSource を、名前、スキーマ、メソッドreader()を持つDataSourceのサブクラスとして定義します。スキーマには、 idtitleusercreated_atupdated_atのフィールドが含まれます。userフィールドはバリアントとして定義されています。

Python
import json
import requests

from pyspark.sql import Row
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import VariantVal

class GithubVariantDataSource(DataSource):
@classmethod
def name(self):
return "githubVariant"
def schema(self):
return "id int, title string, user variant, created_at string, updated_at string"
def reader(self, schema):
return GithubVariantPullRequestReader(self.options)

ステップ3: 登録する and use the データソース

データソースを使用するには、登録する。 次の例では、登録する、データソースをロードし、データPRGitHubリポジトリの 3 行を出力します。

Python
spark.dataSource.register(GithubVariantDataSource)
spark.read.format("githubVariant").option("numRows", 3).load("apache/spark").display()
Output
+---------+-----------------------------------------------------+---------------------+----------------------+----------------------+
| id | title | user | created_at | updated_at |
+---------+---------------------------------------------------- +---------------------+----------------------+----------------------+
| 51293 |[SPARK-52586][SQL] Introduce AnyTimeType | {"avatar_url":...} | 2025-06-26T09:20:59Z | 2025-06-26T15:22:39Z |
| 51292 |[WIP][PYTHON] Arrow UDF for aggregation | {"avatar_url":...} | 2025-06-26T07:52:27Z | 2025-06-26T07:52:37Z |
| 51290 |[SPARK-50686][SQL] Hash to sort aggregation fallback | {"avatar_url":...} | 2025-06-26T06:19:58Z | 2025-06-26T06:20:07Z |
+---------+-----------------------------------------------------+---------------------+----------------------+----------------------+

例 3: ストリーミング読み取りおよび書き込み用の PySpark DataSource の作成

PySpark DataSource ストリーム リーダーおよびライター機能を示すために、faker Python パッケージを使用して各マイクロバッチで 2 行を生成するサンプル データソースを作成します。fakerの詳細については、Faker のドキュメントを参照してください。

次のコマンドを使用して、 faker パッケージをインストールします。

Python
%pip install faker

ステップ 1: ストリーム リーダーを実装する

まず、マイクロバッチごとに2行を生成するストリーミングデータリーダーのサンプルを実装します。DataSourceStreamReader実装することもできますし、データソースのスループットが低くパーティショニングが不要な場合は、代わりにSimpleDataSourceStreamReaderを実装することもできます。simpleStreamReader()またはstreamReader()いずれかを実装する必要があり、 simpleStreamReader()streamReader()が実装されていない場合にのみ呼び出されます。

DataSourceStreamReader の実装

streamReader インスタンスには、マイクロバッチごとに 2 ずつ増加する整数オフセットがあり、DataSourceStreamReader インターフェイスで実装されます。

Python
from pyspark.sql.datasource import InputPartition
from typing import Iterator, Tuple
import os
import json

class RangePartition(InputPartition):
def __init__(self, start, end):
self.start = start
self.end = end

class FakeStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.current = 0

def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}

def latestOffset(self) -> dict:
"""
Returns the current latest offset that the next microbatch will read to.
"""
self.current += 2
return {"offset": self.current}

def partitions(self, start: dict, end: dict):
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
return [RangePartition(start["offset"], end["offset"])]

def commit(self, end: dict):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass

def read(self, partition) -> Iterator[Tuple]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
start, end = partition.start, partition.end
for i in range(start, end):
yield (i, str(i))

SimpleDataSourceStreamReader の実装

SimpleStreamReader インスタンスは、各バッチで 2 つの行を生成する FakeStreamReader インスタンスと同じですが、パーティション分割なしで SimpleDataSourceStreamReader インターフェイスで実装されます。

Python
class SimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}

def read(self, start: dict) -> (Iterator[Tuple], dict):
"""
Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
"""
start_idx = start["offset"]
it = iter([(i,) for i in range(start_idx, start_idx + 2)])
return (it, {"offset": start_idx + 2})

def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
"""
Takes start and end offset as inputs, then reads an iterator of data deterministically.
This is called when the query replays batches during restart or after a failure.
"""
start_idx = start["offset"]
end_idx = end["offset"]
return iter([(i,) for i in range(start_idx, end_idx)])

def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
"""
pass

ステップ 2: ストリーム ライターを実装する

次に、ストリーミングライターを実装します。このストリーミングデータライターは、各マイクロバッチのメタデータ情報をローカルパスに書き込みます。

Python
from pyspark.sql.datasource import DataSourceStreamWriter, WriterCommitMessage

class SimpleCommitMessage(WriterCommitMessage):
def __init__(self, partition_id: int, count: int):
self.partition_id = partition_id
self.count = count

class FakeStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.options = options
self.path = self.options.get("path")
assert self.path is not None

def write(self, iterator):
"""
Writes the data and then returns the commit message for that partition. Library imports must be within the method.
"""
from pyspark import TaskContext
context = TaskContext.get()
partition_id = context.partitionId()
cnt = 0
for row in iterator:
cnt += 1
return SimpleCommitMessage(partition_id=partition_id, count=cnt)

def commit(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
"""
status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
file.write(json.dumps(status) + "\n")

def abort(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
In this FakeStreamWriter, a failure message is written into a text file inside abort().
"""
with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
file.write(f"failed in batch {batchId}")

ステップ 3: サンプルの DataSource を定義する

次に、新しい PySpark DataSource を、名前、スキーマ、メソッドstreamReader()およびstreamWriter()を持つDataSourceのサブクラスとして定義します。

Python
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType

class FakeStreamDataSource(DataSource):
"""
An example data source for streaming read and write using the `faker` library.
"""

@classmethod
def name(cls):
return "fakestream"

def schema(self):
return "name string, state string"

def streamReader(self, schema: StructType):
return FakeStreamReader(schema, self.options)

# If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
# def simpleStreamReader(self, schema: StructType):
# return SimpleStreamReader()

def streamWriter(self, schema: StructType, overwrite: bool):
return FakeStreamWriter(self.options)

ステップ 4: 登録する サンプルデータソースを使用する

データソースを利用するには、データソースを登録します。 登録後は、短い名前または完全な名前をformat()に渡すことで、ストリーミング クエリでソースまたはシンクとして使用できます。次の例では、データ ソースを登録し、サンプル データ ソースから読み取り、コンソールに出力するクエリを開始します。

Python
spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()

あるいは、以下のコードはサンプルストリームをシンクとして使用し、出力パスを指定します。

Python
spark.dataSource.register(FakeStreamDataSource)

# Make sure the output directory exists and is writable
output_path = "/output_path"
dbutils.fs.mkdirs(output_path)
checkpoint_path = "/output_path/checkpoint"

query = (
spark.readStream
.format("fakestream")
.load()
.writeStream
.format("fakestream")
.option("path", output_path)
.option("checkpointLocation", checkpoint_path)
.start()
)

例 4: Google BigQueryストリーミング コネクタを作成する

以下の例は、PySpark DataSourceを使用してGoogle BigQuery(BQ)用のカスタムストリーミングコネクタを構築する方法を示しています。Databricks BigQueryバッチ取り込み用のSparkコネクタを提供し、レイクハウスフェデレーションは任意のBigQueryデータ セットにリモートで接続し、フォーリンカタログ作成を通じてデータをプルすることもできますが、どちらも増分または継続的なストリーミング ワークフローを完全にはサポートしていません。 このコネクタを使用すると、段階的な増分データ移行と、永続的なチェックポイント機能を備えたストリーミングソースから供給されるBigQueryテーブルからのほぼリアルタイムの移行が可能になります。

このカスタムコネクタには、以下の特徴があります。

  • 構造化ストリーミング、 LakeFlow Spark宣言型パイプラインに対応。
  • 増分レコード追跡と継続的なストリーミング取り込みをサポートし、構造化ストリーミングのセマンティクスに準拠しています。
  • RPCベースのプロトコルを使用してBigQuery Storage APIを利用することで、より高速かつ低コストなデータ転送を実現します。
  • 移行されたテーブルをUnity Catalogに直接書き込みます。
  • 日付またはタイムスタンプに基づく増分フィールドを使用して、チェックポイントを自動的に管理します。
  • Trigger.AvailableNow()を使用したバッチ取り込みをサポートします。
  • 中間的なクラウドストレージは不要です。
  • BigQueryのデータ送信をArrowまたはAvro形式を使用してシリアル化します。
  • 自動並列処理を処理し、 Spark全体に作業を分散します データ量に基づいて。
  • BigQueryからの Raw およびブロンズレイヤーの移行に適しており、 SCDタイプ 1 またはタイプ 2 パターンを使用したシルバーおよびゴールドレイヤーの移行がサポートされています。

前提条件

カスタムコネクタを実装する前に、必要なパッケージをインストールしてください。

%pip install faker google.cloud google.cloud.bigquery google.cloud.bigquery_storage

ステップ 1: ストリーム リーダーを実装する

まず、ストリーミングデータリーダーを実装します。DataSourceStreamReaderサブクラスは、以下のメソッドを実装する必要があります。

  • initialOffset(self) -> dict
  • latestOffset(self) -> dict
  • partitions(self, start: dict, end: dict) -> Sequence[InputPartition]
  • read(self, partition: InputPartition) -> Union[Iterator[Tuple], Iterator[Row]]
  • commit(self, end: dict) -> None
  • stop(self) -> None

各メソッドの詳細については、 「メソッド」を参照してください。

Python
import os
from pyspark.sql.datasource import DataSourceStreamReader, InputPartition
from pyspark.sql.datasource import DataSourceStreamWriter
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.datasource import DataSource
from pathlib import Path
from pyarrow.lib import TimestampScalar
from datetime import datetime
from typing import Iterator, Tuple, Any, Dict, List, Sequence
from google.cloud.bigquery_storage import BigQueryReadClient, ReadSession
from google.cloud import bigquery_storage
import pandas
import datetime
import uuid
import time, logging

start_time = time.time()


class RangePartition(InputPartition):
def __init__(self, session: ReadSession, stream_idx: int):
self.session = session
self.stream_idx = stream_idx


class BQStreamReader(DataSourceStreamReader):

def __init__(self, schema, options):
self.project_id = options.get("project_id")
self.dataset = options.get("dataset")
self.table = options.get("table")
self.json_auth_file = "/home/"+options.get("service_auth_json_file_name")
self.max_parallel_conn = options.get("max_parallel_conn", 1000)
self.incremental_checkpoint_field = options.get("incremental_checkpoint_field", "")

self.last_offset = None

def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
from datetime import datetime
logging.info("Inside initialOffset!!!!!")
# self.increment_latest_vals.append(datetime.strptime('1900-01-01 23:57:12', "%Y-%m-%d %H:%M:%S"))
self.last_offset = '1900-01-01 23:57:12'

return {"offset": str(self.last_offset)}

def latestOffset(self):
"""
Returns the current latest offset that the next microbatch will read to.
"""
from datetime import datetime
from google.cloud import bigquery

if (self.last_offset is None):
self.last_offset = '1900-01-01 23:57:12'

client = bigquery.Client.from_service_account_json(self.json_auth_file)
# max_offset=start["offset"]
logging.info(f"************************last_offset: {self.last_offset}***********************")
f_sql_str = ''
for x_str in self.incremental_checkpoint_field.strip().split(","):
f_sql_str += f"{x_str}>'{self.last_offset}' or "
f_sql_str = f_sql_str[:-3]
job_query = client.query(
f"select max({self.incremental_checkpoint_field}) from {self.project_id}.{self.dataset}.{self.table} where {f_sql_str}")
for query in job_query.result():
max_res = query[0]

if (str(max_res).lower() != 'none'):
return {"offset": str(max_res)}

return {"offset": str(self.last_offset)}

def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]:

"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
if (self.last_offset is None):
self.last_offset = end['offset']

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.json_auth_file

# project_id = self.auth_project_id

client = BigQueryReadClient()

# This example reads baby name data from the public datasets.
table = "projects/{}/datasets/{}/tables/{}".format(
self.project_id, self.dataset, self.table
)
requested_session = bigquery_storage.ReadSession()
requested_session.table = table
if (self.incremental_checkpoint_field != ''):
start_offset = start["offset"]
end_offset = end["offset"]
f_sql_str = ''
for x_str in self.incremental_checkpoint_field.strip().split(","):
f_sql_str += f"({x_str}>'{start_offset}' and {x_str}<='{end_offset}') or "
f_sql_str = f_sql_str[:-3]
requested_session.read_options.row_restriction = f"{f_sql_str}"

# This example leverages Apache Avro.
requested_session.data_format = bigquery_storage.DataFormat.AVRO

parent = "projects/{}".format(self.project_id)
session = client.create_read_session(
request={
"parent": parent,
"read_session": requested_session,
"max_stream_count": int(self.max_parallel_conn),
},
)
self.last_offset = end['offset']
return [RangePartition(session, i) for i in range(len(session.streams))]

def read(self, partition) -> Iterator[List]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
from datetime import datetime
session = partition.session
stream_idx = partition.stream_idx
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.json_auth_file
client_1 = BigQueryReadClient()
# requested_session.read_options.selected_fields = ["census_tract", "clearance_date", "clearance_status"]
reader = client_1.read_rows(session.streams[stream_idx].name)
reader_iter = []

for message in reader.rows():
reader_iter_in = []
for k, v in message.items():
reader_iter_in.append(v)
# yield(reader_iter)
reader_iter.append(reader_iter_in)
# yield (message['hash'], message['size'], message['virtual_size'], message['version'])
# self.increment_latest_vals.append(max_incr_val)
return iter(reader_iter)

def commit(self, end):

"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass

ステップ 2: データソースを定義する

次に、カスタムデータソースを定義します。DataSourceサブクラスは、以下のメソッドを実装する必要があります。

  • name(cls) -> str
  • schema(self) -> Union[StructType, str]

各メソッドの詳細については、 「メソッド」を参照してください。

Python
from pyspark.sql.datasource import DataSource
from pyspark.sql.types import StructType
from google.cloud import bigquery

class BQStreamDataSource(DataSource):
"""
An example data source for streaming data from a public API containing users' comments.
"""

@classmethod
def name(cls):
return "bigquery-streaming"

def schema(self):
type_map = {'integer': 'long', 'float': 'double', 'record': 'string'}
json_auth_file = "/home/" + self.options.get("service_auth_json_file_name")
client = bigquery.Client.from_service_account_json(json_auth_file)
table_ref = self.options.get("project_id") + '.' + self.options.get("dataset") + '.' + self.options.get("table")
table = client.get_table(table_ref)
original_schema = table.schema
result = []
for schema in original_schema:
col_attr_name = schema.name
if (schema.mode != 'REPEATED'):
col_attr_type = type_map.get(schema.field_type.lower(), schema.field_type.lower())
else:
col_attr_type = f"array<{type_map.get(schema.field_type.lower(), schema.field_type.lower())}>"
result.append(col_attr_name + " " + col_attr_type)

return ",".join(result)
# return "census_tract double,clearance_date string,clearance_status string"

def streamReader(self, schema: StructType):
return BQStreamReader(schema, self.options)

ステップ 3: ストリーミング クエリを設定して開始する

最後に、コネクタを登録し、ストリーミング クエリを構成して開始します。

Python
spark.dataSource.register(BQStreamDataSource)

# Ingests table data incrementally using the provided timestamp-based field.
# The latest value is checkpointed using offset semantics.
# Without the incremental input field, full table ingestion is performed.
# Service account JSON files must be available to every Spark executor worker
# in the /home folder using --files /home/<file_name>.json or an init script.

query = (
spark.readStream.format("bigquery-streaming")
.option("project_id", <bq_project_id>)
.option("incremental_checkpoint_field", <table_incremental_ts_based_col>)
.option("dataset", <bq_dataset_name>)
.option("table", <bq_table_name>)
.option("service_auth_json_file_name", <service_account_json_file_name>)
.option("max_parallel_conn", <max_parallel_threads_to_pull_data>) # defaults to max 1000
.load()
)

(
query.writeStream.trigger(processingTime="30 seconds")
.option("checkpointLocation", "checkpoint_path")
.foreachBatch(writeToTable) # your target table write function
.start()
)

執行命令

カスタムストリームの関数実行順序は以下のとおりです。

Spark DataFrameロードするには:

Python
name(cls)
schema()

新規クエリの開始時、または既存クエリの再開時(新規または既存のチェックポイント)のマイクロバッチ(n)の場合:

Python
partitions(end_offset, end_offset)  # loads the last saved offset from the checkpoint at query restart
latestOffset()
partitions(start_offset, end_offset) # plans partitions and distributes to Python workers
read() # user’s source read definition, runs on each Python worker
commit()

既存のチェックポイントで実行中のクエリの次の(n+1)マイクロバッチの場合:

Python
latestOffset()
partitions(start_offset, end_offset)
read()
commit()
注記

latestOffset関数はチェックポイント処理を統括します。プリミティブ型のチェックポイント変数を複数の関数間で共有し、それを辞書として返します。例えば: return {"offset": str(self.last_offset)}

トラブルシューティング

出力が次のエラーの場合、コンピュートは PySpark カスタム データソースをサポートしていません。 Databricks Runtime 15.2 以降を使用する必要があります。

Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000