メインコンテンツまでスキップ

データソースストリームライター

データ ストリーム ライターの基本クラス。

データ ストリーム ライターは、ストリーミング シンクにデータを書き込む役割を担います。このクラスを実装し、 DataSource.streamWriter()からインスタンスを返し、データ ソースをストリーミング シンクとして書き込み可能にします。 write()は各マイクロバッチのエグゼキューターで呼び出され、マイクロバッチ内のすべてのタスクが完了した後にドライバーでcommit()またはabort()が呼び出されます。

構文

Python
from pyspark.sql.datasource import DataSourceStreamWriter

class MyDataSourceStreamWriter(DataSourceStreamWriter):
def write(self, iterator):
...

方法

手法

説明

write(iterator)

ストリーミング シンクにデータを書き込みます。マイクロバッチごとに 1 回、エグゼキューター上で呼び出されます。 Rowオブジェクトの反復子を受け入れてWriterCommitMessageを返します。コミット メッセージがない場合はNoneを返します。このメソッドは抽象的であり、実装する必要があります。

commit(messages, batchId)

すべてのエグゼキューターから収集したコミットメッセージのリストを使用してマイクロバッチをコミットします。 マイクロバッチ内のすべてのタスクが正常に実行されたときにドライバーで呼び出されます。

abort(messages, batchId)

すべてのエグゼキューターから収集したコミットメッセージのリストを使用してマイクロバッチを中止します。 マイクロバッチ内の 1 つ以上のタスクが失敗したときにドライバーで呼び出されます。

注意

  • ドライバーはすべてのエグゼキューターからコミット メッセージを収集し、すべてのタスクが成功した場合はcommit()に渡し、いずれかのタスクが失敗した場合はabort()に渡します。
  • 書き込みタスクが失敗した場合、そのコミット メッセージはcommit()またはabort()に渡されるリスト内のNoneになります。
  • batchId 各マイクロバッチを一意に識別し、処理されるマイクロバッチごとに 1 ずつ増加します。

ファイルに行を追加するストリーム ライターを実装します。

Python
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceStreamWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int

class MyDataSourceStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.path = options.get("path")

def write(self, iterator):
rows = list(iterator)
with open(self.path, "a") as f:
for row in rows:
f.write(str(row) + "\n")
return MyCommitMessage(num_rows=len(rows))

def commit(self, messages, batchId):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed batch {batchId} with {total} rows")

def abort(self, messages, batchId):
print(f"Batch {batchId} failed, performing cleanup")