データソースストリームリーダー
ストリーミング データ ソース リーダーの基本クラス。
データ ソース ストリーム リーダーは、ストリーミング データ ソースからデータを出力する責任があります。 このクラスを実装し、 DataSource.streamReader()からインスタンスを返し、データ ソースをストリーミング ソースとして読み取り可能にします。
構文
Python
from pyspark.sql.datasource import DataSourceStreamReader
class MyDataSourceStreamReader(DataSourceStreamReader):
def initialOffset(self):
...
def partitions(self, start, end):
...
def read(self, partition):
...
方法
手法 | 説明 |
|---|---|
ストリーミング データソースの初期オフセットを辞書として返します。 新しいストリーミング クエリは、このオフセットから読み取りを開始します。再開されたクエリは、代わりにチェックポイントされたオフセットから再開されます。 | |
| |
指定されたパーティションのデータを生成し、タプル、行、または PyArrow | |
Spark が | |
ソースを停止し、割り当てられているリソースを解放します。ストリーミング クエリが終了したときに呼び出されます。 |
注意
read()静的かつステートレスです。read()の異なる呼び出し間で、変更可能なクラス メンバーにアクセスしたり、メモリ内の状態を保持したりしないでください。partitions()によって返されるすべてのパーティション値は、pickle 化可能なオブジェクトである必要があります。- オフセットは、キーと値が整数、文字列、またはブール値などのプリミティブ型である辞書または再帰辞書として表されます。
例
インデックス付きレコードのシーケンスから読み取るストリーミング リーダーを実装します。
Python
from pyspark.sql.datasource import (
DataSource,
DataSourceStreamReader,
InputPartition,
)
class MyDataSourceStreamReader(DataSourceStreamReader):
def initialOffset(self):
return {"index": 0}
def latestOffset(self, start, limit):
return {"index": start["index"] + 10}
def partitions(self, start, end):
return [
InputPartition(i)
for i in range(start["index"], end["index"])
]
def read(self, partition):
yield (partition.value, f"record-{partition.value}")
def commit(self, end):
print(f"Committed up to offset {end}")
def stop(self):
print("Stopping stream reader")