Delta table ストリーミング 読み取りと書き込み
Delta Lake は、readStream
とwriteStream
を通じて Spark 構造化ストリーミング と深く統合されています。 Delta Lake は、ストリーミング システムやファイルに通常関連する次のような多くの制限を克服します。
- 低遅延の取り込みによって生成された小さなファイルを結合します。
- 複数のストリーム (または並列 バッチジョブ) で "exactly-once" 処理を維持します。
- ストリームのソースとしてファイルを使用する場合に、新しいファイルを効率的に検出します。
この記事では、Delta Lake テーブルをストリーミング ソースとシンクとして使用する方法について説明します。 Databricks SQL でストリーミング テーブルを使用してデータを読み込む方法については、「 Databricks SQL でストリーミング テーブルを使用してデータを読み込む」を参照してください。
Delta Lakeを使用したストリーム静的結合に関する情報については、「ストリーム静的結合」を参照してください。
Delta テーブルをソース
構造化ストリーミングは、Delta テーブルを増分的に読み取ります。 ストリーミング クエリが Delta テーブルに対してアクティブである間、新しいレコードは、新しいテーブル バージョンがソース テーブルにコミットされるとべき等に処理されます。
次のコード例は、テーブル名またはファイルパスを使用したストリーミング読み取りの設定を示しています。
- Python
- Scala
spark.readStream.table("table_name")
spark.readStream.load("/path/to/table")
spark.readStream.table("table_name")
spark.readStream.load("/path/to/table")
Deltaテーブルのスキーマが、そのテーブルに対してストリーミング読み取りを開始した後に変更された場合、クエリは失敗します。ほとんどのスキーマの変更では、ストリームを再起動してスキーマの不一致を解決することで処理を続行することができます。
Databricks Runtime 12.2 LTS 以下では、列マッピングが有効になっている Delta テーブルからストリームを送信することはできません。このテーブルは、名前の変更や列の削除など、非加法的なスキーマの進化を経ています。詳細については、「 列マッピングとスキーマの変更によるストリーミング」を参照してください。
入力レートの制限
マイクロバッチの制御には、以下のオプションがあります:
maxFilesPerTrigger
:各マイクロバッチで考慮される新しいファイルの数。デフォルトは1000です。maxBytesPerTrigger
:各マイクロバッチで処理されるデータの量。このオプションでは「ソフトマックス」を設定します。つまり、バッチはおおよそこの量のデータを処理し、最小の入力単位がこの制限よりも大きい場合にストリーミングクエリを進めるために、制限を超えるデータを処理する可能性があります。これはデフォルトでは設定されていません。
maxFilesPerTrigger
とmaxBytesPerTrigger
を組み合わせて使用すると、マイクロバッチは maxFilesPerTrigger
またはmaxBytesPerTrigger
の制限に達するまでデータを処理します。
logRetentionDuration
設定によってソーステーブルのトランザクションがクリーンアップされ、ストリーミングクエリがそれらのバージョンを処理しようとすると、デフォルトでは、データ損失を避けるためにクエリは失敗します。オプション failOnDataLoss
を false
に設定すると、失われたデータを無視して処理を続行できます。
ストリーム a Delta Lake チェンジデータキャプチャ (CDC) feed
Delta Lake チェンジデータフィード は、更新や削除など、 Delta テーブルへの変更を記録します。 有効にすると、チェンジデータフィードからストリームを送信し、挿入、更新、および削除をダウンストリームテーブルに処理するロジックを書き込むことができます。 チェンジデータフィードのデータ出力は、それが記述する Delta テーブルとは若干異なりますが、これにより、 メダリオンアーキテクチャのダウンストリームテーブルに増分変更を伝播するためのソリューションが提供されます。
Databricks Runtime 12.2 LTS以下では、カラムマッピングが有効になっているDeltaテーブルのチェンジデータフィードからストリームすることはできません。このテーブルは、カラムの名前変更や削除などの非加法的なスキーマ進化を経ています。列マッピングとスキーマの変更によるストリーミングを参照してください。
更新と削除を無視する
構造化ストリーミングは、追加でない入力を処理せず、ソースとして使用されているテーブルに何らかの変更が発生すると例外をスローします。自動的にダウンストリームに伝搬することができない変更に対処するには、主に2つの戦略があります:
- 出力とチェックポイントを削除して、ストリームを最初から再開する。
- 次の2つのオプションのどちらかを設定する:
ignoreDeletes
:パーティション境界でデータを削除するトランザクションを無視する。skipChangeCommits
:既存のレコードを削除または変更するトランザクションを無視する。skipChangeCommits
はignoreDeletes
を包含します。
Databricks Runtime 12.2 LTS 以降では、 skipChangeCommits
は以前の設定 ignoreChanges
を非推奨にします。 Databricks Runtime 11.3 LTS 以前では、 ignoreChanges
のみがサポートされているオプションです。
ignoreChanges
のセマンティクは、skipChangeCommits
と大きく異なります。ignoreChanges
を有効にすると、UPDATE
、MERGE INTO
、DELETE
(パーティション内)、またはOVERWRITE
などのデータ変更操作の後に、ソーステーブル内の書き換えられたデータファイルが再出力されます。変更されていない行は、しばしば新しい行と一緒に出力されるため、ダウンストリームのコンシューマーは重複を処理できる必要があります。削除はダウンストリームに反映されません。ignoreChanges
はignoreDeletes
を包含します。
skipChangeCommits
はファイルの変更操作を完全に無視します。UPDATE
、MERGE INTO
、DELETE
、OVERWRITE
などのデータ変更操作によってソーステーブル内で書き換えられたデータファイルは完全に無視されます。アップストリームのソーステーブルに変更を反映させるには、これらの変更を伝搬するためのロジックを別途実装する必要があります。
ignoreChanges
で構成されたワークロードは、引き続き既知のセマンティクスを使用して動作しますが、Databricks では、すべての新しいワークロードに skipChangeCommits
を使用することをお勧めします。ignoreChanges
から skipChangeCommits
へのワークロードの移行には、リファクタリング ロジックが必要です。
例
たとえば、date
によってパーティション化された、date
、user_email
、およびaction
列を含むテーブルuser_events
があるとします。user_events
テーブルからストリーミングしており、GDPRのためテーブルからデータを削除する必要があります。
パーティション境界で削除する場合 (つまり、 WHERE
がパーティション列にある場合)、ファイルは既に値でセグメント化されているため、削除によってメタデータからそれらのファイルがドロップされるだけです。 データのパーティション全体を削除する場合は、次のものを使用できます。
spark.readStream
.option("ignoreDeletes", "true")
.table("user_events")
複数のパーティションのデータを削除する場合(この例では user_email
でフィルタリングしています)、次の構文を使用してください。
spark.readStream
.option("skipChangeCommits", "true")
.table("user_events")
UPDATE
ステートメントでuser_email
を更新すると、問題のuser_email
を含むファイルが書き換えられます。skipChangeCommits
を使用して、変更されたデータ ファイルを無視します。
初期位置の指定
次のオプションを使用すると、テーブル全体を処理せずに、Delta Lakeストリーミングソースの開始点を指定できます。
-
startingVersion
: 開始点となる Delta Lake バージョン。 Databricks では、ほとんどのワークロードでこのオプションを省略することをお勧めします。 設定されていない場合、ストリームは、その時点でのテーブルの完全なスナップショットを含む、使用可能な最新バージョンから開始されます。指定した場合、ストリームは、指定したバージョン (両端を含む) から始まる Delta テーブルに対するすべての変更を読み取ります。 指定したバージョンが使用できなくなった場合、ストリームは開始に失敗します。 コミット・バージョンは、DESCRIBE HISTORY コマンド出力の
version
列から取得できます。最新の変更のみを返すには、
latest
を指定します。 -
startingTimestamp
: タイムスタンプを開始するポイントです。タイムスタンプ以降に行われたすべてのテーブル変更は、ストリーミングリーダーによって読み取られます。提供されたタイムスタンプがすべてのテーブルのコミットより前にある場合、ストリーミングの読み取りは利用可能な最も早いタイムスタンプから始まります。これは次のいずれかになります。- タイムスタンプ文字列。たとえば、
"2019-01-01T00:00:00.000Z"
などです。 - 日付文字列。たとえば、
"2019-01-01"
などです。
- タイムスタンプ文字列。たとえば、
両方のオプションを同時に設定することはできません。 これらは、新しいストリーミング クエリを開始するときにのみ有効になります。 ストリーミング クエリが開始され、進行状況がチェックポイントに記録されている場合、これらのオプションは無視されます。
指定したバージョン、またはタイムスタンプからストリーミングソースを開始できますが、ストリーミングソースのスキーマは常にDeltaテーブルの最新のスキーマになります。指定されたバージョンまたはタイムスタンプ以降に、Deltaテーブルに互換性のないスキーマ変更がないことを確認する必要があります。そうしないと、ストリーミングソースが不正なスキーマでデータを読み取った際に、間違った結果を返す可能性があります。
例
たとえば、user_events
というテーブルがあるとします。バージョン5以降の変更を読み取りたい場合は、次を使用します:
spark.readStream
.option("startingVersion", "5")
.table("user_events")
2018年10月18日以降の変更を読み取りたい場合は、次を使用します。
spark.readStream
.option("startingTimestamp", "2018-10-18")
.table("user_events")
データを削除せずに初期スナップショットを処理
この機能は、Databricks Runtime 11.3 LTS 以降で使用できます。
Deltaテーブルをストリームソースとして使用する場合、クエリはまずテーブルに存在するすべてのデータを処理します。このバージョンのDeltaテーブルは、初期スナップショットと呼ばれています。デフォルトでは、Deltaテーブルのデータファイルは、最後に更新されたファイルに基づいて処理されます。ただし、最終変更時刻は必ずしも記録イベントの時間順を表すわけではありません。
電子透かしが定義されたステートフルストリーミングクエリでは、ファイルを更新時間順に処理すると、レコードが誤った順序で処理される可能性があります。このため、電子透かしによって記録がレイトイベントとして取りこぼされる可能性があります。
次のオプションを有効にすることで、データドロップの問題を回避できます。
- withEventTimeOrder: 初期スナップショットをイベント時間順に処理するかどうか。
イベント時間順序を有効にすると、初期スナップショットデータのイベント時間の範囲がタイムバケットに分割されます。各マイクロバッチは、時間範囲内のデータをフィルタリングすることによってバケットを処理します。maxFilesPerTrigger および maxBytesPerTrigger 構成オプションは引き続きマイクロバッチサイズの制御に適用できますが、処理の性質上、近似的な方法でのみ適用されます。
以下の図は、このプロセスを示しています。
この機能に関する重要な情報:
-
データドロップの問題は、ステートフルストリーミングクエリーの最初の Delta スナップショットがデフォルトの順序で処理される場合にのみ発生します。
-
最初のスナップショットの処理中にストリームクエリーが開始されると、それ以降は
withEventTimeOrder
を変更することはできません。withEventTimeOrder
を変更して再起動するには、チェックポイントを削除する必要があります。 -
withEventTimeOrder を有効にしてストリームクエリーを実行している場合、最初のスナップショット処理が完了するまで、この機能をサポートしていないDBRバージョンにダウングレードすることはできません。ダウングレードが必要な場合は、最初のスナップショットが終了するのを待つか、チェックポイントを削除してクエリーを再開してください。
-
この機能は、以下のような稀なケースではサポートされていません。
- イベント時間列はジェネレーテッドカラムであり、Delta ソースと電子透かしの間に非投影変換がある場合。
- ストリームクエリーに複数の Delta ソースを持つ電子透かしがある場合。
-
イベント時間順を有効にすると、Delta の初期スナップショット処理のパフォーマンスが低下する可能性があります。
-
各マイクロバッチは、初期スナップショットをスキャンして、対応するイベント時間範囲内のデータをフィルタリングします。 フィルター操作を高速化するには、データのスキップを適用できるように、イベント時間として Delta ソース列を使用することをお勧めします (該当する場合は、 Delta Lake のデータのスキップ を確認してください)。 さらに、イベント時間列に沿ってテーブルをパーティション分割すると、処理をさらに高速化できます。 Spark UI を確認して、特定のマイクロバッチでスキャンされたデルタファイルの数を確認できます。
例
event_time
列を持つ user_events
というテーブルがあるとします。ストリーミングクエリーは集計クエリーです。初期スナップショット処理中にデータドロップが起きないようにするには、以下を使用します。
spark.readStream
.option("withEventTimeOrder", "true")
.table("user_events")
.withWatermark("event_time", "10 seconds")
すべてのストリーミングクエリーに適用されるクラスター上の Spark 構成を使用してこの機能を有効にすることもできます。 spark.databricks.delta.withEventTimeOrder.enabled true
Delta テーブルをシンクとして
構造化ストリーミングを使用して Delta テーブルにデータを書き込むこともできます。テーブルに対して他のストリームやバッチクエリーが同時に実行されている場合でも、トランザクションログにより、Delta Lake で 1 回のみの処理が保証されます。
Delta Lake VACUUM
関数は、Delta Lake によって管理されていないすべてのファイルを削除しますが、_
で始まるディレクトリはスキップします。<table-name>/_checkpoints
などのディレクトリ構造を使用すると、Delta テーブルの他のデータやメタデータと一緒にチェックポイントを安全に保存できます。
メトリクス
ストリーミングクエリプロセスでまだ処理されていないバイト数とファイル数は、numBytesOutstanding
および numFilesOutstanding
メトリクスとして確認できます。その他のメトリクスには、次のものがあります。
numNewListedFiles
: このバッチのバックログを計算するためにリスト化された Delta Lake ファイルの数。backlogEndOffset
: バックログの計算に使用されるテーブルバージョン。
ノートブックでストリームを実行している場合、ストリーミングクエリの進行状況ダッシュボードの 生データ タブに次のメトリクスが表示されます。
{
"sources": [
{
"description": "DeltaSource[file:/path/to/source]",
"metrics": {
"numBytesOutstanding": "3456",
"numFilesOutstanding": "8"
}
}
]
}
追加モード
デフォルトでは、ストリームは追加モードで実行され、新しいレコードがテーブルに追加されます。
テーブルにストリーミングする場合は、次の例のように toTable
メソッドを使用します。
- Python
- Scala
(events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
)
events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
コンプリートモード
構造化ストリーミングを使用して、テーブル全体をバッチごとに置き換えることもできます。ユースケースの一例として、集計を用いたサマリーの計算があります。
- Python
- Scala
(spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
)
spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
前述の例では、顧客ごとのイベントの総数を含むテーブルが継続的に更新されます。
レイテンシ要件がより緩やかなアプリケーションの場合は、1 回限りのトリガーでコンピューティングリソースを節約できます。これらを使用することで、特定のスケジュールでサマリー集計テーブルを更新し、最後の更新時以降に到着した新しいデータのみを処理することができます。
を使用してストリーミング クエリからアップサートします foreachBatch
merge
と foreachBatch
を組み合わせて使用すると、ストリーミング クエリから Delta テーブルに複雑なアップサートを書き込むことができます。「foreachBatch を使用して任意のデータ シンクに書き込む」を参照してください。
このパターンには、以下のように様々な適用方法があります。
- 更新モードでストリーミング集計を書き込む :この方法は、コンプリートモードよりもはるかに効率的です。
- データベース変更のストリームを Delta テーブルに書き込む : 変更データを書き込むためのマージ クエリ を
foreachBatch
で使用して、変更のストリームを Delta テーブルに継続的に適用できます。 - 重複除去を使用して Delta テーブルにデータのストリームを書き込む : 重複除去のための挿入専用マージ クエリ を
foreachBatch
で使用して、自動重複除去を使用して Delta テーブルに (重複を含む) データを継続的に書き込むことができます。
- ストリーミングクエリーを再起動すると、データの同じバッチに操作が複数回適用される可能性があるため、
foreachBatch
内のmerge
ステートメントがべき等であることを確認してください。 merge
がforeachBatch
で使用されている場合、ストリーミングクエリーの入力データレート (StreamingQueryProgress
を通じてレポートされ、ノートブックのレートグラフに表示されます) は、ソースでデータが生成される実際のレートの倍数としてレポートされる場合があります。これは、merge
が入力データを複数回読み込むため、入力メトリクスが乗算されることが理由です。これがボトルネックになる場合は、merge
の前にバッチの データフレーム をキャッシュし、merge
の後にキャッシュを解除することができます。
次の例は、 foreachBatch
内で SQL を使用してこのタスクを実行する方法を示しています。
- Scala
- Python
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
// Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
// Use the view name to apply MERGE
// NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
microBatchOutputDF.sparkSession.sql(s"""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
# Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
# Use the view name to apply MERGE
# NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
# In Databricks Runtime 10.5 and below, you must use the following:
# microBatchOutputDF._jdf.sparkSession().sql("""
microBatchOutputDF.sparkSession.sql("""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
次の例のように、Delta Lake API を使用してストリーミングの Upsert を実行することも選択できます。
- Scala
- Python
import io.delta.tables.*
val deltaTable = DeltaTable.forName(spark, "table_name")
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
deltaTable.as("t")
.merge(
microBatchOutputDF.as("s"),
"s.key = t.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "table_name")
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
(deltaTable.alias("t").merge(
microBatchOutputDF.alias("s"),
"s.key = t.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
べき等テーブルは foreachBatch
Databricks では、 foreachBatch
を使用する代わりに、更新するシンクごとに個別のストリーミング書き込みを構成することをお勧めします。 これは、'foreachBatch' を使用すると複数のテーブルへの書き込みがシリアル化されるため、並列化が減少し、全体的な待機時間が増加するためです。
Delta テーブルでは、foreachBatch
べき等内の複数のテーブルに書き込むために、次の DataFrameWriter
オプションがサポートされています。
txnAppId
: 各 データフレーム 書き込みで渡すことができる一意の文字列。 たとえば、StreamingQuery ID をtxnAppId
として使用できます。txnVersion
: トランザクションのバージョンとして機能する単調増加の数値。
Delta Lake は、 txnAppId
と txnVersion
の組み合わせを使用して、重複する書き込みを特定し、それらを無視します。
バッチ書き込みが失敗で中断された場合、バッチを再実行すると、同じアプリケーションとバッチ ID を使用して、ランタイムが重複する書き込みを正しく識別し、それらを無視するのに役立ちます。 アプリケーション ID(txnAppId
)は、ユーザーが生成した任意の一意の文字列にすることができ、ストリーム ID に関連付ける必要はありません。 「foreachBatch を使用して任意のデータ シンクに書き込む」を参照してください。
ストリーミング・チェックポイントを削除し、新しいチェックポイントでクエリを再開する場合は、別の txnAppId
を指定する必要があります。 新しいチェックポイントは、バッチ ID 0
で開始されます。 Delta Lake は、バッチ ID とtxnAppId
を一意のキーとして使用し、既に表示されている値を持つバッチをスキップします。
次のコード例は、このパターンを示しています。
- Python
- Scala
app_id = ... # A unique string that is used as an application ID.
def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2
streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()
val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 1
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 2
}