Pular para o conteúdo principal

Gravador de fonte de dados

Uma classe base para escritores de fontes de dados.

os escritores de fonte de dados são responsáveis por salvar os dados em uma fonte de dados. Implemente esta classe e retorne uma instância de DataSource.writer() para tornar uma fonte de dados gravável.

Sintaxe

Python
from pyspark.sql.datasource import DataSourceWriter

class MyDataSourceWriter(DataSourceWriter):
def write(self, iterator):
...

Métodos

Método

Descrição

write(iterator)

Grava dados na fonte de dados. Chamado uma vez em cada executor. Aceita um iterador de Row objetos e retorna um WriterCommitMessage, ou None se não houver mensagem de commit. Este método é abstrato e precisa ser implementado.

commit(messages)

Confirme a tarefa de escrita usando uma lista de mensagens commit coletadas de todos os executores. Invocado no driver quando todas as tarefas forem executadas com sucesso.

abort(messages)

Aborta a tarefa de escrita usando uma lista de mensagens commit coletadas de todos os executores. Invocado no driver quando uma ou mais tarefas falharam.

Notas

  • O driver coleta mensagens commit de todos os executores e as passa para commit() se todas as tarefas forem bem-sucedidas, ou para abort() se alguma tarefa falhar.
  • Se uma tarefa de escrita falhar, sua mensagem de commit será None na lista passada para commit() ou abort().

Exemplos

Implemente um gravador básico que salve as linhas em um arquivo:

Python
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int

class MyDataSourceWriter(DataSourceWriter):
def __init__(self, options):
self.path = options.get("path")

def write(self, iterator):
rows = list(iterator)
with open(self.path, "w") as f:
for row in rows:
f.write(str(row) + "\n")
return MyCommitMessage(num_rows=len(rows))

def commit(self, messages):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed {total} rows")

def abort(self, messages):
print("Write job failed, performing cleanup")