マージを使用してDelta Lakeテーブルにアップサートする

MERGE SQL操作を使用して、ソーステーブル、ビュー、またはDataFrameからターゲットDeltaテーブルにデータを更新/挿入できます。Delta Lakeは、MERGEでの挿入、更新、削除をサポートしており、高度なユースケースを容易にするためにSQL標準を超える拡張構文をサポートしています。

people10mupdatesという名前のソーステーブル、または/tmp/delta/people-10m-updatesのソースパスがあり、people10mという名前のターゲットテーブル、または/tmp/delta/people-10mのターゲットパスの新しいデータが含まれているとします。これらの新しいレコードの一部は、対象データにすでに存在している可能性があります。新しいデータをマージするには、その人のidが既に存在する行を更新し、一致するidが存在しない新しい行を挿入します。以下のクエリーを実行できる:

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )
from delta.tables import *

deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()
import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()

重要

ソース・テーブルの 1 つの行のみが、ターゲット・テーブルの特定の行に一致します。 Databricks Runtime 16.0 以降では、 MERGE 句と WHEN MATCHED 句と ON 句で指定された条件を評価して、重複する一致を判断します。 Databricks Runtime 15.4 LTS 以下では、 MERGE 操作では ON 句で指定された条件のみが考慮されます。

Scala と Python の構文の詳細については、 Delta Lake API のドキュメント を参照してください。 SQL 構文の詳細については、MERGE INTOを参照してください。

マージを使用して一致しない行をすべて変更する

Databricks SQL および Databricks Runtime 12.2 LTS 以降では、 WHEN NOT MATCHED BY SOURCE句を使用して、ソース テーブル内に対応するレコードがないターゲット テーブル内のレコードをUPDATEまたはDELETEできます。 Databricks では、ターゲット テーブルが完全に書き換えられるのを避けるために、オプションの条件句を追加することをお勧めします。

次のコード例は、これを削除に使用し、ターゲット テーブルをソース テーブルの内容で上書きし、ターゲット テーブル内の一致しないレコードを削除する基本的な構文を示しています。 ソースの更新と削除に時間制限があるテーブルのよりスケーラブルなパターンについては、 Delta テーブルをソースと増分同期するを参照してください。

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)
targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

次の例では、WHEN NOT MATCHED BY SOURCE句に条件を追加し、一致しないターゲット行で更新する値を指定します。

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)
targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

マージ操作のセマンティクス

以下は merge プログラム操作のセマンティクスの詳細である。

  • whenMatched句とwhenNotMatched句はいくつでも指定できます。

  • whenMatched 句は、一致条件に基づいてソース行がターゲットテーブルの行と一致する場合に実行されます。これらの句には、次のセマンティクスがあります。

    • whenMatched 句には、最大 1 つの update アクションと 1 つの delete アクションを含めることができます。 mergeupdate アクションは、一致したターゲット行の指定された列 (update 操作と同様) のみを更新します。delete アクションは、一致した行を削除します。

    • whenMatched句には、省略可能な条件を設定できます。この句条件が存在する場合、句条件がtrueの場合にのみ、一致するソース行とターゲット行のペアに対してupdateまたはdeleteアクションが実行されます。

    • 複数のwhenMatched句がある場合、それらは指定された順序で評価されます。最後の句を除くすべてのwhenMatched句には条件が必要です。

    • マージ条件に一致するソース行とターゲット行のペアについて、どのwhenMatched条件もtrueと評価されない場合、ターゲット行は変更されないままになります。

    • ターゲットデルタテーブルのすべての列をソースデータセットの対応する列で更新するには、whenMatched(...).updateAll()を使用します。これは次と同等です:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      これはターゲットのDeltaテーブルのすべての列に対する処理です。したがって、このアクションでは、ソーステーブルにターゲットテーブルの列と同じ列があることが前提となっています。そうでない場合、クエリは分析エラーをスローします。

      自動スキーマ進化が有効になっている場合、この動作は変更されます。 詳細については、 自動スキーマ進化を参照してください。

  • whenNotMatched 句は、一致条件に基づいてソース行がターゲット行と一致しない場合に実行されます。これらの句には、次のセマンティクスがあります。

    • whenNotMatched 句には、insertアクションのみを含めることができます。新しい行は、指定された列と対応する式に基づいて生成されます。ターゲットテーブルのすべての列を指定する必要はありません。指定されていないターゲット列の場合は、NULLが挿入されます。

    • whenNotMatched句には、省略可能な条件を設定できます。句条件が存在する場合、その行に対してその条件がtrueの場合にのみソース行が挿入されます。それ以外の場合、ソース列は無視されます。

    • 複数のwhenNotMatched句がある場合、それらは指定された順序で評価されます。最後の句を除くすべてのwhenNotMatched句には条件が必要です。

    • ターゲットデルタテーブルのすべての列を、ソースデータセットの対応する列とともに挿入するには、whenNotMatched(...).insertAll()を使用します。これは次と同等です:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      これはターゲットのDeltaテーブルのすべての列に対する処理です。したがって、このアクションでは、ソーステーブルにターゲットテーブルの列と同じ列があることが前提となっています。そうでない場合、クエリは分析エラーをスローします。

      自動スキーマ進化が有効になっている場合、この動作は変更されます。 詳細については、 自動スキーマ進化を参照してください。

  • whenNotMatchedBySource 句は、マージ条件に基づいてターゲット行がソース行と一致しない場合に実行されます。これらの句には、次のセマンティクスがあります。

    • whenNotMatchedBySource 節はdeleteupdateのアクションを指定できる。

    • whenNotMatchedBySource句には、省略可能な条件を設定できます。句条件が存在する場合、ターゲット行は、その行に対してその条件がtrueである場合にのみ変更されます。それ以外の場合、ターゲット行は変更されません。

    • 複数のwhenNotMatchedBySource句がある場合、それらは指定された順序で評価されます。最後の句を除くすべてのwhenNotMatchedBySource句には条件が必要です。

    • 定義上、whenNotMatchedBySource句には列値を取得するソース行がないため、ソース列を参照できません。変更する列ごとに、リテラルを指定するか、ターゲット列に対してアクション(SET target.deleted_count = target.deleted_count + 1など)を実行できます。

重要

  • ソースデータセットで複数の行が一致し、マージによってターゲット Delta テーブルの同じ行の更新が試行された場合、merge 操作が失敗する可能性があります。マージの SQL セマンティクスに従い、一致したターゲット行を更新するためにどのソース行を使用する必要があるかが不明瞭である場合、このような更新操作はあいまいとなります。ソーステーブルを前処理することで、複数一致が発生しないようにすることができます。

  • SQL VIEWにSQLMERGE操作を適用できるのは、ビューがCREATE VIEW viewName AS SELECT * FROM deltaTableとして定義されている場合のみです。

Deltaテーブルへの書き込み時のデータ重複排除

一般的なETLの使用例は、ログをテーブルに追加してデルタテーブルにログを収集することです。ただし、多くの場合、ソースは重複したログレコードを生成する可能性があり、それらに対処するためにダウンストリームの重複排除手順が必要になります。mergeを使用すると、重複するレコードの挿入を回避できます。

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *
deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()
deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()
deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

新しいログを含むデータセットは、それ自体内で重複排除する必要があります。マージのSQLセマンティクスにより、新しいデータをテーブル内の既存のデータと照合して重複を排除しますが、新しいデータセット内に重複データがある場合は、そのデータが挿入されます。したがって、テーブルにマージする前に、新しいデータの重複を排除してください。

重複レコードが数日間しか取得されないことがわかっている場合は、テーブルを日付でパーティション分割し、照合するターゲットテーブルの日付範囲を指定することで、クエリーをさらに最適化できます。

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *
deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()
deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()
deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

これは、テーブル全体ではなく、過去7日間のログのみで重複を検索するため、前のコマンドよりも効率的です。さらに、この挿入専用マージを構造化ストリーミングと使用して、ログの継続的な重複排除を実行できます。

  • ストリーミング クエリでは、 foreachBatch でマージ操作を使用して、重複排除を使用してストリーミング データを Delta テーブルに継続的に書き込むことができます。 foreachBatchの詳細については、次の ストリーミングの例 を参照してください。

  • 別のストリーミングクエリーでは、このDelatテーブルから重複排除されたデータを継続的に読み込むことができます。挿入のみのマージは、Deltaテーブルに新しいデータを追加するだけであるため、このようなことが可能です。

Delta Lakeを使用した緩やかに変化するデータ (SCD) とチェンジデータキャプチャ (CDC)

Delta Live Tables は、SCD タイプ 1 およびタイプ 2 の追跡と適用をネイティブでサポートしています。Delta Live Tables でAPPLY CHANGES INTOを使用して、CDC フィードを処理するときに順序が正しくないレコードが正しく処理されるようにします。 APPLY CHANGES API: Delta Live Tablesを使用してチェンジデータキャプチャを簡素化します参照してください。

Deltaテーブルをソースと増分同期する

Databricks SQL および Databricks Runtime 12.2 LTS 以降では、 WHEN NOT MATCHED BY SOURCEを使用して任意の条件を作成し、テーブルの一部をアトミックに削除および置換できます。 これは、最初のデータ入力後数日間はレコードが変更または削除される可能性があるが、最終的には最終状態に落ち着くソース テーブルがある場合に特に役立ちます。

次のクエリは、このパターンを使用して、ソースから5日分のレコードを選択し、ターゲットの一致するレコードを更新し、ソースからターゲットに新しいレコードを挿入し、ターゲットの過去5日間の一致しないレコードをすべて削除することを示しています。

MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE

ソーステーブルとターゲットテーブルに同じブールフィルタを提供することにより、削除を含む変更をソーステーブルからターゲットテーブルに動的に伝播することができます。

このパターンは条件句なしで使用できますが、ターゲットテーブルを完全に書き換えることになり、コストがかかる可能性があります。