ForEachBatch を使用してパイプライン内の任意のデータシンクに書き込む
プレビュー
foreach_batch_sink API はパブリック プレビュー段階です。
ForEachBatch シンクを使用すると、ストリームを一連のマイクロバッチとして処理できます。各バッチはApache Spark構造化ストリーミングのforeachBatchと同様のカスタム ロジックを使用してPythonで処理できます。 LakeFlow Spark宣言型パイプライン (SDP) ForEachBatch シンクを使用すると、ストリーミング書き込みをネイティブにサポートしていない 1 つ以上のターゲットにストリーミング データを変換、マージ、または書き込むことができます。 このページでは、ForEachBatch シンクの設定手順を説明し、例を示し、重要な考慮事項について説明します。
ForEachBatch シンクは次の機能を提供します。
- 各マイクロバッチのカスタム ロジック : ForEachBatch は柔軟なストリーミング シンクです。Python コードを使用して、任意のアクション (外部テーブルへのマージ、複数の宛先への書き込み、upsert の実行など) を適用できます。
- 完全な更新のサポート : パイプラインはフローごとにチェックポイントを管理するため、パイプラインの完全な更新を実行するとチェックポイントが自動的にリセットされます。 ForEachBatch シンクでは、このような状況が発生したときに下流のデータのリセットを管理する責任があります。
- Unity Catalogサポート : ForEachBatch シンクは、 Unity Catalogボリュームまたはテーブルからの読み取りや書き込みなど、すべてのUnity Catalog機能をサポートします。
- 制限されたハウスキーピング : パイプラインは ForEachBatch シンクから書き込まれたデータを追跡しないため、そのデータをクリーンアップできません。ダウンストリームのデータ管理についてはお客様が責任を負います。
- イベント ログ エントリ : パイプライン イベント ログには、各 ForEachBatch シンクの作成と使用状況が記録されます。Python 関数がシリアル化できない場合は、イベント ログに警告エントリと追加の提案が表示されます。
- ForEachBatch シンクは、
append_flowなどのストリーミング クエリ用に設計されています。これはバッチ専用パイプラインまたはAutoCDCセマンティクス向けではありません。 - このページで説明する ForEachBatch シンクはパイプライン用です。Apache Spark構造化ストリーミングも
foreachBatchをサポートします。 構造化ストリーミングforeachBatchについては、 「foreachBatch を使用して任意のデータ シンクに書き込む」を参照してください。
ForEachBatchシンクを使用する場合
パイプラインで、 deltaやkafkaなどの組み込みシンク形式では利用できない機能が必要な場合は、常に ForEachBatch シンクを使用します。一般的な使用例は次のとおりです:
- Delta Lake テーブルへのマージまたはアップサート : マイクロバッチごとにカスタム マージ ロジックを実行します (更新されたレコードの処理など)。
- 複数のまたはサポートされていない宛先への書き込み : 各バッチの出力を、ストリーミング書き込みをサポートしていない複数のテーブルまたは外部ストレージ システム (特定の JDBC シンクなど) に書き込みます。
- カスタム ロジックまたは変換の適用 : Python でデータを直接操作します (たとえば、専用のライブラリや高度な変換を使用)。
組み込みシンク、またはPythonを使用したカスタム シンクの作成に関する情報については、 LakeFlow Spark宣言型パイプラインのシンク」を参照してください。
構文
@dp.foreach_batch_sink()デコレーションを使用して ForEachBatch シンクを生成します。次に、フロー定義内でこれをtargetとして参照できます (例: @dp.append_flow )。
from pyspark import pipelines as dp
@dp.foreach_batch_sink(name="<name>")
def batch_handler(df, batch_id):
"""
Required:
- `df`: a Spark DataFrame representing the rows of this micro-batch.
- `batch_id`: unique integer ID for each micro-batch in the query.
"""
# Your custom write or transformation logic here
# Example:
# df.write.format("some-target-system").save("...")
#
# To access the sparkSession inside the batch handler, use df.sparkSession.
パラメーター | 説明 |
|---|---|
name | オプション。パイプライン内のシンクを識別するための一意の名前。含まれていない場合は、デフォルトで UDF の名前になります。 |
バッチハンドラ | これは、各マイクロバッチに対して呼び出されるユーザー定義関数 (UDF) です。 |
DF | 現在のマイクロバッチのデータを含む Spark DataFrame。 |
バッチID | マイクロバッチの整数 ID。Spark はトリガー間隔ごとにこの ID を増分します。
|
フルリフレッシュ
ForEachBatch はストリーミング クエリを使用するため、パイプラインは各フローのチェックポイント ディレクトリを追跡します。 完全更新 時:
- チェックポイント ディレクトリがリセットされます。
- シンク関数 (
foreach_batch_sinkUDF) は、0 から始まるまったく新しいbatch_idサイクルを認識します。 - ターゲット システム内のデータは、パイプラインによって自動的にクリーンアップされ ません (パイプラインはデータが書き込まれる場所を認識していないため)。白紙の状態からのシナリオが必要な場合は、ForEachBatch シンクが設定する外部テーブルまたは場所を手動で削除または切り捨てる必要があります。
Unity Catalog機能の使用
Spark構造化ストリーミングforeach_batch_sinkの既存のUnity Catalog機能はすべて引き続き利用できます。
これには、管理対象または外部のUnity Catalogテーブルへの書き込みが含まれます。 Apache Spark構造化ストリーミング ジョブとまったく同じように、マイクロバッチをUnity Catalogマネージドまたは外部テーブルに書き込むことができます。
イベントログエントリ
ForEachBatch シンクを作成すると、 "format": "foreachBatch"を含むSinkDefinitionイベントがパイプラインのイベント ログに追加されます。
これにより、ForEachBatch シンクの使用状況を追跡し、シンクに関する警告を確認できます。
Databricks Connect と併用する
指定した関数が シリアル化可能でない 場合 (Databricks Connect の重要な要件)、イベント ログには、Databricks Connect のサポートが必要な場合にコードを簡素化またはリファクタリングすることを推奨するWARNエントリが含まれます。
たとえば、 dbutilsを使用して ForEachBatch UDF内で問題を取得する場合、
代わりに、 UDFで使用する前に引数を取得することもできます。
# Instead of accessing parameters within the UDF...
def foreach_batch(df, batchId):
value = dbutils.widgets.get ("X") + str (i)
# ...get the parameters first, and use them within the UDF:
argX = dbutils.widgets.get ("X")
def foreach_batch(df, batchId):
value = argX + str (i)
ベストプラクティス
- ForEachBatch 関数を簡潔に保ちます 。スレッド化、ライブラリへの大きな依存、またはメモリ内の大きなデータ操作を避けます。複雑なロジックやステートフルなロジックは、シリアル化エラーやパフォーマンスのボトルネックを引き起こす可能性があります。
- チェックポイント フォルダーを監視します 。ストリーミング クエリの場合、SDP はシンクではなくフローごとにチェックポイントを管理します。パイプラインに複数のフローが存在する場合、各フローには独自のチェックポイント ディレクトリがあります。
- 外部依存関係を検証する : 外部システムまたはライブラリに依存している場合は、それらがすべてのクラスター ノードまたはコンテナーにインストールされていることを確認します。
- Databricks Connect に注意してください : 将来的に環境を Databricks Connect に移行する可能性がある場合は、コードがシリアル化可能であり、
foreach_batch_sinkUDF 内のdbutilsに依存していないことを確認してください。
制限事項
- ForEachBatch にはハウスキーピングがありません 。カスタム Python コードは任意の場所にデータを書き込む可能性があるため、パイプラインはそのデータをクリーンアップしたり追跡したりできません。書き込み先については、独自のデータ管理または保持ポリシーを処理する必要があります。
- マイクロバッチのメトリクス: パイプラインはストリーミング メトリクスを収集します が、シナリオによっては、ForEachBatch を使用すると不完全または異常なメトリクスが発生する可能性があります。 これは、ForEachBatch の根本的な柔軟性により、システムにとってデータ フローと行の追跡が困難になるためです。
- 複数の読み取りを行わずに複数の宛先への書き込みをサポート : 一部の顧客は、ForEachBatch を使用してソースから 1 回読み取り、複数の宛先に書き込む場合があります。 これを実現するには、ForEachBatch 関数内に
df.persistまたはdf.cacheを含める必要があります。これらのオプションを使用すると、Databricks はデータの準備を 1 回だけ試行します。これらのオプションがないと、クエリによって複数の読み取りが行われます。 これは次のコード例には含まれていません。 - Databricks Connect での使用 : パイプラインを Databricks Connect で実行する場合、
foreachBatchユーザー定義関数 (UDF) はシリアル化可能である必要があり、dbutils使用できません。パイプラインは、シリアル化できない UDF を検出すると警告を発しますが、パイプラインを失敗させることはありません。 - シリアル化できないロジック : ローカル オブジェクト、クラス、または pickle 化できないリソースを参照するコードは、Databricks Connect コンテキストで動作しなくなる可能性があります。Databricks Connect が必須である場合は、純粋な Python モジュールを使用し、参照 (たとえば、
dbutils) が使用されていないことを確認します。
例
基本的な構文の例
from pyspark import pipelines as dp
# Create a ForEachBatch sink
@dp.foreach_batch_sink(name = "my_foreachbatch_sink")
def feb_sink(df, batch_id):
# Custom logic here. You can perform merges,
# write to multiple destinations, etc.
return
# Create source data for example:
@dp.table()
def example_source_data():
return spark.range(5)
# Add sink to an append flow:
@dp.append_flow(
target="my_foreachbatch_sink",
)
def my_flow():
return spark.readStream.format("delta").table("example_source_data")
シンプルなパイプラインのサンプルデータの使用
この例では、NYC タクシーのサンプルを使用します。ワークスペース管理者が Databricks パブリック データセット カタログを有効にしていることを前提としています。シンクの場合は、 my_catalog.my_schemaアクセスできるカタログとスキーマに変更します。
from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp
# Create foreachBatch sink
@dp.foreach_batch_sink(name = "my_foreach_sink")
def my_foreach_sink(df, batch_id):
# Custom logic here. You can perform merges,
# write to multiple destinations, etc.
# For this example, we are adding a timestamp column.
enriched = df.withColumn("processed_timestamp", current_timestamp())
# Write to a Delta location
enriched.write \
.format("delta") \
.mode("append") \
.saveAsTable("my_catalog.my_schema.trips_sink_delta")
# Return is optional here, but generally not used for the sink
return
# Create an append flow that reads sample data,
# and sends it to the ForEachBatch sink
@dp.append_flow(
target="my_foreach_sink",
)
def taxi_source():
df = spark.readStream.table("samples.nyctaxi.trips")
return df
複数の宛先への書き込み
この例では、複数の宛先に書き込みます。これは、 txnVersionとtxnAppIdを使用して Delta Lake テーブルへの書き込みをべき等にする方法を示しています。詳細については、 foreachBatchのべき等テーブルの書き込みを参照してください。
2 つのテーブルtable_aとtable_bに書き込んでいるとします。バッチ内で、 table_aへの書き込みは成功しますが、 table_bへの書き込みは失敗します。バッチが再実行されると、( txnVersion 、 txnAppId ) ペアにより、Delta はtable_aへの重複書き込みを無視し、バッチをtable_bにのみ書き込みます。
from pyspark import pipelines as dp
app_id = "my-app-name" # different applications that write to the same table should have unique txnAppId
# Create the ForEachBatch sink
@dp.foreach_batch_sink(name="user_events_feb")
def user_events_handler(df, batch_id):
# Optionally do transformations, logging, or merging logic
# ...
# Write to a Delta table
df.write \
.format("delta") \
.mode("append") \
.option("txnVersion", batch_id) \
.option("txnAppId", app_id) \
.saveAsTable("my_catalog.my_schema.example_table_1")
# Also write to a JSON file location
df.write \
.format("json") \
.mode("append") \
.option("txnVersion", batch_id) \
.option("txnAppId", app_id) \
.save("/tmp/json_target")
return
# Create source data for example
@dp.table()
def example_source():
return spark.range(5)
# Create the append flow, and target the ForEachBatch sink
@dp.append_flow(target="user_events_feb", name="user_events_flow")
def read_user_events():
return spark.readStream.format("delta").table("example_source")
使用 spark.sql()
次の例のように、ForEachBatch シンクでspark.sql()使用できます。
from pyspark import pipelines as dp
from pyspark.sql import Row
@dp.foreach_batch_sink(name = "example_sink")
def feb_sink(df, batch_id):
df.createOrReplaceTempView("df_view")
df.sparkSession.sql("MERGE INTO target_table AS tgt " +
"USING df_view AS src ON tgt.id = src.id " +
"WHEN MATCHED THEN UPDATE SET tgt.id = src.id * 10 " +
"WHEN NOT MATCHED THEN INSERT (id) VALUES (id)"
)
return
# Create target delta table
spark.range(5).write.format("delta").mode("overwrite").saveAsTable("target_table")
# Create source table
@dp.table()
def src_table():
return spark.range(5)
@dp.append_flow(
target="example_sink",
)
def example_flow():
return spark.readStream.format("delta").table("source_table")
よくある質問(FAQ)
ForEachBatch シンクでdbutils使用できますか?
パイプラインを Databricks Connect 以外の環境で実行する予定の場合は、 dbutils機能する可能性があります。ただし、Databricks Connect を使用する場合、 dbutils foreachBatch関数内からアクセスできません。パイプラインは、 dbutils使用を検出すると、破損を回避するために警告を出すことがあります。
単一の ForEachBatch シンクで複数のフローを使用できますか?
はい。すべて同じシンク名をターゲットとする複数のフロー ( @dp.append_flowを使用) を定義できますが、各フローは独自のチェックポイントを維持します。
パイプラインはターゲットのデータ保持またはクリーンアップを処理しますか?
いいえ。ForEachBatch シンクは任意の場所またはシステムに書き込むことができるため、パイプラインはそのターゲット内のデータを自動的に管理または削除することはできません。これらの操作は、カスタム コードまたは外部プロセスの一部として処理する必要があります。
ForEachBatch 関数のシリアル化エラーまたは失敗をトラブルシューティングするにはどうすればよいですか?
クラスター ドライバー ログまたはパイプライン イベント ログを確認します。Spark Connect 関連のシリアル化の問題については、関数がシリアル化可能な Python オブジェクトのみに依存しており、許可されていないオブジェクト (開いているファイル ハンドルやdbutilsなど) を参照していないことを確認してください。