メインコンテンツまでスキップ

foreach (DataStreamWriter)

指定されたライターを使用して処理されるストリーミングクエリの出力を設定します。処理ロジックは、行を入力として受け取る関数として指定することも、 process(row)とオプションのopen(partition_id, epoch_id)およびclose(error)メソッドを持つオブジェクトとして指定することもできます。

構文

foreach(f)

パラメーター

パラメーター

Type

説明

f

呼び出し可能オブジェクトまたはオブジェクト

入力として行を受け取る関数、またはprocess(row)メソッドとオプションのopenおよびcloseメソッドを持つオブジェクトを受け取る関数。

戻り値

DataStreamWriter

注意

提供されるオブジェクトはシリアル化可能である必要があります。データ書き込みのための初期化(例えば、接続を開くなど)は、構築時ではなく、 open()内部で行う必要があります。

Python
import time
df = spark.readStream.format("rate").load()

各行を関数を使って処理します。

Python
def print_row(row):
print(row)

q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()

openprocesscloseメソッドを持つオブジェクトを使用して各行を処理します。

Python
class RowPrinter:
def open(self, partition_id, epoch_id):
print("Opened %d, %d" % (partition_id, epoch_id))
return True

def process(self, row):
print(row)

def close(self, error):
print("Closed with error: %s" % str(error))

q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()