メインコンテンツまでスキップ

foreachBatch を使用して任意のデータ シンクに書き込む

この記事では、 foreachBatch と構造化ストリーミングを使用して、既存のストリーミング シンクがないデータソースにストリーミング クエリの出力を書き込む方法について説明します。

streamingDF.writeStream.foreachBatch(...)コードパターンを使用すると、ストリーミングクエリのすべてのマイクロバッチの出力データにバッチ関数を適用できます。foreachBatch で使用される関数は、次の 2 つのパラメーターを取ります。

  • マイクロバッチの出力データを持つ DataFrame。
  • マイクロバッチの一意の ID。

foreachBatch は、構造化ストリーミングでの Delta Lake マージ操作に使用する必要があります。 foreachBatchを使用したストリーミングクエリからのアップサートを参照してください。

追加の DataFrame 操作を適用する

多くのDataFrame およびデータセット操作は、 が増分プランの生成をサポートしていないため、ストリーミングDataFrames Sparkではサポートされていません。foreachBatch()を使用すると、これらの操作の一部を各マイクロバッチ出力に適用できます。たとえば、 foreachBatch() 操作と SQL MERGE INTO 操作を使用して、ストリーミング集計の出力を更新モードの Delta テーブルに書き込むことができます。 詳細については、 MERGE INTO を参照してください。

important
  • foreachBatch() 少なくとも 1 回のみの書き込み保証を提供します。 ただし、関数に提供される batchId を使用して、出力の重複を排除し、正確に 1 回の保証を得ることができます。 どちらの場合も、エンドツーエンドのセマンティクスについて自分で推論する必要があります。
  • foreachBatch() 連続処理モードでは、ストリーミング クエリのマイクロバッチ実行に基本的に依存するため、連続処理モードでは機能しません。連続モードでデータを書き込む場合は、代わりに foreach() を使用してください。

空のデータフレームは foreachBatch() で呼び出すことができ、ユーザー コードは適切な操作を可能にするために回復力を備えている必要があります。 次に例を示します。

Scala
  .foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()

Databricks Runtime 14.0 の foreachBatch の動作の変更

Databricks Runtime 14.0 以降では、標準アクセス モードで設定されたコンピュートで、次の動作変更が適用されます。

  • print() コマンドは、ドライバー ログに出力を書き込みます。
  • 関数内の dbutils.widgets サブモジュールにはアクセスできません。
  • 関数で参照されるファイル、モジュール、またはオブジェクトは、シリアル化可能であり、Spark で使用できる必要があります。

既存のバッチデータソースの再利用

foreachBatch()を使用すると、構造化ストリーミングをサポートしていない可能性のあるデータ シンクに対して、既存のバッチ データ ライターを使用できます。次に例をいくつか示します。

他の多くのバッチデータソースは、 foreachBatch()から使用できます。 「データソースへの接続」を参照してください。

複数の場所に書き込む

ストリーミング クエリの出力を複数の場所に書き込む必要がある場合、Databricks では、最適な並列化とスループットのために複数の構造化ストリーミング ライターを使用することをお勧めします。

foreachBatch を使用して複数のシンクに書き込むと、ストリーミング書き込みの実行がシリアル化されるため、各マイクロバッチの待機時間が長くなる可能性があります。

foreachBatch を使用して複数の Delta テーブルに書き込む場合は、foreachBatchでのべき等テーブルへの書き込みに関するページを参照してください。