メインコンテンツまでスキップ

SimpleDataSourceStreamReader

ストリーミングデータソースリーダーを簡素化するための基本クラス。

DataSourceStreamReaderと比較して、 SimpleDataSourceStreamReader計画的なデータパーティションを必要としません。read()メソッドでは、データの読み取りと最新のオフセットの計画を同時に行うことができます。

SimpleDataSourceStreamReader 、Spark ドライバでレコードを読み込んで、パーティショニングなしで各バッチの終了オフセットを決定するため、入力レートとバッチサイズが小さい軽量なユースケースにのみ適しています。読み取りスループットが高く、単一のプロセスでは処理できない場合は、 DataSourceStreamReader使用してください。

Databricks Runtime 15.3で追加されました

構文

Python
from pyspark.sql.datasource import SimpleDataSourceStreamReader

class MyStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
return {"offset": 0}

def read(self, start):
...

def readBetweenOffsets(self, start, end):
...

方法

手法

説明

initialOffset()

ストリーミングデータソースの初期オフセットを返します。新しいストリーミングクエリは、このオフセットから読み取りを開始します。

read(start)

開始オフセットから利用可能なすべてのデータを読み取り、レコードのイテレータと次の読み取り試行のための終了オフセットのタプルを返します。

readBetweenOffsets(start, end)

指定された開始オフセットと終了オフセットの間にある利用可能なすべてのデータを読み取ります。障害復旧時に呼び出され、バッチを決定論的に再読み込みします。

commit(end)

Sparkがend以下のオフセットのすべてのデータの処理を完了したことをソースに通知します。

カスタムの簡易ストリーミングデータソースリーダーを定義します。

Python
from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader

class MyStreamingDataSource(DataSource):
@classmethod
def name(cls):
return "my_streaming_source"

def schema(self):
return "value STRING"

def simpleStreamReader(self, schema):
return MySimpleStreamReader()

class MySimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
return {"partition-1": {"index": 0}}

def read(self, start):
end = {"partition-1": {"index": start["partition-1"]["index"] + 1}}
def records():
yield ("hello",)
return records(), end

def readBetweenOffsets(self, start, end):
def records():
yield ("hello",)
return records()

def commit(self, end):
pass

spark.dataSource.register(MyStreamingDataSource)
df = spark.readStream.format("my_streaming_source").load()
このページの見出し