マージを使用してDelta Lakeテーブルにアップサートする
MERGE
SQL操作を使用して、ソーステーブル、ビュー、またはDataFrameからターゲットDeltaテーブルにデータを更新/挿入できます。Delta Lakeは、MERGE
での挿入、更新、削除をサポートしており、高度なユースケースを容易にするためにSQL標準を超える拡張構文をサポートしています。
people10mupdates
という名前のソーステーブル、または/tmp/delta/people-10m-updates
のソースパスがあり、people10m
という名前のターゲットテーブル、または/tmp/delta/people-10m
のターゲットパスの新しいデータが含まれているとします。これらの新しいレコードの一部は、対象データにすでに存在している可能性があります。新しいデータをマージするには、その人のid
が既に存在する行を更新し、一致するid
が存在しない新しい行を挿入します。以下のクエリーを実行できる:
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
重要
ソース・テーブルの 1 つの行のみが、ターゲット・テーブルの特定の行に一致します。 Databricks Runtime 16.0 以降では、 MERGE
句と WHEN MATCHED
句と ON
句で指定された条件を評価して、重複する一致を判断します。 Databricks Runtime 15.4 LTS 以下では、 MERGE
操作では ON
句で指定された条件のみが考慮されます。
Scala と Python の構文の詳細については、 Delta Lake API のドキュメント を参照してください。 SQL 構文の詳細については、MERGE INTOを参照してください。
マージを使用して一致しない行をすべて変更する
Databricks SQL および Databricks Runtime 12.2 LTS 以降では、 WHEN NOT MATCHED BY SOURCE
句を使用して、ソース テーブル内に対応するレコードがないターゲット テーブル内のレコードをUPDATE
またはDELETE
できます。 Databricks では、ターゲット テーブルが完全に書き換えられるのを避けるために、オプションの条件句を追加することをお勧めします。
次のコード例は、これを削除に使用し、ターゲット テーブルをソース テーブルの内容で上書きし、ターゲット テーブル内の一致しないレコードを削除する基本的な構文を示しています。 ソースの更新と削除に時間制限があるテーブルのよりスケーラブルなパターンについては、 Delta テーブルをソースと増分同期するを参照してください。
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
次の例では、WHEN NOT MATCHED BY SOURCE
句に条件を追加し、一致しないターゲット行で更新する値を指定します。
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
マージ操作のセマンティクス
以下は merge
プログラム操作のセマンティクスの詳細である。
whenMatched
句とwhenNotMatched
句はいくつでも指定できます。whenMatched
句は、一致条件に基づいてソース行がターゲットテーブルの行と一致する場合に実行されます。これらの句には、次のセマンティクスがあります。whenMatched
句には、最大 1 つのupdate
アクションと 1 つのdelete
アクションを含めることができます。merge
のupdate
アクションは、一致したターゲット行の指定された列 (update
操作と同様) のみを更新します。delete
アクションは、一致した行を削除します。各
whenMatched
句には、省略可能な条件を設定できます。この句条件が存在する場合、句条件がtrueの場合にのみ、一致するソース行とターゲット行のペアに対してupdate
またはdelete
アクションが実行されます。複数の
whenMatched
句がある場合、それらは指定された順序で評価されます。最後の句を除くすべてのwhenMatched
句には条件が必要です。マージ条件に一致するソース行とターゲット行のペアについて、どの
whenMatched
条件もtrueと評価されない場合、ターゲット行は変更されないままになります。ターゲットデルタテーブルのすべての列をソースデータセットの対応する列で更新するには、
whenMatched(...).updateAll()
を使用します。これは次と同等です:whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
これはターゲットのDeltaテーブルのすべての列に対する処理です。したがって、このアクションでは、ソーステーブルにターゲットテーブルの列と同じ列があることが前提となっています。そうでない場合、クエリは分析エラーをスローします。
注
自動スキーマ進化が有効になっている場合、この動作は変更されます。 詳細については、 自動スキーマ進化を参照してください。
whenNotMatched
句は、一致条件に基づいてソース行がターゲット行と一致しない場合に実行されます。これらの句には、次のセマンティクスがあります。whenNotMatched
句には、insert
アクションのみを含めることができます。新しい行は、指定された列と対応する式に基づいて生成されます。ターゲットテーブルのすべての列を指定する必要はありません。指定されていないターゲット列の場合は、NULL
が挿入されます。各
whenNotMatched
句には、省略可能な条件を設定できます。句条件が存在する場合、その行に対してその条件がtrueの場合にのみソース行が挿入されます。それ以外の場合、ソース列は無視されます。複数の
whenNotMatched
句がある場合、それらは指定された順序で評価されます。最後の句を除くすべてのwhenNotMatched
句には条件が必要です。ターゲットデルタテーブルのすべての列を、ソースデータセットの対応する列とともに挿入するには、
whenNotMatched(...).insertAll()
を使用します。これは次と同等です:whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
これはターゲットのDeltaテーブルのすべての列に対する処理です。したがって、このアクションでは、ソーステーブルにターゲットテーブルの列と同じ列があることが前提となっています。そうでない場合、クエリは分析エラーをスローします。
注
自動スキーマ進化が有効になっている場合、この動作は変更されます。 詳細については、 自動スキーマ進化を参照してください。
whenNotMatchedBySource
句は、マージ条件に基づいてターゲット行がソース行と一致しない場合に実行されます。これらの句には、次のセマンティクスがあります。whenNotMatchedBySource
節はdelete
とupdate
のアクションを指定できる。各
whenNotMatchedBySource
句には、省略可能な条件を設定できます。句条件が存在する場合、ターゲット行は、その行に対してその条件がtrueである場合にのみ変更されます。それ以外の場合、ターゲット行は変更されません。複数の
whenNotMatchedBySource
句がある場合、それらは指定された順序で評価されます。最後の句を除くすべてのwhenNotMatchedBySource
句には条件が必要です。定義上、
whenNotMatchedBySource
句には列値を取得するソース行がないため、ソース列を参照できません。変更する列ごとに、リテラルを指定するか、ターゲット列に対してアクション(SET target.deleted_count = target.deleted_count + 1
など)を実行できます。
重要
ソースデータセットで複数の行が一致し、マージによってターゲット Delta テーブルの同じ行の更新が試行された場合、
merge
操作が失敗する可能性があります。マージの SQL セマンティクスに従い、一致したターゲット行を更新するためにどのソース行を使用する必要があるかが不明瞭である場合、このような更新操作はあいまいとなります。ソーステーブルを前処理することで、複数一致が発生しないようにすることができます。SQL VIEWにSQL
MERGE
操作を適用できるのは、ビューがCREATE VIEW viewName AS SELECT * FROM deltaTable
として定義されている場合のみです。
Deltaテーブルへの書き込み時のデータ重複排除
一般的なETLの使用例は、ログをテーブルに追加してデルタテーブルにログを収集することです。ただし、多くの場合、ソースは重複したログレコードを生成する可能性があり、それらに対処するためにダウンストリームの重複排除手順が必要になります。merge
を使用すると、重複するレコードの挿入を回避できます。
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
注
新しいログを含むデータセットは、それ自体内で重複排除する必要があります。マージのSQLセマンティクスにより、新しいデータをテーブル内の既存のデータと照合して重複を排除しますが、新しいデータセット内に重複データがある場合は、そのデータが挿入されます。したがって、テーブルにマージする前に、新しいデータの重複を排除してください。
重複レコードが数日間しか取得されないことがわかっている場合は、テーブルを日付でパーティション分割し、照合するターゲットテーブルの日付範囲を指定することで、クエリーをさらに最適化できます。
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
これは、テーブル全体ではなく、過去7日間のログのみで重複を検索するため、前のコマンドよりも効率的です。さらに、この挿入専用マージを構造化ストリーミングと使用して、ログの継続的な重複排除を実行できます。
ストリーミング クエリでは、
foreachBatch
でマージ操作を使用して、重複排除を使用してストリーミング データを Delta テーブルに継続的に書き込むことができます。foreachBatch
の詳細については、次の ストリーミングの例 を参照してください。別のストリーミングクエリーでは、このDelatテーブルから重複排除されたデータを継続的に読み込むことができます。挿入のみのマージは、Deltaテーブルに新しいデータを追加するだけであるため、このようなことが可能です。
Delta Lakeを使用した緩やかに変化するデータ (SCD) とチェンジデータキャプチャ (CDC)
Delta Live Tables は、SCD タイプ 1 およびタイプ 2 の追跡と適用をネイティブでサポートしています。Delta Live Tables でAPPLY CHANGES INTO
を使用して、CDC フィードを処理するときに順序が正しくないレコードが正しく処理されるようにします。 APPLY CHANGES API: Delta Live Tablesを使用してチェンジデータキャプチャを簡素化します参照してください。
Deltaテーブルをソースと増分同期する
Databricks SQL および Databricks Runtime 12.2 LTS 以降では、 WHEN NOT MATCHED BY SOURCE
を使用して任意の条件を作成し、テーブルの一部をアトミックに削除および置換できます。 これは、最初のデータ入力後数日間はレコードが変更または削除される可能性があるが、最終的には最終状態に落ち着くソース テーブルがある場合に特に役立ちます。
次のクエリは、このパターンを使用して、ソースから5日分のレコードを選択し、ターゲットの一致するレコードを更新し、ソースからターゲットに新しいレコードを挿入し、ターゲットの過去5日間の一致しないレコードをすべて削除することを示しています。
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
ソーステーブルとターゲットテーブルに同じブールフィルタを提供することにより、削除を含む変更をソーステーブルからターゲットテーブルに動的に伝播することができます。
注
このパターンは条件句なしで使用できますが、ターゲットテーブルを完全に書き換えることになり、コストがかかる可能性があります。