メインコンテンツまでスキップ

データソースストリームリーダー

ストリーミング データ ソース リーダーの基本クラス。

データ ソース ストリーム リーダーは、ストリーミング データ ソースからデータを出力する責任があります。 このクラスを実装し、 DataSource.streamReader()からインスタンスを返し、データ ソースをストリーミング ソースとして読み取り可能にします。

構文

Python
from pyspark.sql.datasource import DataSourceStreamReader

class MyDataSourceStreamReader(DataSourceStreamReader):
def initialOffset(self):
...

def partitions(self, start, end):
...

def read(self, partition):
...

方法

手法

説明

initialOffset()

ストリーミング データソースの初期オフセットを辞書として返します。 新しいストリーミング クエリは、このオフセットから読み取りを開始します。再開されたクエリは、代わりにチェックポイントされたオフセットから再開されます。

partitions(start, end)

startendオフセット間のデータを表すInputPartitionオブジェクトのシーケンスを返します。start endと等しい場合は空のシーケンスを返します。

read(partition)

指定されたパーティションのデータを生成し、タプル、行、または PyArrow RecordBatchオブジェクトの反復子を返します。各タプルまたは行は、最終的な DataFrame の行に変換されます。このメソッドは抽象的であり、実装する必要があります。

commit(end)

Spark がend以下のオフセットのすべてのデータの処理を完了したことをソースに通知します。今後、Spark はendより大きいオフセットのみを要求します。

stop()

ソースを停止し、割り当てられているリソースを解放します。ストリーミング クエリが終了したときに呼び出されます。

注意

  • read() 静的かつステートレスです。read()の異なる呼び出し間で、変更可能なクラス メンバーにアクセスしたり、メモリ内の状態を保持したりしないでください。
  • partitions()によって返されるすべてのパーティション値は、pickle 化可能なオブジェクトである必要があります。
  • オフセットは、キーと値が整数、文字列、またはブール値などのプリミティブ型である辞書または再帰辞書として表されます。

インデックス付きレコードのシーケンスから読み取るストリーミング リーダーを実装します。

Python
from pyspark.sql.datasource import (
DataSource,
DataSourceStreamReader,
InputPartition,
)

class MyDataSourceStreamReader(DataSourceStreamReader):
def initialOffset(self):
return {"index": 0}

def latestOffset(self, start, limit):
return {"index": start["index"] + 10}

def partitions(self, start, end):
return [
InputPartition(i)
for i in range(start["index"], end["index"])
]

def read(self, partition):
yield (partition.value, f"record-{partition.value}")

def commit(self, end):
print(f"Committed up to offset {end}")

def stop(self):
print("Stopping stream reader")