メインコンテンツまでスキップ

データソース矢印ライター

PyArrow のRecordBatchを使用してデータを処理するデータ ソース ライターの基本クラス。

Spark Rowオブジェクトの反復子で動作するDataSourceWriterとは異なり、このクラスはデータの書き込み時に Arrow 形式に最適化されています。Arrow をネイティブにサポートするシステムやライブラリとのインターフェイス時に、より優れたパフォーマンスを提供できます。このクラスを実装し、 DataSource.writer()からインスタンスを返し、Arrow を使用してデータ ソースを書き込み可能にします。

構文

Python
from pyspark.sql.datasource import DataSourceArrowWriter

class MyDataSourceArrowWriter(DataSourceArrowWriter):
def write(self, iterator):
...

方法

手法

説明

write(iterator)

PyArrow RecordBatchオブジェクトの反復子をシンクに書き込みます。各エグゼキューターで 1 回呼び出されます。 WriterCommitMessageを返します。コミット メッセージがない場合はNoneを返します。このメソッドは抽象的であり、実装する必要があります。

commit(messages)

すべてのエグゼキューターから収集したコミットメッセージのリストを使用して書き込みジョブをコミットします。 すべてのタスクが正常に実行されたときにドライバー上で呼び出されます。DataSourceWriterから継承されました。

abort(messages)

すべてのエグゼキューターから収集したコミットメッセージのリストを使用して書き込みジョブを中止します。 1 つ以上のタスクが失敗したときにドライバーで呼び出されます。DataSourceWriterから継承されました。

注意

  • ドライバーはすべてのエグゼキューターからコミット メッセージを収集し、すべてのタスクが成功した場合はcommit()に渡し、いずれかのタスクが失敗した場合はabort()に渡します。
  • 書き込みタスクが失敗した場合、そのコミット メッセージはcommit()またはabort()に渡されるリスト内のNoneになります。

すべてのバッチにわたって行をカウントする Arrow ベースのライターを実装します。

Python
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceArrowWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int

class MyDataSourceArrowWriter(DataSourceArrowWriter):
def write(self, iterator):
total_rows = 0
for batch in iterator:
total_rows += len(batch)
return MyCommitMessage(num_rows=total_rows)

def commit(self, messages):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed {total} rows")

def abort(self, messages):
print("Write job failed, performing cleanup")