メインコンテンツまでスキップ

foreachBatch を使用して任意のデータ シンクに書き込む

この記事では、 foreachBatch と構造化ストリーミングを使用して、既存のストリーミング シンクがないデータソースにストリーミング クエリの出力を書き込む方法について説明します。

streamingDF.writeStream.foreachBatch(...)コードパターンを使用すると、ストリーミングクエリのすべてのマイクロバッチの出力データにバッチ関数を適用できます。foreachBatch で使用される関数は、次の 2 つのパラメーターを取ります。

  • マイクロバッチの出力データを持つ データフレーム。
  • マイクロバッチの一意の ID。

foreachBatch は、構造化ストリーミングでの Delta Lake マージ操作に使用する必要があります。 foreachBatchを使用したストリーミングクエリからのアップサートを参照してください。

追加の データフレーム 操作を適用する

多くのデータフレーム およびデータセット操作は、 が増分プランの生成をサポートしていないため、ストリーミングデータフレーム Sparkではサポートされていません。foreachBatch()を使用すると、これらの操作の一部を各マイクロバッチ出力に適用できます。たとえば、 foreachBatch() 操作と SQL MERGE INTO 操作を使用して、ストリーミング集計の出力を更新モードの Delta テーブルに書き込むことができます。 詳細については、 MERGE INTO を参照してください。

重要
  • foreachBatch() 少なくとも 1 回のみの書き込み保証を提供します。 ただし、関数に提供される batchId を使用して、出力の重複を排除し、正確に 1 回の保証を得ることができます。 どちらの場合も、エンドツーエンドのセマンティクスについて自分で推論する必要があります。
  • foreachBatch() 連続処理モードでは、ストリーミング クエリのマイクロバッチ実行に基本的に依存するため、連続処理モードでは機能しません。連続モードでデータを書き込む場合は、代わりに foreach() を使用してください。
  • ステートフルな演算子で foreachBatch を使用する場合は、処理が完了する前に各バッチを完全に消費することが重要です。「各バッチ DataFrame を完全に使用する」を参照してください

空のデータフレームは foreachBatch() で呼び出すことができ、ユーザー コードは適切な操作を可能にするために回復力を備えている必要があります。 次に例を示します。

Scala
  .foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()

Databricks Runtime 14.0 の foreachBatch の動作の変更

Databricks Runtime 14.0 以降では、標準アクセス モードで設定されたコンピュートで、次の動作変更が適用されます。

  • print() コマンドは、ドライバー ログに出力を書き込みます。
  • 関数内の dbutils.widgets サブモジュールにはアクセスできません。
  • 関数で参照されるファイル、モジュール、またはオブジェクトは、シリアル化可能であり、Spark で使用できる必要があります。

既存のバッチデータソースの再利用

foreachBatch()を使用すると、構造化ストリーミングをサポートしていない可能性のあるデータ シンクに対して、既存のバッチ データ ライターを使用できます。次に例をいくつか示します。

他の多くのバッチデータソースは、 foreachBatch()から使用できます。「データソースと外部サービスへの接続」を参照してください。

複数の場所に書き込む

ストリーミング クエリの出力を複数の場所に書き込む必要がある場合、Databricks では、最適な並列化とスループットのために複数の構造化ストリーミング ライターを使用することをお勧めします。

foreachBatch を使用して複数のシンクに書き込むと、ストリーミング書き込みの実行がシリアル化されるため、各マイクロバッチの待機時間が長くなる可能性があります。

foreachBatch を使用して複数の Delta テーブルに書き込む場合は、foreachBatchでのべき等テーブルへの書き込みに関するページを参照してください。

各バッチを完全に消費 DataFrame

ステートフルな演算子を使用している場合 (たとえば、 dropDuplicatesWithinWatermarkを使用している場合)、各バッチ反復で DataFrame 全体を使用するか、クエリを再開する必要があります。DataFrame 全体を使用しない場合、ストリーミング クエリは次のバッチで失敗します。

これはいくつかのケースで発生する可能性があります。次の例は、DataFrame を正しく消費しないクエリを修正する方法を示しています。

バッチのサブセットを意図的に使用する

バッチのサブセットのみに関心がある場合は、次のようなコードを使用できます。

Python
from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def partial_func(batch_df, batch_id):
batch_df.show(2)

q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()

この場合、 batch_df.show(2) はバッチの最初の 2 つの品目のみを処理しますが、これは想定されますが、それ以上の品目がある場合は、それらを消費する必要があります。次のコードは、完全な DataFrame を使用します。

Python
from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

# function to do nothing with a row
def do_nothing(row):
pass

def partial_func(batch_df, batch_id):
batch_df.show(2)
batch_df.foreach(do_nothing) # silently consume the rest of the batch

q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()

ここでは、 do_nothing 関数は DataFrame の残りの部分をサイレントに無視します。

バッチ内のエラーの処理

foreachBatchでのエラー処理については、 Databricks 、ストリーミング クエリが高速で失敗することを許可し、代わりにLakeFlowジョブやApache Airflowなどのオーケストレーション レイヤーに依存して再試行ロジックを管理することをお勧めします。 これは、データ損失が発生する可能性のある複雑な再試行ループをコード内に構築するよりもはるかに安全です。

以下は、あなたの執筆目標に基づいたガイドラインです。

ターゲット

ガイダンス

DataFrame操作

Delta Lakeテーブル

再試行時の冪等性を保証し、データの正確性を保護するためには、 txnAppIdtxnVersion書き込みオプションを使用し、 txnVersionbatchIdにバインドする必要があります。例外をローカルで捕捉して再試行しないでください。代わりに、 Databricks 、 Sparkメトリクスの正確性を維持し、データが重複せず、オーケストレーターが完全なバッチを正常に再試行できるように、エラーの伝播を許可することをお勧めします。

カスタムコードと外部宛先

.collect()OLTPデータベース、メッセージキュー、 APIs

独自の冪等性を実装してください。あらゆる操作は、バッチ間で再試行される可能性がある、あるいは実際に再試行されるものと想定する必要があります。batchId値が変化しない場合、演算結果も変化しないはずです。一時的な接続タイムアウトなどの純粋なエラーについては再試行しても構いませんが、再試行が最終的に失敗した場合に部分的な書き込みや重複書き込みが発生しないよう、細心の注意を払ってください。最も安全な方法は、エラーが伝播するのを許容し、オーケストレーターがバッチ全体を再試行できるようにすることです。

foreachBatchでは、例外の種類とその処理方法に関する推奨事項をいくつか示します。

例外タイプ

推奨される行動

一時的なシンクエラー

SQLTransientConnectionExceptionHTTP 429、タイムアウト

キャッチ :再試行するか、デッドレターキューに送信する

シンクが冪等である場合の重複またはキー制約違反

SQLIntegrityConstraintViolationException

キャッチ :ログ記録と抑制

カスタム再試行可能なエラー

ラップされたソケット例外、再試行可能なデータベースエラー

キャッチ :メトリクスをインクリメントし、制御された継続を可能にする

論理エラーまたはスキーマエラー

NullPointerExceptionAttributeError 、スキーマ不一致

伝播 : Sparkがクエリを失敗させる

再試行不可能なシンクエラーまたは捕捉されていないロジックバグ

ValueError, PermissionError

伝播 : Sparkがクエリを失敗させる

重大な失敗

OutOfMemoryError、破損した状態、データ完全性違反

伝播 : Sparkがクエリを失敗させる

コード例:例外処理

以下の例では、エラーを処理するさまざまな方法を示すために、意図的にforeachでエラーを発生させています。

Python
from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')

def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have


q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()

上記のコードはエラーを処理して黙って抑制するため、バッチの残りの処理を実行しない可能性があります。この状況に対処するには、2つの選択肢があります。

まず、エラーを再発生させることで、エラーをオーケストレーション層に渡してバッチ処理を再試行させることができます。一時的な問題であれば、これでエラーを解決できます。そうでなければ、運用チームが手動で修正を試みるよう、問題を報告してください。これを行うには、 partial_funcコードを次のように変更します。

Python
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
raise e # re-raise the issue

第二に、例外をキャッチしてバッチの残りの部分を無視したい場合は、コードを変更してdo_nothing関数を使用してバッチの残りの部分を黙って無視することができます。

Python
from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')

# function to do nothing with a row
def do_nothing(row):
pass

def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
batch_df.foreach(do_nothing) # silently consume the remainder of the batch

q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()