PySpark カスタムデータソース
プレビュー
PySparkカスタムデータソースは、Databricks Runtime15.2 以降、およびサーバレス環境バージョン 2 で パブリックプレビュー 段階にあります。ストリーミングのサポートは、Databricks Runtime 15.3 以降で利用できます。
PySpark データソースは、 Python (PySpark) データソース API によって作成され、Python を使用してカスタム データ ソースからの読み取りと Apache Spark のカスタム データ シンクへの書き込みが可能になります。 PySpark カスタムデータソースを使用して、データシステムへのカスタム接続を定義し、追加機能を実装して、再利用可能なデータソースを構築できます。
DataSource クラス
PySpark DataSource は、データ リーダーとライターを作成するためのメソッドを提供する基本クラスです。
データソースサブクラスを実装します
ユースケースに応じて、データソースを読み取り可能、書き込み可能、またはその両方にするために、任意のサブクラスで次のものを実装する必要があります。
プロパティまたはメソッド | 説明 |
---|---|
| 必須。 データソースの名前 |
| 必須。 読み取りまたは書き込みを行うデータソースのスキーマ |
| データソースを読み取れるようにするための |
| データ シンクを書き込み可能にするための |
| データ ストリームを読み取り可能にするために |
| データ ストリームを書き込み可能にするには、 |
ユーザー定義の DataSource
、 DataSourceReader
、 DataSourceWriter
、 DataSourceStreamReader
、 DataSourceStreamWriter
、およびそれらのメソッドはシリアル化できる必要があります。 つまり、プリミティブ型を含むディクショナリまたはネストされたディクショナリである必要があります。
データソースを登録する
インターフェイスを実装したら、登録する必要があり、次の例に示すように、インターフェイスをロードまたは使用できます。
# Register the data source
spark.dataSource.register(MyDataSourceClass)
# Read from a custom data source
spark.read.format("my_datasource_name").load().show()
例1:バッチクエリ用のPySparkデータソースを作成する
PySpark DataSource リーダー機能を示すには、faker
Python パッケージを使用してサンプル データを生成するデータソースを作成します。faker
の詳細については、Faker のドキュメントを参照してください。
次のコマンドを使用して、 faker
パッケージをインストールします。
%pip install faker
ステップ1:サンプルデータソースを定義する
まず、新しいPySparkデータソースを、名前、スキーマ、およびリーダーを持つ DataSource
のサブクラスとして定義します。 reader()
メソッドは、バッチ クエリでデータソースから読み取るように定義する必要があります。
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType
class FakeDataSource(DataSource):
"""
An example data source for batch query using the `faker` library.
"""
@classmethod
def name(cls):
return "fake"
def schema(self):
return "name string, date string, zipcode string, state string"
def reader(self, schema: StructType):
return FakeDataSourceReader(schema, self.options)
手順 2: バッチ クエリのリーダーを実装する
次に、サンプルデータを生成するためのリーダーロジックを実装します。 インストールされている faker
ライブラリを使用して、スキーマの各フィールドを設定します。
class FakeDataSourceReader(DataSourceReader):
def __init__(self, schema, options):
self.schema: StructType = schema
self.options = options
def read(self, partition):
# Library imports must be within the method.
from faker import Faker
fake = Faker()
# Every value in this `self.options` dictionary is a string.
num_rows = int(self.options.get("numRows", 3))
for _ in range(num_rows):
row = []
for field in self.schema.fields:
value = getattr(fake, field.name)()
row.append(value)
yield tuple(row)
ステップ 3: 登録する サンプルデータソースを使用する
データソースを使用するには、登録する必要があります。デフォルトでは、 FakeDataSource
には 3 つの行があり、スキーマには次のstring
フィールドが含まれます: name
、 date
、 zipcode
、 state
。 次の例では、デフォルトを使用して例のデータソースを登録し、ロードして出力します。
spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
| name| date|zipcode| state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24| 79766| Colorado|
| Shelby Cox|2011-08-05| 24596| Florida|
| Amanda Robinson|2019-01-06| 57395|Washington|
+-----------------+----------+-------+----------+
string
フィールドのみがサポートされていますが、faker
パッケージプロバイダーのフィールドに対応する任意のフィールドを使用してスキーマを指定して、テストおよび開発用のランダムデータを生成できます。次の例では、 name
フィールドと company
フィールドを含むデータソースを読み込みます。
spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name |company |
+---------------------+--------------+
|Tanner Brennan |Adams Group |
|Leslie Maxwell |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc |
+---------------------+--------------+
カスタムの行数でデータソースをロードするには、 numRows
オプションを指定します。 次の例では、5 つの行を指定しています。
spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
| name| date|zipcode| state|
+--------------+----------+-------+------------+
| Pam Mitchell|1988-10-20| 23788| Tennessee|
|Melissa Turner|1996-06-14| 30851| Nevada|
| Brian Ramsey|2021-08-21| 55277| Washington|
| Caitlin Reed|1983-06-22| 89813|Pennsylvania|
| Douglas James|2007-01-18| 46226| Alabama|
+--------------+----------+-------+------------+
例 2: ストリーミングの読み取りと書き込み用の PySpark データソースを作成する
PySpark DataSource ストリーム リーダーおよびライター機能を示すために、faker
Python パッケージを使用して各マイクロバッチで 2 行を生成するサンプル データソースを作成します。faker
の詳細については、Faker のドキュメントを参照してください。
次のコマンドを使用して、 faker
パッケージをインストールします。
%pip install faker
ステップ1:サンプルデータソースを定義する
まず、新しいPySparkデータソースを、名前、スキーマ、およびメソッドstreamReader()
およびstreamWriter()
を持つDataSource
のサブクラスとして定義します。
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType
class FakeStreamDataSource(DataSource):
"""
An example data source for streaming read and write using the `faker` library.
"""
@classmethod
def name(cls):
return "fakestream"
def schema(self):
return "name string, state string"
def streamReader(self, schema: StructType):
return FakeStreamReader(schema, self.options)
# If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
# def simpleStreamReader(self, schema: StructType):
# return SimpleStreamReader()
def streamWriter(self, schema: StructType, overwrite: bool):
return FakeStreamWriter(self.options)
ステップ 2: ストリーム リーダーを実装する
次に、各マイクロバッチで 2 行を生成するストリーミング データ リーダーの例を実装します。 DataSourceStreamReader
を実装することもできますが、データソースのスループットが低く、パーティショニングが必要ない場合は、代わりに SimpleDataSourceStreamReader
を実装できます。simpleStreamReader()
または streamReader()
を実装する必要があり、simpleStreamReader()
は streamReader()
が実装されていない場合にのみ呼び出されます。
DataSourceStreamReader の実装
streamReader
インスタンスには、マイクロバッチごとに 2 ずつ増加する整数オフセットがあり、DataSourceStreamReader
インターフェイスで実装されます。
from pyspark.sql.datasource import InputPartition
from typing import Iterator, Tuple
import os
import json
class RangePartition(InputPartition):
def __init__(self, start, end):
self.start = start
self.end = end
class FakeStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.current = 0
def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def latestOffset(self) -> dict:
"""
Returns the current latest offset that the next microbatch will read to.
"""
self.current += 2
return {"offset": self.current}
def partitions(self, start: dict, end: dict):
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
return [RangePartition(start["offset"], end["offset"])]
def commit(self, end: dict):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass
def read(self, partition) -> Iterator[Tuple]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
start, end = partition.start, partition.end
for i in range(start, end):
yield (i, str(i))
SimpleDataSourceStreamReader の実装
SimpleStreamReader
インスタンスは、各バッチで 2 つの行を生成する FakeStreamReader
インスタンスと同じですが、パーティション分割なしで SimpleDataSourceStreamReader
インターフェイスで実装されます。
class SimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def read(self, start: dict) -> (Iterator[Tuple], dict):
"""
Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
"""
start_idx = start["offset"]
it = iter([(i,) for i in range(start_idx, start_idx + 2)])
return (it, {"offset": start_idx + 2})
def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
"""
Takes start and end offset as inputs, then reads an iterator of data deterministically.
This is called when the query replays batches during restart or after a failure.
"""
start_idx = start["offset"]
end_idx = end["offset"]
return iter([(i,) for i in range(start_idx, end_idx)])
def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
"""
pass
ステップ 3: ストリームライターを実装する
次に、ストリーミングライターを実装します。 このストリーミング データ ライターは、各マイクロバッチのメタデータ情報をローカル パスに書き込みます。
class SimpleCommitMessage:
def __init__(self, partition_id: int, count: int):
self.partition_id = partition_id
self.count = count
class FakeStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.options = options
self.path = self.options.get("path")
assert self.path is not None
def write(self, iterator):
"""
Writes the data, then returns the commit message of that partition. Library imports must be within the method.
"""
from pyspark import TaskContext
context = TaskContext.get()
partition_id = context.partitionId()
cnt = 0
for row in iterator:
cnt += 1
return SimpleCommitMessage(partition_id=partition_id, count=cnt)
def commit(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
"""
status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
file.write(json.dumps(status) + "\n")
def abort(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
In this FakeStreamWriter, a failure message is written into a text file inside abort().
"""
with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
file.write(f"failed in batch {batchId}")
ステップ 4: 登録する サンプルデータソースを使用する
データソースを使用するには、登録する必要があります。登録後、短い名前または完全な名前をformat()
に渡すことで、ストリーミング クエリでソースまたはシンクとして使用できます。 次の例では、データソースを登録し、サンプルのデータソースから読み取り、コンソールに出力するクエリを開始します。
spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()
または、次の例では、サンプルストリームをシンクとして使用し、出力パスを指定します。
query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")
トラブルシューティング
出力が次のエラーの場合、コンピュートは PySpark カスタム データソースをサポートしていません。 Databricks Runtime 15.2 以降を使用する必要があります。
Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000