foreachBatch を使用して任意のデータ シンクに書き込む
この記事では、構造化ストリーミングで foreachBatch
を使用して、既存のストリーミング シンクがないデータソースにストリーミング クエリーの出力を書き込む方法について説明します。
コードパターン streamingDF.writeStream.foreachBatch(...)
を使用すると、ストリーミングクエリーのすべてのマイクロバッチの出力データにバッチ関数を適用できます。 foreachBatch
で使用される関数は、次の 2 つのパラメーターを取ります。
マイクロバッチの出力データを持つ DataFrame 。
マイクロバッチの一意の ID。
構造化ストリーミングでの Delta Lake マージ操作には foreachBatch
を使用する必要があります。 foreachBatch を使用したストリーミングクエリーからのアップサートを参照してください。
追加の DataFrame 操作を適用する
ストリーミング DataFrame sでは、Spark が増分プランの生成をサポートしていないため、多くのDataFrame およびデータセット操作はサポートされていません。 foreachBatch()
を使用すると、これらの操作の一部を各マイクロバッチ出力に適用できます。たとえば、 foreachBath()
と SQL MERGE INTO
操作を使用して、ストリーミング集計の出力を更新モードで Delta テーブルに書き込むことができます。 詳細については、「 マージ先」を参照してください。
重要
foreachBatch()
少なくとも一度だけ書き込み保証を提供します。 ただし、関数に提供されるbatchId
を使用して、出力を重複排除し、一度だけ保証することができます。 どちらの場合も、エンドツーエンドのセマンティクスについて自分で推論する必要があります。foreachBatch()
連続処理モードでは 、基本的にストリーミングクエリーのマイクロバッチ実行に依存しているため、動作しません。連続モードでデータを書き込む場合は、代わりにforeach()
を使用します。
空のデータフレームは foreachBatch()
で呼び出すことができ、適切な操作を可能にするためにユーザーコードは回復力が必要です。 次に例を示します。
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Databricks Runtime 14.0 での foreachBatch
の動作の変更
共有アクセス モードで構成されたコンピュート上のDatabricks Runtime 14.0 以降では、次の動作の変更が適用されます。
print()
コマンドは出力をドライバー ログに書き込みます。関数内の
dbutils.widgets
サブモジュールにはアクセスできません。関数内で参照されるファイル、モジュール、またはオブジェクトはシリアル化可能であり、Spark で使用できる必要があります。
既存のバッチデータソースの再利用
foreachBatch()
を使用すると、構造化ストリーミングをサポートしていない可能性があるデータ シンクに既存のバッチ データ ライターを使用できます。次に例をいくつか示します。
他の多くのバッチデータソースは、 foreachBatch()
から使用できます。 「データソースへの接続」を参照してください。
複数の場所に書き込む
ストリーミング クエリーの出力を複数の場所に書き込む必要がある場合、Databricks では、最適な並列化とスループットを得るために、複数の構造化ストリーミング ライターを使用することをお勧めします。
foreachBatch
を使用して複数のシンクに書き込むと、ストリーミング書き込みの実行がシリアル化されるため、各マイクロバッチの待機時間が長くなる可能性があります。
foreachBatch
を使用して複数の Delta テーブルに書き込む場合は、「foreachBatch でのべき等テーブルの書き込み」を参照してください。