メインコンテンツまでスキップ

foreach_batch_sink

@dp.foreach_batch_sink()デコレータは、カスタムロジックを使用してPythonで処理する一連のマイクロバッチとしてストリームを処理するForEachBatchシンクを定義します。変換されたデータを書き込むために、追加フローでシンクを target として参照します。概念的なガイダンス、考慮事項、および例については、「ForEachBatch を使用してパイプライン内の任意のデータシンクに書き込む」を参照してください。

構文

Python
from pyspark import pipelines as dp

@dp.foreach_batch_sink(name="<name>")
def batch_handler(df, batch_id):
"""
Required:
- `df`: a Spark DataFrame representing the rows of this micro-batch.
- `batch_id`: unique integer ID for each micro-batch in the query.
"""
# Your custom write or transformation logic here
# Example:
# df.write.format("some-target-system").save("...")
#
# To access the sparkSession inside the batch handler, use df.sparkSession.

パラメーター

パラメーター

説明

name

オプション。パイプライン内でシンクを識別するための一意の名前です。含まれていない場合は、UDF の名前がデフォルトとなります。

バッチハンドラー

これは、各マイクロバッチに対して呼び出されるユーザー定義関数(UDF)です。

df

現在のマイクロバッチのデータを含むSpark DataFrame

バッチ ID

マイクロバッチの整数IDです。Sparkは、各トリガー間隔でこのIDをインクリメントします。

batch_id0 は、ストリームの開始、または完全更新の開始を表します。foreach_batch_sink コードは、ダウンストリームのデータソースのフルリフレッシュを適切に処理する必要があります。詳細については、「フル更新」をご覧ください。

このページの見出し