DataSourceStreamWriter
Uma classe base para escritores de transmissão de dados.
Os gravadores de transmissão de dados são responsáveis por gravar dados em um coletor de transmissão. Implemente esta classe e retorne uma instância de DataSource.streamWriter() para tornar uma fonte de dados gravável como um coletor de transmissão. write() é chamado no executor para cada microbatch e commit() ou abort() é chamado no driver após todas as tarefas no microbatch serem concluídas.
Sintaxe
from pyspark.sql.datasource import DataSourceStreamWriter
class MyDataSourceStreamWriter(DataSourceStreamWriter):
def write(self, iterator):
...
Métodos
Método | Descrição |
|---|---|
| Grava dados no coletor de transmissão. O executor é chamado uma vez por microlote. Aceita um iterador de |
| Confirme o microbatch usando uma lista de mensagens commit coletadas de todos os executores. Invocado no driver quando todas as tarefas na execução do microbatch forem concluídas com sucesso. |
| Aborta o microbatch usando uma lista de mensagens commit coletadas de todos os executores. Invocado no driver quando uma ou mais tarefas no microbatch falham. |
Notas
- O driver coleta mensagens commit de todos os executores e as passa para
commit()se todas as tarefas forem bem-sucedidas, ou paraabort()se alguma tarefa falhar. - Se uma tarefa de escrita falhar, sua mensagem de commit será
Nonena lista passada paracommit()ouabort(). batchIdIdentifica exclusivamente cada microlote e incrementa em 1 a cada microlote processado.
Exemplos
Implemente um gravador de transmissão que anexe linhas a um arquivo:
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceStreamWriter, WriterCommitMessage
@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int
class MyDataSourceStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.path = options.get("path")
def write(self, iterator):
rows = list(iterator)
with open(self.path, "a") as f:
for row in rows:
f.write(str(row) + "\n")
return MyCommitMessage(num_rows=len(rows))
def commit(self, messages, batchId):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed batch {batchId} with {total} rows")
def abort(self, messages, batchId):
print(f"Batch {batchId} failed, performing cleanup")