DataSourceStreamArrowWriter
Uma classe base para gravadores de transmissão de dados que processam uso de dados do PyArrow RecordBatch.
Ao contrário de DataSourceStreamWriter, que funciona com um iterador de objetos Spark Row , esta classe é otimizada para o formato Arrow ao escrever dados de transmissão. Ele pode oferecer melhor desempenho ao interagir com sistemas ou bibliotecas que oferecem suporte nativo ao Arrow para casos de uso de transmissão. Implemente esta classe e retorne uma instância de DataSource.streamWriter() para tornar uma fonte de dados gravável como um coletor de transmissão usando Arrow.
Sintaxe
from pyspark.sql.datasource import DataSourceStreamArrowWriter
class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
def write(self, iterator):
...
Métodos
Método | Descrição |
|---|---|
| Escreve um iterador de objetos PyArrow |
| Confirme o microbatch usando uma lista de mensagens commit coletadas de todos os executores. Invocado no driver quando todas as tarefas na execução do microbatch forem concluídas com sucesso. Herdado de |
| Aborta o microbatch usando uma lista de mensagens commit coletadas de todos os executores. Invocado no driver quando uma ou mais tarefas no microbatch falham. Herdado de |
Notas
- O driver coleta mensagens commit de todos os executores e as passa para
commit()se todas as tarefas forem bem-sucedidas, ou paraabort()se alguma tarefa falhar. - Se uma tarefa de escrita falhar, sua mensagem de commit será
Nonena lista passada paracommit()ouabort(). batchIdIdentifica exclusivamente cada microlote e incrementa em 1 a cada microlote processado.
Exemplos
Implemente um gravador de transmissão baseado em Arrow que conte as linhas por microlote:
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceStreamArrowWriter, WriterCommitMessage
@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int
class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
def write(self, iterator):
total_rows = 0
for batch in iterator:
total_rows += len(batch)
return MyCommitMessage(num_rows=total_rows)
def commit(self, messages, batchId):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed batch {batchId} with {total} rows")
def abort(self, messages, batchId):
print(f"Batch {batchId} failed, performing cleanup")