データソースライター
データソースライターの基本クラス。
データ ソース ライターは、データをデータ ソースに保存する責任があります。 このクラスを実装し、 DataSource.writer()からインスタンスを返し、データ ソースを書き込み可能にします。
構文
Python
from pyspark.sql.datasource import DataSourceWriter
class MyDataSourceWriter(DataSourceWriter):
def write(self, iterator):
...
方法
手法 | 説明 |
|---|---|
データをデータソースに書き込みます。 各エグゼキューターで 1 回呼び出されます。 | |
すべてのエグゼキューターから収集したコミットメッセージのリストを使用して書き込みジョブをコミットします。 すべてのタスクが正常に実行されたときにドライバー上で呼び出されます。 | |
すべてのエグゼキューターから収集したコミットメッセージのリストを使用して書き込みジョブを中止します。 1 つ以上のタスクが失敗したときにドライバーで呼び出されます。 |
注意
- ドライバーはすべてのエグゼキューターからコミット メッセージを収集し、すべてのタスクが成功した場合は
commit()に渡し、いずれかのタスクが失敗した場合はabort()に渡します。 - 書き込みタスクが失敗した場合、そのコミット メッセージは
commit()またはabort()に渡されるリスト内のNoneになります。
例
行をファイルに保存する基本的なライターを実装します。
Python
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage
@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int
class MyDataSourceWriter(DataSourceWriter):
def __init__(self, options):
self.path = options.get("path")
def write(self, iterator):
rows = list(iterator)
with open(self.path, "w") as f:
for row in rows:
f.write(str(row) + "\n")
return MyCommitMessage(num_rows=len(rows))
def commit(self, messages):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed {total} rows")
def abort(self, messages):
print("Write job failed, performing cleanup")