Pular para o conteúdo principal

foreach (DataStreamWriter)

Define a saída da consulta de transmissão a ser processada usando o gravador fornecido. A lógica de processamento pode ser especificada como uma função que recebe uma linha como entrada, ou como um objeto com process(row) e métodos opcionais open(partition_id, epoch_id) e close(error) .

Sintaxe

foreach(f)

Parâmetros

Parâmetro

Tipo

Descrição

f

invocável ou objeto

Uma função que recebe uma linha como entrada, ou um objeto com um método process(row) e métodos opcionais open e close .

Devoluções

DataStreamWriter

Notas

O objeto fornecido deve ser serializável. Qualquer inicialização para escrita de dados (por exemplo, abrir uma conexão) deve ser feita dentro de open(), não no momento da construção.

Exemplos

Python
import time
df = spark.readStream.format("rate").load()

Processar cada linha usando uma função:

Python
def print_row(row):
print(row)

q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()

Processe cada linha usando um objeto com os métodos open, process e close :

Python
class RowPrinter:
def open(self, partition_id, epoch_id):
print("Opened %d, %d" % (partition_id, epoch_id))
return True

def process(self, row):
print(row)

def close(self, error):
print("Closed with error: %s" % str(error))

q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()