Pular para o conteúdo principal

DataSourceStreamArrowWriter

Uma classe base para gravadores de transmissão de dados que processam uso de dados do PyArrow RecordBatch.

Ao contrário de DataSourceStreamWriter, que funciona com um iterador de objetos Spark Row , esta classe é otimizada para o formato Arrow ao escrever dados de transmissão. Ele pode oferecer melhor desempenho ao interagir com sistemas ou bibliotecas que oferecem suporte nativo ao Arrow para casos de uso de transmissão. Implemente esta classe e retorne uma instância de DataSource.streamWriter() para tornar uma fonte de dados gravável como um coletor de transmissão usando Arrow.

Sintaxe

Python
from pyspark.sql.datasource import DataSourceStreamArrowWriter

class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
def write(self, iterator):
...

Métodos

Método

Descrição

write(iterator)

Escreve um iterador de objetos PyArrow RecordBatch no coletor de transmissão. O executor é chamado uma vez por microlote. Retorna WriterCommitMessage ou None se não houver mensagem de commit. Este método é abstrato e precisa ser implementado.

commit(messages, batchId)

Confirme o microbatch usando uma lista de mensagens commit coletadas de todos os executores. Invocado no driver quando todas as tarefas na execução do microbatch forem concluídas com sucesso. Herdado de DataSourceStreamWriter.

abort(messages, batchId)

Aborta o microbatch usando uma lista de mensagens commit coletadas de todos os executores. Invocado no driver quando uma ou mais tarefas no microbatch falham. Herdado de DataSourceStreamWriter.

Notas

  • O driver coleta mensagens commit de todos os executores e as passa para commit() se todas as tarefas forem bem-sucedidas, ou para abort() se alguma tarefa falhar.
  • Se uma tarefa de escrita falhar, sua mensagem de commit será None na lista passada para commit() ou abort().
  • batchId Identifica exclusivamente cada microlote e incrementa em 1 a cada microlote processado.

Exemplos

Implemente um gravador de transmissão baseado em Arrow que conte as linhas por microlote:

Python
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceStreamArrowWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int

class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
def write(self, iterator):
total_rows = 0
for batch in iterator:
total_rows += len(batch)
return MyCommitMessage(num_rows=total_rows)

def commit(self, messages, batchId):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed batch {batchId} with {total} rows")

def abort(self, messages, batchId):
print(f"Batch {batchId} failed, performing cleanup")