データソースストリーム矢印ライター
PyArrow のRecordBatchを使用してデータを処理するデータ ストリーム ライターの基本クラス。
Spark Rowオブジェクトの反復子で動作するDataSourceStreamWriterとは異なり、このクラスはストリーミング データを書き込むときに Arrow 形式に最適化されています。ストリーミングユースケースで Arrow をネイティブにサポートするシステムやライブラリとインターフェイスする場合、より優れたパフォーマンスを提供できます。このクラスを実装し、 DataSource.streamWriter()からインスタンスを返し、Arrow を使用してデータ ソースをストリーミング シンクとして書き込み可能にします。
構文
from pyspark.sql.datasource import DataSourceStreamArrowWriter
class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
def write(self, iterator):
...
方法
手法 | 説明 |
|---|---|
| PyArrow |
| すべてのエグゼキューターから収集したコミットメッセージのリストを使用してマイクロバッチをコミットします。 マイクロバッチ内のすべてのタスクが正常に実行されたときにドライバーで呼び出されます。 |
| すべてのエグゼキューターから収集したコミットメッセージのリストを使用してマイクロバッチを中止します。 マイクロバッチ内の 1 つ以上のタスクが失敗したときにドライバーで呼び出されます。 |
注意
- ドライバーはすべてのエグゼキューターからコミット メッセージを収集し、すべてのタスクが成功した場合は
commit()に渡し、いずれかのタスクが失敗した場合はabort()に渡します。 - 書き込みタスクが失敗した場合、そのコミット メッセージは
commit()またはabort()に渡されるリスト内のNoneになります。 batchId各マイクロバッチを一意に識別し、処理されるマイクロバッチごとに 1 ずつ増加します。
例
マイクロバッチごとに行数をカウントする Arrow ベースのストリーム ライターを実装します。
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceStreamArrowWriter, WriterCommitMessage
@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int
class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
def write(self, iterator):
total_rows = 0
for batch in iterator:
total_rows += len(batch)
return MyCommitMessage(num_rows=total_rows)
def commit(self, messages, batchId):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed batch {batchId} with {total} rows")
def abort(self, messages, batchId):
print(f"Batch {batchId} failed, performing cleanup")