Delta table ストリーミング 読み取りと書き込み
このページでは、 Deltaテーブルから変更をストリーミングする方法について説明します。 Delta Lake 、 readStreamおよびwriteStreamを通じてSpark構造化ストリーミングと深く統合されています。 Delta Lake は、ストリーミング システムやファイルに通常伴う次のような多くの制限を克服します。
- 低遅延の取り込みによって生成された小さなファイルを結合します。
- 複数のストリーム (または並列 バッチジョブ) で "exactly-once" 処理を維持します。
- ストリームのソースとしてファイルを使用する場合に、新しいファイルを効率的に検出します。
このページでは、Delta Lake テーブルをストリーミング ソースおよびシンクとして使用する方法について説明します。Databricks SQLでストリーミング テーブルを使用してデータを読み込む方法については、 Databricks SQLでストリーミング テーブルを使用する」を参照してください。
Delta Lakeを使用したストリーム静的結合に関する情報については、「ストリーム静的結合」を参照してください。
入力レートの制限
マイクロバッチの制御には、以下のオプションがあります:
maxFilesPerTrigger:各マイクロバッチで考慮される新しいファイルの数。デフォルトは1000です。maxBytesPerTrigger:各マイクロバッチで処理されるデータの量。このオプションでは「ソフトマックス」を設定します。つまり、バッチはおおよそこの量のデータを処理し、最小の入力単位がこの制限よりも大きい場合にストリーミングクエリを進めるために、制限を超えるデータを処理する可能性があります。これはデフォルトでは設定されていません。
maxFilesPerTriggerとmaxBytesPerTriggerを組み合わせて使用すると、マイクロバッチは maxFilesPerTriggerまたはmaxBytesPerTriggerの制限に達するまでデータを処理します。
logRetentionDuration設定によってソーステーブルのトランザクションがクリーンアップされ、ストリーミングクエリがそれらのバージョンを処理しようとすると、デフォルトでは、データ損失を避けるためにクエリは失敗します。オプション failOnDataLoss を false に設定すると、失われたデータを無視して処理を続行できます。
Deltaテーブルへの変更を処理する
構造化ストリーミングは Delta テーブルを増分的に読み取ります。ストリーミング クエリが Delta テーブルに対してアクティブな間、新しいテーブル バージョンがソース テーブルにコミットされると、新しいレコードはべき等的に処理されます。構造化ストリーミングは追加ではない入力を処理せず、ソースとして使用されているテーブルに変更が発生した場合は例外をスローします。たとえば、 UPDATE 、 DELETE 、 MERGE INTO 、またはOVERWRITE操作によって、ストリーミング クエリによって読み取られているソース Delta テーブルが変更されると、ストリームはエラーで失敗します。
ユースケースに応じて、ソース Delta テーブルへのアップストリームの変更を処理するための一般的なアプローチが 4 つあります。それぞれの参照表と詳細は以下に記載されています。
アプローチ | 長所 | 短所 |
|---|---|---|
| シンプルで、複雑なロジックを書く必要はありません。アップストリームの変更が個別に処理される追加のみの処理や、不良レコードを一時的に処理する場合に役立ちます。 | 変更は伝播せず、追加のみを処理します。 |
フルリフレッシュ | またシンプルで、複雑なロジックを書く必要もありません。アップストリームの変更がほとんどない小規模なデータセットに役立ちます。 | 大規模なデータセットではコストが高く、すべてのダウンストリーム テーブルの再処理が必要になります。 |
チェンジデータフィード | すべての変更の種類 (挿入、更新、削除) を処理します。Databricks では、可能な場合はテーブルから直接ストリーミングするのではなく、Delta テーブルの CDC フィードからストリーミングすることを推奨しています。 | 各変更タイプを処理するには、より複雑なロジックを記述する必要があります。 |
マテリアライズドビュー | 構造化ストリーミングのシンプルな代替手段。自動変更伝播を提供します。 | 待ち時間が長くなります。 LakeFlow Spark宣言型パイプラインとDatabricks SQLでのみ利用可能です。 |
アップストリームの変更コミットをスキップ skipChangeCommits
skipChangeCommitsを設定すると、ストリーミング エンジンは既存のレコードを削除または変更するトランザクションを無視し、追加のみを処理するようになります。これは、既存のデータへの変更をストリームを通じて伝播する必要がないことが分かっている場合、またはそれらの変更を処理するために別のロジックを必要とする場合に役立ちます。一時的に 1 回限りの変更を無視する必要がある場合は、 skipChangeCommits有効または無効にすることができます。
Databricks 、変更データフィードを使用しないほとんどのワークロードにskipChangeCommitsを使用することを推奨します。
- Python
- Scala
(spark.readStream
.option("skipChangeCommits", "true")
.table("source_table")
)
spark.readStream
.option("skipChangeCommits", "true")
.table("source_table")
Deltaテーブルのスキーマが、そのテーブルに対してストリーミング読み取りを開始した後に変更された場合、クエリは失敗します。ほとんどのスキーマの変更では、ストリームを再起動してスキーマの不一致を解決することで処理を続行することができます。
Databricks Runtime 12.2 LTS以前では、列の名前変更や削除などの非加算的なスキーマ進化が行われた列マッピングが有効になっているDeltaテーブルからストリームすることはできません。 詳細については、 「列マッピングとストリーミング」を参照してください。
Databricks Runtime 12.2 LTS 以降では、 skipChangeCommits は以前の設定 ignoreChangesを非推奨にします。 Databricks Runtime 11.3 LTS 以前では、 ignoreChanges のみがサポートされているオプションです。
ignoreChangesのセマンティクは、skipChangeCommitsと大きく異なります。ignoreChangesを有効にすると、UPDATE、MERGE INTO、DELETE(パーティション内)、またはOVERWRITEなどのデータ変更操作の後に、ソーステーブル内の書き換えられたデータファイルが再出力されます。変更されていない行は、しばしば新しい行と一緒に出力されるため、ダウンストリームのコンシューマーは重複を処理できる必要があります。削除はダウンストリームに反映されません。ignoreChangesはignoreDeletesを包含します。
skipChangeCommits ファイル変更操作を完全に無視します。UPDATE 、 MERGE INTO 、 DELETE 、 OVERWRITEなどのデータ変更操作によってソース テーブルに書き換えられたデータ ファイルは完全に無視されます。ストリーム ソース テーブルの変更を反映するには、これらの変更を伝播する別のロジックを実装する必要があります。
ignoreChanges で構成されたワークロードは、引き続き既知のセマンティクスを使用して動作しますが、Databricks では、すべての新しいワークロードに skipChangeCommits を使用することをお勧めします。ignoreChanges から skipChangeCommits へのワークロードの移行には、リファクタリング ロジックが必要です。
レガシーオプション: ignoreDeletes
ignoreDeletes パーティション境界でデータを削除するトランザクション (つまり、完全なパーティションの削除) のみを処理するレガシー オプションです。パーティション以外の削除、更新、またはその他の変更を処理する必要がある場合は、代わりにskipChangeCommits使用します。
- Python
- Scala
(spark.readStream
.option("ignoreDeletes", "true")
.table("user_events")
)
spark.readStream
.option("ignoreDeletes", "true")
.table("user_events")
ダウンストリームテーブルの完全更新
アップストリームの変更がまれで、データが再処理できるほど小さい場合は、ストリーミング チェックポイントと出力テーブルを削除して、ストリームを最初から再開できます。これにより、ストリームはソース テーブルのすべてのデータを再処理します。このアプローチでは、このストリームの出力に依存するすべてのダウンストリーム テーブルの再処理も必要になることに注意してください。
このアプローチは、アップストリームの変更がまれで、完全な更新のコストが許容できる小規模なデータセットまたはワークロードに最適です。
チェンジデータフィードを使う
あらゆる種類の変更 (挿入、更新、削除) を処理するワークロードの場合は、 Delta Lake変更データフィードを使用します。 変更データフィードは行レベルの変更をDeltaテーブルに記録するため、それらの変更をストリーミングし、ダウンストリーム テーブルの各変更タイプを処理するロジックを作成できます。 これは、コードがあらゆる種類の変更イベントを明示的に処理するため、最も堅牢なアプローチです。DatabricksでのDelta Lake変更データフィードの使用」を参照してください。
LakeFlow Spark宣言型パイプラインを使用している場合は、 「APPLY CHANGES APIsを使用してDelta Live Tables での変更データ キャプチャを簡素化する」を参照してください。
Databricks Runtime 12.2 LTS以前では、列の名前変更や削除など、非加算的なスキーマ進化が行われた列マッピングが有効になっているDeltaテーブルの変更データフィードからストリームすることはできません。 列マッピングとストリーミングを参照してください。
マテリアライズドビューを使用する
マテリアライズドビューは、ソースデータが変更されたときに結果を再計算することで、上流の変更を自動的に処理します。 レイテンシーを最小限に抑える必要がなく、ストリーミングの複雑さの管理を避けたい場合は、マテリアライズドビューを使用してアーキテクチャを簡素化できます。 マテリアライズドビューは、 LakeFlow Spark宣言型パイプライン パイプラインおよびDatabricks SQLで利用できます。 See マテリアライズドビュー.
例
たとえば、dateによってパーティション化された、date、user_email、およびaction列を含むテーブルuser_eventsがあるとします。user_eventsテーブルからストリーミングしており、GDPRのためテーブルからデータを削除する必要があります。
skipChangeCommits 複数のパーティション内のデータを削除できます (この例では、 user_emailでフィルタリングします)。次の構文を使用します。
spark.readStream
.option("skipChangeCommits", "true")
.table("user_events")
UPDATE ステートメントでuser_emailを更新すると、問題のuser_emailを含むファイルが書き換えられます。skipChangeCommits を使用して、変更されたデータ ファイルを無視します。
Databricks では、削除によって常にパーティションが完全に削除されることが確実でない限り、 ignoreDeletesではなくskipChangeCommits使用することをお勧めします。
初期位置の指定
次のオプションを使用すると、テーブル全体を処理せずに、Delta Lakeストリーミングソースの開始点を指定できます。
startingVersion: 開始する Delta Lake バージョン。Databricks では、ほとんどのワークロードでこのオプションを省略することを推奨しています。設定されていない場合、ストリームは、その時点でのテーブルの完全なスナップショットと、変更データとしての将来の変更を含む、利用可能な最新バージョンから開始されます。指定すると、ストリームは指定されたバージョン (含む) 以降の Delta テーブルへのすべての変更を読み取ります。指定されたバージョンが利用できなくなった場合、ストリームの開始は失敗します。DESCRIBE HISTORYコマンド出力のversion列からコミット バージョンを取得できます。最新の変更のみを返すには、latestを指定します。startingTimestamp: タイムスタンプを開始するポイントです。タイムスタンプ以降に行われたすべてのテーブル変更は、ストリーミングリーダーによって読み取られます。提供されたタイムスタンプがすべてのテーブルのコミットより前にある場合、ストリーミングの読み取りは利用可能な最も早いタイムスタンプから始まります。これは次のいずれかになります。- タイムスタンプ文字列。たとえば、
"2019-01-01T00:00:00.000Z"などです。 - 日付文字列。たとえば、
"2019-01-01"などです。
- タイムスタンプ文字列。たとえば、
両方のオプションを同時に設定することはできません。 これらは、新しいストリーミング クエリを開始するときにのみ有効になります。 ストリーミング クエリが開始され、進行状況がチェックポイントに記録されている場合、これらのオプションは無視されます。
指定したバージョン、またはタイムスタンプからストリーミングソースを開始できますが、ストリーミングソースのスキーマは常にDeltaテーブルの最新のスキーマになります。指定されたバージョンまたはタイムスタンプ以降に、Deltaテーブルに互換性のないスキーマ変更がないことを確認する必要があります。そうしないと、ストリーミングソースが不正なスキーマでデータを読み取った際に、間違った結果を返す可能性があります。
例
たとえば、user_eventsというテーブルがあるとします。バージョン5以降の変更を読み取りたい場合は、次を使用します:
spark.readStream
.option("startingVersion", "5")
.table("user_events")
2018年10月18日以降の変更を読み取りたい場合は、次を使用します。
spark.readStream
.option("startingTimestamp", "2018-10-18")
.table("user_events")
データを削除せずに初期スナップショットを処理
この機能は、Databricks Runtime 11.3 LTS 以降で使用できます。
Delta テーブルをストリーム ソースとして使用する場合、クエリは最初にテーブルに存在するすべてのデータを処理します。このバージョンの Delta テーブルは、初期スナップショットと呼ばれます。デフォルトでは、 Delta テーブルのデータファイルは、最後に変更されたファイルに基づいて処理されます。 ただし、最終変更時刻は、必ずしもレコード イベント時刻の順序を表すわけではありません。
ウォーターマークが定義されたステートフルストリーミングクエリでは、ファイルを更新時間順に処理すると、レコードが誤った順序で処理される可能性があります。このため、ウォーターマークによって記録がレイトイベントとして取りこぼされる可能性があります。
次のオプションを有効にすることで、データドロップの問題を回避できます。
- withEventTimeOrder: 初期スナップショットをイベント時間順に処理するかどうか。
イベント時間順序を有効にすると、初期スナップショットデータのイベント時間の範囲がタイムバケットに分割されます。各マイクロバッチは、時間範囲内のデータをフィルタリングすることによってバケットを処理します。maxFilesPerTrigger および maxBytesPerTrigger 構成オプションは引き続きマイクロバッチサイズの制御に適用できますが、処理の性質上、近似的な方法でのみ適用されます。
以下の図は、このプロセスを示しています。

この機能に関する重要な情報:
-
データドロップの問題は、ステートフルストリーミングクエリーの最初の Delta スナップショットがデフォルトの順序で処理される場合にのみ発生します。
-
最初のスナップショットの処理中にストリームクエリーが開始されると、それ以降は
withEventTimeOrderを変更することはできません。withEventTimeOrderを変更して再起動するには、チェックポイントを削除する必要があります。 -
withEventTimeOrder を有効にしてストリーム クエリを実行している場合、初期スナップショット処理が完了するまで、この機能をサポートしていない Databricks Runtime バージョンにダウングレードすることはできません。ダウングレードする必要がある場合は、初期スナップショットが完了するまで待つか、チェックポイントを削除してクエリを再開することができます。
-
この機能は、以下のような稀なケースではサポートされていません。
- イベント時間列はジェネレーテッドカラムであり、Delta ソースとウォーターマークの間に非投影変換がある場合。
- ストリームクエリーに複数の Delta ソースを持つウォーターマークがある場合。
-
イベント時間順を有効にすると、Delta の初期スナップショット処理のパフォーマンスが低下する可能性があります。
-
各マイクロバッチは初期スナップショットをスキャンして、対応するイベント時間範囲内のデータをフィルタリングします。フィルター アクションを高速化するには、データ スキップを適用できるように、イベント時間として Delta ソース列を使用することをお勧めします (適用可能な場合は、データ スキップを確認してください)。さらに、イベント時間列に沿ってテーブルをパーティション分割すると、処理がさらに高速化されます。特定のマイクロバッチでスキャンされたデルタファイルの数を確認するには、Spark UI をチェックしてください。
例
event_time 列を持つ user_events というテーブルがあるとします。ストリーミングクエリーは集計クエリーです。初期スナップショット処理中にデータドロップが起きないようにするには、以下を使用します。
spark.readStream
.option("withEventTimeOrder", "true")
.table("user_events")
.withWatermark("event_time", "10 seconds")
すべてのストリーミングクエリーに適用されるクラスター上の Spark 構成を使用してこの機能を有効にすることもできます。 spark.databricks.delta.withEventTimeOrder.enabled true
Delta テーブルをシンクとして
構造化ストリーミングを使用して Delta テーブルにデータを書き込むこともできます。テーブルに対して他のストリームやバッチクエリーが同時に実行されている場合でも、トランザクションログにより、Delta Lake で 1 回のみの処理が保証されます。
構造化ストリーミング シンクを使用して Delta テーブルに書き込む場合、 epochId = -1で空のコミットを確認できます。これらは予期され、通常発生します。
- ストリーミング クエリの各実行の最初のバッチで (これは、
Trigger.AvailableNowのすべてのバッチで発生します)。 - スキーマが変更されたとき (列の追加など)。
これらの空のコミットは、クエリの正確性やパフォーマンスに重大な影響を与えません。これらは意図的なものであり、エラーを示すものではありません。
Delta Lake VACUUM 関数は、Delta Lake によって管理されていないすべてのファイルを削除しますが、_で始まるディレクトリはスキップします。<table-name>/_checkpointsなどのディレクトリ構造を使用すると、Delta テーブルの他のデータやメタデータと一緒にチェックポイントを安全に保存できます。
メトリクス
ストリーミングクエリプロセスでまだ処理されていないバイト数とファイル数は、numBytesOutstanding および numFilesOutstanding メトリクスとして確認できます。その他のメトリクスには、次のものがあります。
numNewListedFiles: このバッチのバックログを計算するためにリスト化された Delta Lake ファイルの数。backlogEndOffset: バックログの計算に使用されるテーブルバージョン。
ノートブックでストリームを実行している場合、ストリーミングクエリの進行状況ダッシュボードの 生データ タブに次のメトリクスが表示されます。
{
"sources": [
{
"description": "DeltaSource[file:/path/to/source]",
"metrics": {
"numBytesOutstanding": "3456",
"numFilesOutstanding": "8"
}
}
]
}
追加モード
デフォルトでは、ストリームは追加モードで実行され、新しいレコードがテーブルに追加されます。
テーブルにストリーミングする場合は、次の例のように toTable メソッドを使用します。
- Python
- Scala
(events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
)
events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
コンプリートモード
構造化ストリーミングを使用して、テーブル全体をバッチごとに置き換えることもできます。ユースケースの一例として、集計を用いたサマリーの計算があります。
- Python
- Scala
(spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
)
spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
前述の例では、顧客ごとのイベントの総数を含むテーブルが継続的に更新されます。
レイテンシ要件がより緩やかなアプリケーションの場合は、1 回限りのトリガーでコンピューティングリソースを節約できます。これらを使用することで、特定のスケジュールでサマリー集計テーブルを更新し、最後の更新時以降に到着した新しいデータのみを処理することができます。
を使用してストリーミング クエリからアップサートします foreachBatch
merge と foreachBatch を組み合わせて使用すると、ストリーミング クエリから Delta テーブルに複雑なアップサートを書き込むことができます。「foreachBatch を使用して任意のデータ シンクに書き込む」を参照してください。
このパターンには、以下のように様々な適用方法があります。
- 更新モードでストリーミング集計を書き込む :この方法は、コンプリートモードよりもはるかに効率的です。
- データベース変更のストリームを Delta テーブルに書き込む : 変更データを書き込むためのマージ クエリ を
foreachBatchで使用して、変更のストリームを Delta テーブルに継続的に適用できます。 - 重複除去を使用して Delta テーブルにデータのストリームを書き込む : 重複除去のための挿入専用マージ クエリ を
foreachBatchで使用して、自動重複除去を使用して Delta テーブルに (重複を含む) データを継続的に書き込むことができます。
- ストリーミングクエリーを再起動すると、データの同じバッチに操作が複数回適用される可能性があるため、
foreachBatch内のmergeステートメントがべき等であることを確認してください。 mergeがforeachBatchで使用されている場合、ストリーミングクエリーの入力データレート (StreamingQueryProgressを通じてレポートされ、ノートブックのレートグラフに表示されます) は、ソースでデータが生成される実際のレートの倍数としてレポートされる場合があります。これは、mergeが入力データを複数回読み込むため、入力メトリクスが乗算されることが理由です。これがボトルネックになる場合は、mergeの前にバッチの データフレーム をキャッシュし、mergeの後にキャッシュを解除することができます。
次の例は、 foreachBatch 内で SQL を使用してこのタスクを実行する方法を示しています。
- Scala
- Python
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
// Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
// Use the view name to apply MERGE
// NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
microBatchOutputDF.sparkSession.sql(s"""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
# Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
# Use the view name to apply MERGE
# NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
# In Databricks Runtime 10.5 and below, you must use the following:
# microBatchOutputDF._jdf.sparkSession().sql("""
microBatchOutputDF.sparkSession.sql("""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
次の例のように、Delta Lake API を使用してストリーミングの Upsert を実行することも選択できます。
- Scala
- Python
import io.delta.tables.*
val deltaTable = DeltaTable.forName(spark, "table_name")
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
deltaTable.as("t")
.merge(
microBatchOutputDF.as("s"),
"s.key = t.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "table_name")
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
(deltaTable.alias("t").merge(
microBatchOutputDF.alias("s"),
"s.key = t.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
べき等テーブルは foreachBatch
Databricks では、 foreachBatchを使用する代わりに、更新するシンクごとに個別のストリーミング書き込みを構成することをお勧めします。これは、'foreachBatch' を使用すると複数のテーブルへの書き込みがシリアル化されるため、並列化が減少し、全体的な待機時間が増加するためです。
Delta テーブルでは、foreachBatchべき等内の複数のテーブルに書き込むために、次の DataFrameWriter オプションがサポートされています。
txnAppId: 各 データフレーム 書き込みで渡すことができる一意の文字列。 たとえば、StreamingQuery ID をtxnAppIdとして使用できます。txnVersion: トランザクションのバージョンとして機能する単調増加の数値。
Delta Lake は、 txnAppId と txnVersion の組み合わせを使用して、重複する書き込みを特定し、それらを無視します。
バッチ書き込みが失敗で中断された場合、バッチを再実行すると、同じアプリケーションとバッチ ID を使用して、ランタイムが重複する書き込みを正しく識別し、それらを無視するのに役立ちます。 アプリケーション ID(txnAppId)は、ユーザーが生成した任意の一意の文字列にすることができ、ストリーム ID に関連付ける必要はありません。 「foreachBatch を使用して任意のデータ シンクに書き込む」を参照してください。
ストリーミング・チェックポイントを削除し、新しいチェックポイントでクエリを再開する場合は、別の txnAppIdを指定する必要があります。 新しいチェックポイントは、バッチ ID 0で開始されます。 Delta Lake は、バッチ ID とtxnAppIdを一意のキーとして使用し、既に表示されている値を持つバッチをスキップします。
次のコード例は、このパターンを示しています。
- Python
- Scala
app_id = ... # A unique string that is used as an application ID.
def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2
streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()
val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 1
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 2
}