メインコンテンツまでスキップ

PySpark カスタムデータソース

備考

プレビュー

PySparkカスタムデータソースは、Databricks Runtime15.2 以降、およびサーバレス環境バージョン 2 で パブリックプレビュー 段階にあります。ストリーミングのサポートは、Databricks Runtime 15.3 以降で利用できます。

PySpark データソースは、 Python (PySpark) データソース API によって作成され、Python を使用してカスタム データ ソースからの読み取りと Apache Spark のカスタム データ シンクへの書き込みが可能になります。 PySpark カスタムデータソースを使用して、データシステムへのカスタム接続を定義し、追加機能を実装して、再利用可能なデータソースを構築できます。

DataSource クラス

PySpark DataSource は、データ リーダーとライターを作成するためのメソッドを提供する基本クラスです。

データソースサブクラスを実装します

ユースケースに応じて、データソースを読み取り可能、書き込み可能、またはその両方にするために、任意のサブクラスで次のものを実装する必要があります。

プロパティまたはメソッド

説明

name

必須。 データソースの名前

schema

必須。 読み取りまたは書き込みを行うデータソースのスキーマ

reader()

データソースを読み取れるようにするための DataSourceReader を返す必要があります (バッチ)

writer()

データ シンクを書き込み可能にするための DataSourceWriter を返す必要があります (バッチ)

streamReader() または simpleStreamReader()

データ ストリームを読み取り可能にするために DataSourceStreamReader を返す必要があります (ストリーミング)

streamWriter()

データ ストリームを書き込み可能にするには、 DataSourceStreamWriter を返す必要があります (ストリーミング)

注記

ユーザー定義の DataSourceDataSourceReaderDataSourceWriterDataSourceStreamReaderDataSourceStreamWriter、およびそれらのメソッドはシリアル化できる必要があります。 つまり、プリミティブ型を含むディクショナリまたはネストされたディクショナリである必要があります。

データソースを登録する

インターフェイスを実装したら、登録する必要があり、次の例に示すように、インターフェイスをロードまたは使用できます。

Python
# Register the data source
spark.dataSource.register(MyDataSourceClass)

# Read from a custom data source
spark.read.format("my_datasource_name").load().show()

例1:バッチクエリ用のPySparkデータソースを作成する

PySpark DataSource リーダー機能を示すには、faker Python パッケージを使用してサンプル データを生成するデータソースを作成します。fakerの詳細については、Faker のドキュメントを参照してください。

次のコマンドを使用して、 faker パッケージをインストールします。

Python
%pip install faker

ステップ1:サンプルデータソースを定義する

まず、新しいPySparkデータソースを、名前、スキーマ、およびリーダーを持つ DataSource のサブクラスとして定義します。 reader() メソッドは、バッチ クエリでデータソースから読み取るように定義する必要があります。

Python
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
"""
An example data source for batch query using the `faker` library.
"""

@classmethod
def name(cls):
return "fake"

def schema(self):
return "name string, date string, zipcode string, state string"

def reader(self, schema: StructType):
return FakeDataSourceReader(schema, self.options)

手順 2: バッチ クエリのリーダーを実装する

次に、サンプルデータを生成するためのリーダーロジックを実装します。 インストールされている faker ライブラリを使用して、スキーマの各フィールドを設定します。

Python
class FakeDataSourceReader(DataSourceReader):

def __init__(self, schema, options):
self.schema: StructType = schema
self.options = options

def read(self, partition):
# Library imports must be within the method.
from faker import Faker
fake = Faker()

# Every value in this `self.options` dictionary is a string.
num_rows = int(self.options.get("numRows", 3))
for _ in range(num_rows):
row = []
for field in self.schema.fields:
value = getattr(fake, field.name)()
row.append(value)
yield tuple(row)

ステップ 3: 登録する サンプルデータソースを使用する

データソースを使用するには、登録する必要があります。デフォルトでは、 FakeDataSourceには 3 つの行があり、スキーマには次のstringフィールドが含まれます: namedatezipcodestate 。 次の例では、デフォルトを使用して例のデータソースを登録し、ロードして出力します。

Python
spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
Output
+-----------------+----------+-------+----------+
| name| date|zipcode| state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24| 79766| Colorado|
| Shelby Cox|2011-08-05| 24596| Florida|
| Amanda Robinson|2019-01-06| 57395|Washington|
+-----------------+----------+-------+----------+

stringフィールドのみがサポートされていますが、fakerパッケージプロバイダーのフィールドに対応する任意のフィールドを使用してスキーマを指定して、テストおよび開発用のランダムデータを生成できます。次の例では、 name フィールドと company フィールドを含むデータソースを読み込みます。

Python
spark.read.format("fake").schema("name string, company string").load().show()
Output
+---------------------+--------------+
|name |company |
+---------------------+--------------+
|Tanner Brennan |Adams Group |
|Leslie Maxwell |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc |
+---------------------+--------------+

カスタムの行数でデータソースをロードするには、 numRows オプションを指定します。 次の例では、5 つの行を指定しています。

Python
spark.read.format("fake").option("numRows", 5).load().show()
Output
+--------------+----------+-------+------------+
| name| date|zipcode| state|
+--------------+----------+-------+------------+
| Pam Mitchell|1988-10-20| 23788| Tennessee|
|Melissa Turner|1996-06-14| 30851| Nevada|
| Brian Ramsey|2021-08-21| 55277| Washington|
| Caitlin Reed|1983-06-22| 89813|Pennsylvania|
| Douglas James|2007-01-18| 46226| Alabama|
+--------------+----------+-------+------------+

例 2: ストリーミングの読み取りと書き込み用の PySpark データソースを作成する

PySpark DataSource ストリーム リーダーおよびライター機能を示すために、faker Python パッケージを使用して各マイクロバッチで 2 行を生成するサンプル データソースを作成します。fakerの詳細については、Faker のドキュメントを参照してください。

次のコマンドを使用して、 faker パッケージをインストールします。

Python
%pip install faker

ステップ1:サンプルデータソースを定義する

まず、新しいPySparkデータソースを、名前、スキーマ、およびメソッドstreamReader()およびstreamWriter()を持つDataSourceのサブクラスとして定義します。

Python
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType

class FakeStreamDataSource(DataSource):
"""
An example data source for streaming read and write using the `faker` library.
"""

@classmethod
def name(cls):
return "fakestream"

def schema(self):
return "name string, state string"

def streamReader(self, schema: StructType):
return FakeStreamReader(schema, self.options)

# If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
# def simpleStreamReader(self, schema: StructType):
# return SimpleStreamReader()

def streamWriter(self, schema: StructType, overwrite: bool):
return FakeStreamWriter(self.options)

ステップ 2: ストリーム リーダーを実装する

次に、各マイクロバッチで 2 行を生成するストリーミング データ リーダーの例を実装します。 DataSourceStreamReaderを実装することもできますが、データソースのスループットが低く、パーティショニングが必要ない場合は、代わりに SimpleDataSourceStreamReader を実装できます。simpleStreamReader() または streamReader() を実装する必要があり、simpleStreamReader()streamReader() が実装されていない場合にのみ呼び出されます。

DataSourceStreamReader の実装

streamReader インスタンスには、マイクロバッチごとに 2 ずつ増加する整数オフセットがあり、DataSourceStreamReader インターフェイスで実装されます。

Python
from pyspark.sql.datasource import InputPartition
from typing import Iterator, Tuple
import os
import json

class RangePartition(InputPartition):
def __init__(self, start, end):
self.start = start
self.end = end

class FakeStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.current = 0

def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}

def latestOffset(self) -> dict:
"""
Returns the current latest offset that the next microbatch will read to.
"""
self.current += 2
return {"offset": self.current}

def partitions(self, start: dict, end: dict):
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
return [RangePartition(start["offset"], end["offset"])]

def commit(self, end: dict):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass

def read(self, partition) -> Iterator[Tuple]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
start, end = partition.start, partition.end
for i in range(start, end):
yield (i, str(i))

SimpleDataSourceStreamReader の実装

SimpleStreamReader インスタンスは、各バッチで 2 つの行を生成する FakeStreamReader インスタンスと同じですが、パーティション分割なしで SimpleDataSourceStreamReader インターフェイスで実装されます。

Python
class SimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}

def read(self, start: dict) -> (Iterator[Tuple], dict):
"""
Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
"""
start_idx = start["offset"]
it = iter([(i,) for i in range(start_idx, start_idx + 2)])
return (it, {"offset": start_idx + 2})

def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
"""
Takes start and end offset as inputs, then reads an iterator of data deterministically.
This is called when the query replays batches during restart or after a failure.
"""
start_idx = start["offset"]
end_idx = end["offset"]
return iter([(i,) for i in range(start_idx, end_idx)])

def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
"""
pass

ステップ 3: ストリームライターを実装する

次に、ストリーミングライターを実装します。 このストリーミング データ ライターは、各マイクロバッチのメタデータ情報をローカル パスに書き込みます。

Python
class SimpleCommitMessage:
def __init__(self, partition_id: int, count: int):
self.partition_id = partition_id
self.count = count

class FakeStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.options = options
self.path = self.options.get("path")
assert self.path is not None

def write(self, iterator):
"""
Writes the data, then returns the commit message of that partition. Library imports must be within the method.
"""
from pyspark import TaskContext
context = TaskContext.get()
partition_id = context.partitionId()
cnt = 0
for row in iterator:
cnt += 1
return SimpleCommitMessage(partition_id=partition_id, count=cnt)

def commit(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
"""
status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
file.write(json.dumps(status) + "\n")

def abort(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
In this FakeStreamWriter, a failure message is written into a text file inside abort().
"""
with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
file.write(f"failed in batch {batchId}")

ステップ 4: 登録する サンプルデータソースを使用する

データソースを使用するには、登録する必要があります。登録後、短い名前または完全な名前をformat()に渡すことで、ストリーミング クエリでソースまたはシンクとして使用できます。 次の例では、データソースを登録し、サンプルのデータソースから読み取り、コンソールに出力するクエリを開始します。

Python
spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()

または、次の例では、サンプルストリームをシンクとして使用し、出力パスを指定します。

Python
query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")

トラブルシューティング

出力が次のエラーの場合、コンピュートは PySpark カスタム データソースをサポートしていません。 Databricks Runtime 15.2 以降を使用する必要があります。

Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000