Delta table ストリーミング 読み取りと書き込み
このページでは、 Deltaテーブルから変更をストリーミングする方法について説明します。 Delta Lake 、 readStreamおよびwriteStreamを通じてSpark構造化ストリーミングと深く統合されています。 Delta Lake は、ストリーミング システムやファイルに通常伴う次のような多くの制限を克服します。
- 低遅延の取り込みによって生成された小さなファイルを結合します。
 - 複数のストリーム (または並列 バッチジョブ) で "exactly-once" 処理を維持します。
 - ストリームのソースとしてファイルを使用する場合に、新しいファイルを効率的に検出します。
 
この記事では、Delta Lake テーブルをストリーミング ソースとシンクとして使用する方法について説明します。Databricks SQLでストリーミングテーブルを使用してデータをロードする方法については、「Databricks SQLでストリーミングテーブルを使用する」を参照してください。
Delta Lakeを使用したストリーム静的結合に関する情報については、「ストリーム静的結合」を参照してください。
ストリームの変更
増分処理のために Delta テーブルから変更をストリーミングする場合、考慮すべきオプションが 2 つあります。
- Deltaテーブルの変更データ キャプチャ ( CDC ) フィードからのストリーム。
 - Delta テーブル自体からのストリーム。
 
オプション 1 はより堅牢なソリューションであり、挿入、更新、削除などのさまざまな種類の変更イベントを処理する方法をコードで定義します。オプション 2 は、変更イベントを処理するためのコードを記述する必要がないため、より簡単です。ただし、オプション 2 は、ソース Delta テーブルが追加専用の場合にのみ推奨されます。Deltaテーブルに変更 (更新や削除など) があると、構造化ストリーミング エンジンは例外をスローします。 この例外を処理するには、ソース テーブルのすべてのデータを再処理するか、ソース テーブルの変更を無視するように構成します。詳細については、 「更新と削除を無視する」を参照してください。
Databricks では、可能な限り Delta テーブル自体 (オプション 2) ではなく、Delta テーブルの CDC フィード (オプション 1) からのストリーミングを推奨しています。
オプション 1: データ チェンジ キャプチャ ( CDC ) フィードからのストリーム
Delta Lake変更データフィードは、更新や削除などのDeltaテーブルへの変更を記録します。 有効にすると、変更データフィードからストリーミングし、ダウンストリーム テーブルへの挿入、更新、削除を処理するロジックを作成できます。 変更データフィードのデータ出力は、記述されているDeltaテーブルとは若干異なりますが、それでも増分変更をアメダリオンアーキテクチャの下流テーブルに伝播することができます。
Databricks Runtime 12.2 LTS以前では、列の名前変更や削除など、非加算的なスキーマ進化が行われた列マッピングが有効になっているDeltaテーブルの変更データフィードからストリームすることはできません。 列マッピングとスキーマ変更によるストリーミングを参照してください。
オプション2: Deltaテーブルからのストリーム
構造化ストリーミングは、Delta テーブルを増分的に読み取ります。 ストリーミング クエリが Delta テーブルに対してアクティブである間、新しいレコードは、新しいテーブル バージョンがソース テーブルにコミットされるとべき等に処理されます。
次のコード例は、テーブル名またはファイルパスを使用したストリーミング読み取りの設定を示しています。
- Python
 - Scala
 
spark.readStream.table("table_name")
spark.readStream.load("/path/to/table")
spark.readStream.table("table_name")
spark.readStream.load("/path/to/table")
Deltaテーブルのスキーマが、そのテーブルに対してストリーミング読み取りを開始した後に変更された場合、クエリは失敗します。ほとんどのスキーマの変更では、ストリームを再起動してスキーマの不一致を解決することで処理を続行することができます。
Databricks Runtime 12.2 LTS 以下では、列マッピングが有効になっている Delta テーブルからストリームを送信することはできません。このテーブルは、名前の変更や列の削除など、非加法的なスキーマの進化を経ています。詳細については、「 列マッピングとスキーマの変更によるストリーミング」を参照してください。
入力レートの制限
マイクロバッチの制御には、以下のオプションがあります:
maxFilesPerTrigger:各マイクロバッチで考慮される新しいファイルの数。デフォルトは1000です。maxBytesPerTrigger:各マイクロバッチで処理されるデータの量。このオプションでは「ソフトマックス」を設定します。つまり、バッチはおおよそこの量のデータを処理し、最小の入力単位がこの制限よりも大きい場合にストリーミングクエリを進めるために、制限を超えるデータを処理する可能性があります。これはデフォルトでは設定されていません。
maxFilesPerTriggerとmaxBytesPerTriggerを組み合わせて使用すると、マイクロバッチは maxFilesPerTriggerまたはmaxBytesPerTriggerの制限に達するまでデータを処理します。
logRetentionDuration設定によってソーステーブルのトランザクションがクリーンアップされ、ストリーミングクエリがそれらのバージョンを処理しようとすると、デフォルトでは、データ損失を避けるためにクエリは失敗します。オプション failOnDataLoss を false に設定すると、失われたデータを無視して処理を続行できます。
更新と削除を無視する
Delta テーブルからストリーミングする場合、構造化ストリーミングは追加ではない入力を処理せず、ソースとして使用されているテーブルに変更が発生した場合は例外をスローします。下流に自動的に伝播できない変更に対処するための主な戦略は 2 つあります。
- 出力とチェックポイントを削除して、ストリームを最初から再開する。
 - 次のいずれかのオプションを設定できます。
skipChangeCommits(推奨): 既存のレコードを削除または変更するトランザクションを無視します。このオプションはignoreDeletesを包含します。ignoreDeletes(レガシー): パーティション境界でデータを削除するトランザクションを無視します。このオプションは完全なパーティションの削除のみを処理します。
 
Databricks ではskipChangeCommits使用を推奨しています。
Databricks Runtime 12.2 LTS 以降では、 skipChangeCommits は以前の設定 ignoreChangesを非推奨にします。 Databricks Runtime 11.3 LTS 以前では、 ignoreChanges のみがサポートされているオプションです。
ignoreChangesのセマンティクは、skipChangeCommitsと大きく異なります。ignoreChangesを有効にすると、UPDATE、MERGE INTO、DELETE(パーティション内)、またはOVERWRITEなどのデータ変更操作の後に、ソーステーブル内の書き換えられたデータファイルが再出力されます。変更されていない行は、しばしば新しい行と一緒に出力されるため、ダウンストリームのコンシューマーは重複を処理できる必要があります。削除はダウンストリームに反映されません。ignoreChangesはignoreDeletesを包含します。
skipChangeCommits はファイルの変更操作を完全に無視します。UPDATE、MERGE INTO、DELETE、OVERWRITEなどのデータ変更操作によってソーステーブル内で書き換えられたデータファイルは完全に無視されます。アップストリームのソーステーブルに変更を反映させるには、これらの変更を伝搬するためのロジックを別途実装する必要があります。
ignoreChanges で構成されたワークロードは、引き続き既知のセマンティクスを使用して動作しますが、Databricks では、すべての新しいワークロードに skipChangeCommits を使用することをお勧めします。ignoreChanges から skipChangeCommits へのワークロードの移行には、リファクタリング ロジックが必要です。
例
たとえば、dateによってパーティション化された、date、user_email、およびaction列を含むテーブルuser_eventsがあるとします。user_eventsテーブルからストリーミングしており、GDPRのためテーブルからデータを削除する必要があります。
パーティション境界で削除する場合 (つまり、 WHERE がパーティション列にある場合)、ファイルは既に値でセグメント化されているため、削除によってメタデータからそれらのファイルがドロップされるだけです。 データのパーティション全体を削除する場合は、次のものを使用できます。
spark.readStream
  .option("ignoreDeletes", "true")
  .table("user_events")
複数のパーティションのデータを削除する場合(この例では user_email でフィルタリングしています)、次の構文を使用してください。
spark.readStream
  .option("skipChangeCommits", "true")
  .table("user_events")
UPDATE ステートメントでuser_emailを更新すると、問題のuser_emailを含むファイルが書き換えられます。skipChangeCommits を使用して、変更されたデータ ファイルを無視します。
Databricks では、削除によって常にパーティションが完全に削除されることが確実でない限り、 ignoreDeletesではなくskipChangeCommits使用することをお勧めします。
初期位置の指定
次のオプションを使用すると、テーブル全体を処理せずに、Delta Lakeストリーミングソースの開始点を指定できます。
- 
startingVersion: 開始する Delta Lake バージョン。Databricks では、ほとんどのワークロードでこのオプションを省略することを推奨しています。設定されていない場合、ストリームは、その時点でのテーブルの完全なスナップショットと、変更データとしての将来の変更を含む、利用可能な最新バージョンから開始されます。指定した場合、ストリームは、指定したバージョン (両端を含む) から始まる Delta テーブルに対するすべての変更を読み取ります。 指定したバージョンが使用できなくなった場合、ストリームは開始に失敗します。 コミット・バージョンは、DESCRIBE HISTORY コマンド出力の
version列から取得できます。最新の変更のみを返すには、
latestを指定します。 - 
startingTimestamp: タイムスタンプを開始するポイントです。タイムスタンプ以降に行われたすべてのテーブル変更は、ストリーミングリーダーによって読み取られます。提供されたタイムスタンプがすべてのテーブルのコミットより前にある場合、ストリーミングの読み取りは利用可能な最も早いタイムスタンプから始まります。これは次のいずれかになります。- タイムスタンプ文字列。たとえば、
"2019-01-01T00:00:00.000Z"などです。 - 日付文字列。たとえば、
"2019-01-01"などです。 
 - タイムスタンプ文字列。たとえば、
 
両方のオプションを同時に設定することはできません。 これらは、新しいストリーミング クエリを開始するときにのみ有効になります。 ストリーミング クエリが開始され、進行状況がチェックポイントに記録されている場合、これらのオプションは無視されます。
指定したバージョン、またはタイムスタンプからストリーミングソースを開始できますが、ストリーミングソースのスキーマは常にDeltaテーブルの最新のスキーマになります。指定されたバージョンまたはタイムスタンプ以降に、Deltaテーブルに互換性のないスキーマ変更がないことを確認する必要があります。そうしないと、ストリーミングソースが不正なスキーマでデータを読み取った際に、間違った結果を返す可能性があります。
例
たとえば、user_eventsというテーブルがあるとします。バージョン5以降の変更を読み取りたい場合は、次を使用します:
spark.readStream
  .option("startingVersion", "5")
  .table("user_events")
2018年10月18日以降の変更を読み取りたい場合は、次を使用します。
spark.readStream
  .option("startingTimestamp", "2018-10-18")
  .table("user_events")
データを削除せずに初期スナップショットを処理
この機能は、Databricks Runtime 11.3 LTS 以降で使用できます。
Delta テーブルをストリーム ソースとして使用する場合、クエリは最初にテーブルに存在するすべてのデータを処理します。このバージョンの Delta テーブルは、初期スナップショットと呼ばれます。デフォルトでは、 Delta テーブルのデータファイルは、最後に変更されたファイルに基づいて処理されます。 ただし、最終変更時刻は、必ずしもレコード イベント時刻の順序を表すわけではありません。
ウォーターマークが定義されたステートフルストリーミングクエリでは、ファイルを更新時間順に処理すると、レコードが誤った順序で処理される可能性があります。このため、ウォーターマークによって記録がレイトイベントとして取りこぼされる可能性があります。
次のオプションを有効にすることで、データドロップの問題を回避できます。
- withEventTimeOrder: 初期スナップショットをイベント時間順に処理するかどうか。
 
イベント時間順序を有効にすると、初期スナップショットデータのイベント時間の範囲がタイムバケットに分割されます。各マイクロバッチは、時間範囲内のデータをフィルタリングすることによってバケットを処理します。maxFilesPerTrigger および maxBytesPerTrigger 構成オプションは引き続きマイクロバッチサイズの制御に適用できますが、処理の性質上、近似的な方法でのみ適用されます。
以下の図は、このプロセスを示しています。

この機能に関する重要な情報:
- 
データドロップの問題は、ステートフルストリーミングクエリーの最初の Delta スナップショットがデフォルトの順序で処理される場合にのみ発生します。
 - 
最初のスナップショットの処理中にストリームクエリーが開始されると、それ以降は
withEventTimeOrderを変更することはできません。withEventTimeOrderを変更して再起動するには、チェックポイントを削除する必要があります。 - 
withEventTimeOrder を有効にしてストリームクエリを実行している場合、初期スナップショット処理が完了するまで、この機能をサポートしていない DBR バージョンにダウングレードすることはできません。ダウングレードする必要がある場合は、最初のスナップショットが終了するのを待つか、チェックポイントを削除してクエリを再開できます。
 - 
この機能は、以下のような稀なケースではサポートされていません。
- イベント時間列はジェネレーテッドカラムであり、Delta ソースとウォーターマークの間に非投影変換がある場合。
 - ストリームクエリーに複数の Delta ソースを持つウォーターマークがある場合。
 
 - 
イベント時間順を有効にすると、Delta の初期スナップショット処理のパフォーマンスが低下する可能性があります。
 - 
各マイクロバッチは、初期スナップショットをスキャンして、対応するイベント時間範囲内のデータをフィルタリングします。フィルター操作を高速化するには、データのスキップを適用できるように、イベント時間として Delta ソース列を使用することをお勧めします (該当する場合は、 Delta Lake のデータのスキップ を確認してください)。さらに、イベント時間列に沿ってテーブルをパーティション分割すると、処理をさらに高速化できます。Spark UI を確認して、特定のマイクロバッチでスキャンされたデルタファイルの数を確認できます。
 
例
event_time 列を持つ user_events というテーブルがあるとします。ストリーミングクエリーは集計クエリーです。初期スナップショット処理中にデータドロップが起きないようにするには、以下を使用します。
spark.readStream
  .option("withEventTimeOrder", "true")
  .table("user_events")
  .withWatermark("event_time", "10 seconds")
すべてのストリーミングクエリーに適用されるクラスター上の Spark 構成を使用してこの機能を有効にすることもできます。 spark.databricks.delta.withEventTimeOrder.enabled true
Delta テーブルをシンクとして
構造化ストリーミングを使用して Delta テーブルにデータを書き込むこともできます。テーブルに対して他のストリームやバッチクエリーが同時に実行されている場合でも、トランザクションログにより、Delta Lake で 1 回のみの処理が保証されます。
構造化ストリーミング シンクを使用して Delta テーブルに書き込む場合、 epochId = -1で空のコミットを確認できます。これらは予期され、通常発生します。
- ストリーミング クエリの各実行の最初のバッチで (これは、 
Trigger.AvailableNowのすべてのバッチで発生します)。 - スキーマが変更されたとき (列の追加など)。
 
これらの空のコミットは、クエリの正確性やパフォーマンスに重大な影響を与えません。これらは意図的なものであり、エラーを示すものではありません。
Delta Lake VACUUM 関数は、Delta Lake によって管理されていないすべてのファイルを削除しますが、_で始まるディレクトリはスキップします。<table-name>/_checkpointsなどのディレクトリ構造を使用すると、Delta テーブルの他のデータやメタデータと一緒にチェックポイントを安全に保存できます。
メトリクス
ストリーミングクエリプロセスでまだ処理されていないバイト数とファイル数は、numBytesOutstanding および numFilesOutstanding メトリクスとして確認できます。その他のメトリクスには、次のものがあります。
numNewListedFiles: このバッチのバックログを計算するためにリスト化された Delta Lake ファイルの数。backlogEndOffset: バックログの計算に使用されるテーブルバージョン。
ノートブックでストリームを実行している場合、ストリーミングクエリの進行状況ダッシュボードの 生データ タブに次のメトリクスが表示されます。
{
  "sources": [
    {
      "description": "DeltaSource[file:/path/to/source]",
      "metrics": {
        "numBytesOutstanding": "3456",
        "numFilesOutstanding": "8"
      }
    }
  ]
}
追加モード
デフォルトでは、ストリームは追加モードで実行され、新しいレコードがテーブルに追加されます。
テーブルにストリーミングする場合は、次の例のように toTable メソッドを使用します。
- Python
 - Scala
 
(events.writeStream
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
   .toTable("events")
)
events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")
コンプリートモード
構造化ストリーミングを使用して、テーブル全体をバッチごとに置き換えることもできます。ユースケースの一例として、集計を用いたサマリーの計算があります。
- Python
 - Scala
 
(spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")
)
spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")
前述の例では、顧客ごとのイベントの総数を含むテーブルが継続的に更新されます。
レイテンシ要件がより緩やかなアプリケーションの場合は、1 回限りのトリガーでコンピューティングリソースを節約できます。これらを使用することで、特定のスケジュールでサマリー集計テーブルを更新し、最後の更新時以降に到着した新しいデータのみを処理することができます。
を使用してストリーミング クエリからアップサートします foreachBatch
merge と foreachBatch を組み合わせて使用すると、ストリーミング クエリから Delta テーブルに複雑なアップサートを書き込むことができます。「foreachBatch を使用して任意のデータ シンクに書き込む」を参照してください。
このパターンには、以下のように様々な適用方法があります。
- 更新モードでストリーミング集計を書き込む :この方法は、コンプリートモードよりもはるかに効率的です。
 - データベース変更のストリームを Delta テーブルに書き込む : 変更データを書き込むためのマージ クエリ を 
foreachBatchで使用して、変更のストリームを Delta テーブルに継続的に適用できます。 - 重複除去を使用して Delta テーブルにデータのストリームを書き込む : 重複除去のための挿入専用マージ クエリ を 
foreachBatchで使用して、自動重複除去を使用して Delta テーブルに (重複を含む) データを継続的に書き込むことができます。 
- ストリーミングクエリーを再起動すると、データの同じバッチに操作が複数回適用される可能性があるため、 
foreachBatch内のmergeステートメントがべき等であることを確認してください。 mergeがforeachBatchで使用されている場合、ストリーミングクエリーの入力データレート (StreamingQueryProgressを通じてレポートされ、ノートブックのレートグラフに表示されます) は、ソースでデータが生成される実際のレートの倍数としてレポートされる場合があります。これは、mergeが入力データを複数回読み込むため、入力メトリクスが乗算されることが理由です。これがボトルネックになる場合は、mergeの前にバッチの データフレーム をキャッシュし、mergeの後にキャッシュを解除することができます。
次の例は、 foreachBatch 内で SQL を使用してこのタスクを実行する方法を示しています。
- Scala
 - Python
 
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  // Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")
  // Use the view name to apply MERGE
  // NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  # Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")
  # Use the view name to apply MERGE
  # NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
  # In Databricks Runtime 10.5 and below, you must use the following:
  # microBatchOutputDF._jdf.sparkSession().sql("""
  microBatchOutputDF.sparkSession.sql("""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)
次の例のように、Delta Lake API を使用してストリーミングの Upsert を実行することも選択できます。
- Scala
 - Python
 
import io.delta.tables.*
val deltaTable = DeltaTable.forName(spark, "table_name")
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "table_name")
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  (deltaTable.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.key = t.key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)
べき等テーブルは foreachBatch
Databricks では、 foreachBatchを使用する代わりに、更新するシンクごとに個別のストリーミング書き込みを構成することをお勧めします。これは、'foreachBatch' を使用すると複数のテーブルへの書き込みがシリアル化されるため、並列化が減少し、全体的な待機時間が増加するためです。
Delta テーブルでは、foreachBatchべき等内の複数のテーブルに書き込むために、次の DataFrameWriter オプションがサポートされています。
txnAppId: 各 データフレーム 書き込みで渡すことができる一意の文字列。 たとえば、StreamingQuery ID をtxnAppIdとして使用できます。txnVersion: トランザクションのバージョンとして機能する単調増加の数値。
Delta Lake は、 txnAppId と txnVersion の組み合わせを使用して、重複する書き込みを特定し、それらを無視します。
バッチ書き込みが失敗で中断された場合、バッチを再実行すると、同じアプリケーションとバッチ ID を使用して、ランタイムが重複する書き込みを正しく識別し、それらを無視するのに役立ちます。 アプリケーション ID(txnAppId)は、ユーザーが生成した任意の一意の文字列にすることができ、ストリーム ID に関連付ける必要はありません。 「foreachBatch を使用して任意のデータ シンクに書き込む」を参照してください。
ストリーミング・チェックポイントを削除し、新しいチェックポイントでクエリを再開する場合は、別の txnAppIdを指定する必要があります。 新しいチェックポイントは、バッチ ID 0で開始されます。 Delta Lake は、バッチ ID とtxnAppIdを一意のキーとして使用し、既に表示されている値を持つバッチをスキップします。
次のコード例は、このパターンを示しています。
- Python
 - Scala
 
app_id = ... # A unique string that is used as an application ID.
def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2
streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()
val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2
}