foreach (DataStreamWriter)
Define a saída da consulta de transmissão a ser processada usando o gravador fornecido. A lógica de processamento pode ser especificada como uma função que recebe uma linha como entrada, ou como um objeto com process(row) e métodos opcionais open(partition_id, epoch_id) e close(error) .
Sintaxe
foreach(f)
Parâmetros
Parâmetro | Tipo | Descrição |
|---|---|---|
| invocável ou objeto | Uma função que recebe uma linha como entrada, ou um objeto com um método |
Devoluções
DataStreamWriter
Notas
O objeto fornecido deve ser serializável. Qualquer inicialização para escrita de dados (por exemplo, abrir uma conexão) deve ser feita dentro de open(), não no momento da construção.
Exemplos
import time
df = spark.readStream.format("rate").load()
Processar cada linha usando uma função:
def print_row(row):
print(row)
q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()
Processe cada linha usando um objeto com os métodos open, process e close :
class RowPrinter:
def open(self, partition_id, epoch_id):
print("Opened %d, %d" % (partition_id, epoch_id))
return True
def process(self, row):
print(row)
def close(self, error):
print("Closed with error: %s" % str(error))
q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()