メインコンテンツまでスキップ

foreachBatch を使用して任意のデータ シンクに書き込む

この記事では、 foreachBatch と構造化ストリーミングを使用して、既存のストリーミング シンクがないデータソースにストリーミング クエリの出力を書き込む方法について説明します。

streamingDF.writeStream.foreachBatch(...)コードパターンを使用すると、ストリーミングクエリのすべてのマイクロバッチの出力データにバッチ関数を適用できます。foreachBatch で使用される関数は、次の 2 つのパラメーターを取ります。

  • マイクロバッチの出力データを持つ データフレーム。
  • マイクロバッチの一意の ID。

foreachBatch は、構造化ストリーミングでの Delta Lake マージ操作に使用する必要があります。 foreachBatchを使用したストリーミングクエリからのアップサートを参照してください。

追加の データフレーム 操作を適用する

多くのデータフレーム およびデータセット操作は、 が増分プランの生成をサポートしていないため、ストリーミングデータフレーム Sparkではサポートされていません。foreachBatch()を使用すると、これらの操作の一部を各マイクロバッチ出力に適用できます。たとえば、 foreachBatch() 操作と SQL MERGE INTO 操作を使用して、ストリーミング集計の出力を更新モードの Delta テーブルに書き込むことができます。 詳細については、 MERGE INTO を参照してください。

important
  • foreachBatch() 少なくとも 1 回のみの書き込み保証を提供します。 ただし、関数に提供される batchId を使用して、出力の重複を排除し、正確に 1 回の保証を得ることができます。 どちらの場合も、エンドツーエンドのセマンティクスについて自分で推論する必要があります。
  • foreachBatch() 連続処理モードでは、ストリーミング クエリのマイクロバッチ実行に基本的に依存するため、連続処理モードでは機能しません。連続モードでデータを書き込む場合は、代わりに foreach() を使用してください。
  • ステートフルな演算子で foreachBatch を使用する場合は、処理が完了する前に各バッチを完全に消費することが重要です。「各バッチ DataFrame を完全に使用する」を参照してください

空のデータフレームは foreachBatch() で呼び出すことができ、ユーザー コードは適切な操作を可能にするために回復力を備えている必要があります。 次に例を示します。

Scala
  .foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()

Databricks Runtime 14.0 の foreachBatch の動作の変更

Databricks Runtime 14.0 以降では、標準アクセス モードで設定されたコンピュートで、次の動作変更が適用されます。

  • print() コマンドは、ドライバー ログに出力を書き込みます。
  • 関数内の dbutils.widgets サブモジュールにはアクセスできません。
  • 関数で参照されるファイル、モジュール、またはオブジェクトは、シリアル化可能であり、Spark で使用できる必要があります。

既存のバッチデータソースの再利用

foreachBatch()を使用すると、構造化ストリーミングをサポートしていない可能性のあるデータ シンクに対して、既存のバッチ データ ライターを使用できます。次に例をいくつか示します。

他の多くのバッチデータソースは、 foreachBatch()から使用できます。「データソースと外部サービスへの接続」を参照してください。

複数の場所に書き込む

ストリーミング クエリの出力を複数の場所に書き込む必要がある場合、Databricks では、最適な並列化とスループットのために複数の構造化ストリーミング ライターを使用することをお勧めします。

foreachBatch を使用して複数のシンクに書き込むと、ストリーミング書き込みの実行がシリアル化されるため、各マイクロバッチの待機時間が長くなる可能性があります。

foreachBatch を使用して複数の Delta テーブルに書き込む場合は、foreachBatchでのべき等テーブルへの書き込みに関するページを参照してください。

各バッチを完全に消費 DataFrame

ステートフルな演算子を使用している場合 (たとえば、 dropDuplicatesWithinWatermarkを使用している場合)、各バッチ反復で DataFrame 全体を使用するか、クエリを再開する必要があります。DataFrame 全体を使用しない場合、ストリーミング クエリは次のバッチで失敗します。

これはいくつかのケースで発生する可能性があります。次の例は、DataFrame を正しく消費しないクエリを修正する方法を示しています。

バッチのサブセットを意図的に使用する

バッチのサブセットのみに関心がある場合は、次のようなコードを使用できます。

Python
from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def partial_func(batch_df, batch_id):
batch_df.show(2)

q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()

この場合、 batch_df.show(2) はバッチの最初の 2 つの品目のみを処理しますが、これは想定されますが、それ以上の品目がある場合は、それらを消費する必要があります。次のコードは、完全な DataFrame を使用します。

Python
from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

# function to do nothing with a row
def do_nothing(row)
pass

def partial_func(batch_df, batch_id):
batch_df.show(2)
batch_df.foreach(do_nothing) # silently consume the rest of the batch

q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()

ここでは、 do_nothing 関数は DataFrame の残りの部分をサイレントに無視します。

バッチ内のエラーの処理

foreachBatchプロセスの実行中にエラーが発生する可能性があります。次のようなコードを使用できます (この場合、サンプルは問題を示すために意図的にエラーを発生させます)。

Python
from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')

def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have


q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()

エラーを処理 (およびサイレントに飲み込む) により、バッチの残りの部分が消費されない場合があります。この状況に対処するには、2 つのオプションがあります。

まず、エラーを再発生し、オーケストレーション レイヤーに渡してバッチを再試行します。これにより、一時的な問題の場合はエラーが解決されるか、運用チームが手動で修正を試みるためにエラーが発生する可能性があります。これを行うには、 partial_func コードを次のように変更します。

Python
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
raise e # re-raise the issue

2 番目のオプションは、例外をキャッチし、バッチの残りの部分を無視する場合、コードを次のように変更することです。

Python
from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')

# function to do nothing with a row
def do_nothing(row)
pass

def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
batch_df.foreach(do_nothing) # silently consume the remainder of the batch

q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()

このコードでは、 do_nothing 関数を使用して、バッチの残りの部分をサイレントに無視します。