Pular para o conteúdo principal

DataSourceStreamWriter

Uma classe base para escritores de transmissão de dados.

Os gravadores de transmissão de dados são responsáveis por gravar dados em um coletor de transmissão. Implemente esta classe e retorne uma instância de DataSource.streamWriter() para tornar uma fonte de dados gravável como um coletor de transmissão. write() é chamado no executor para cada microbatch e commit() ou abort() é chamado no driver após todas as tarefas no microbatch serem concluídas.

Sintaxe

Python
from pyspark.sql.datasource import DataSourceStreamWriter

class MyDataSourceStreamWriter(DataSourceStreamWriter):
def write(self, iterator):
...

Métodos

Método

Descrição

write(iterator)

Grava dados no coletor de transmissão. O executor é chamado uma vez por microlote. Aceita um iterador de Row objetos e retorna um WriterCommitMessage, ou None se não houver mensagem de commit. Este método é abstrato e precisa ser implementado.

commit(messages, batchId)

Confirme o microbatch usando uma lista de mensagens commit coletadas de todos os executores. Invocado no driver quando todas as tarefas na execução do microbatch forem concluídas com sucesso.

abort(messages, batchId)

Aborta o microbatch usando uma lista de mensagens commit coletadas de todos os executores. Invocado no driver quando uma ou mais tarefas no microbatch falham.

Notas

  • O driver coleta mensagens commit de todos os executores e as passa para commit() se todas as tarefas forem bem-sucedidas, ou para abort() se alguma tarefa falhar.
  • Se uma tarefa de escrita falhar, sua mensagem de commit será None na lista passada para commit() ou abort().
  • batchId Identifica exclusivamente cada microlote e incrementa em 1 a cada microlote processado.

Exemplos

Implemente um gravador de transmissão que anexe linhas a um arquivo:

Python
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceStreamWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int

class MyDataSourceStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.path = options.get("path")

def write(self, iterator):
rows = list(iterator)
with open(self.path, "a") as f:
for row in rows:
f.write(str(row) + "\n")
return MyCommitMessage(num_rows=len(rows))

def commit(self, messages, batchId):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed batch {batchId} with {total} rows")

def abort(self, messages, batchId):
print(f"Batch {batchId} failed, performing cleanup")