マージを使用してDelta Lakeテーブルに更新/挿入します

MERGE SQL操作を使用して、ソーステーブル、ビュー、またはDataFrameからターゲットDeltaテーブルにデータを更新/挿入できます。Delta Lakeは、MERGEでの挿入、更新、削除をサポートしており、高度なユースケースを容易にするためにSQL標準を超える拡張構文をサポートしています。

people10mupdatesという名前のソーステーブル、または/tmp/delta/people-10m-updatesのソースパスがあり、people10mという名前のターゲットテーブル、または/tmp/delta/people-10mのターゲットパスの新しいデータが含まれているとします。これらの新しいレコードの一部は、対象データにすでに存在している可能性があります。新しいデータをマージするには、その人のidが既に存在する行を更新し、一致するidが存在しない新しい行を挿入します。以下のクエリーを実行できる:

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )
from delta.tables import *

deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()
import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
val deltaTablePeopleUpdates = DeltaTable.forPath(spark, "tmp/delta/people-10m-updates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()

Scala と Python の構文の詳細については、 Delta Lake API のドキュメント を参照してください。 SQL 構文の詳細については、「 マージ先」を参照してください。

マージを使用して一致しない行をすべて変更する

Databricks SQLおよびDatabricks Runtime 12.1以降では、ソーステーブルに対応するレコードがないターゲットテーブルのUPDATEまたはDELETEレコードにWHEN NOT MATCHED BY SOURCE句を使用できます。Databricksでは、ターゲットテーブルの完全な書き換えを避けるために、オプションの条件句を追加することをお勧めします。

次のコード例は、これを削除に使用し、ターゲット テーブルをソース テーブルの内容で上書きし、ターゲット テーブル内の一致しないレコードを削除する基本的な構文を示しています。 ソースの更新と削除に時間制限があるテーブルのよりスケーラブルなパターンについては、「 Delta テーブルをソースと増分同期する」を参照してください。

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)
targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

次の例では、WHEN NOT MATCHED BY SOURCE句に条件を追加し、一致しないターゲット行で更新する値を指定します。

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)
targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

マージ操作のセマンティクス

以下は merge プログラム操作のセマンティクスの詳細である。

  • whenMatched句とwhenNotMatched句はいくつでも指定できます。

  • whenMatched 句は、一致条件に基づいてソース行がターゲットテーブルの行と一致する場合に実行されます。これらの句には、次のセマンティクスがあります。

    • whenMatched 句には、最大 1 つの update アクションと 1 つの delete アクションを含めることができます。 mergeupdate アクションは、一致したターゲット行の指定された列 (update 操作と同様) のみを更新します。delete アクションは、一致した行を削除します。

    • whenMatched句には、省略可能な条件を設定できます。この句条件が存在する場合、句条件がtrueの場合にのみ、一致するソース行とターゲット行のペアに対してupdateまたはdeleteアクションが実行されます。

    • 複数のwhenMatched句がある場合、それらは指定された順序で評価されます。最後の句を除くすべてのwhenMatched句には条件が必要です。

    • マージ条件に一致するソース行とターゲット行のペアについて、どのwhenMatched条件もtrueと評価されない場合、ターゲット行は変更されないままになります。

    • ターゲットデルタテーブルのすべての列をソースデータセットの対応する列で更新するには、whenMatched(...).updateAll()を使用します。これは次と同等です:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      ターゲットDeltaテーブルのすべての列に対して。したがって、このアクションでは、ソーステーブルにターゲットテーブルの列と同じ列があることが前提となっています。そうでない場合、クエリは分析エラーをスローします。

      この動作は、自動スキーマ移行が有効になっている場合に変わります。 詳細については、 自動スキーマ進化 を参照してください。

  • whenNotMatched 句は、一致条件に基づいてソース行がターゲット行と一致しない場合に実行されます。これらの句には、次のセマンティクスがあります。

    • whenNotMatched 句には、insertアクションのみを含めることができます。新しい行は、指定された列と対応する式に基づいて生成されます。ターゲットテーブルのすべての列を指定する必要はありません。指定されていないターゲット列の場合は、NULLが挿入されます。

    • whenNotMatched句には、省略可能な条件を設定できます。句条件が存在する場合、その行に対してその条件がtrueの場合にのみソース行が挿入されます。それ以外の場合、ソース列は無視されます。

    • 複数のwhenNotMatched句がある場合、それらは指定された順序で評価されます。最後の句を除くすべてのwhenNotMatched句には条件が必要です。

    • ターゲットデルタテーブルのすべての列を、ソースデータセットの対応する列とともに挿入するには、whenNotMatched(...).insertAll()を使用します。これは次と同等です:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      ターゲットDeltaテーブルのすべての列に対して。したがって、このアクションでは、ソーステーブルにターゲットテーブルの列と同じ列があることが前提となっています。そうでない場合、クエリは分析エラーをスローします。

      この動作は、自動スキーマ移行が有効になっている場合に変わります。 詳細については、 自動スキーマ進化 を参照してください。

  • whenNotMatchedBySource 句は、マージ条件に基づいてターゲット行がソース行と一致しない場合に実行されます。これらの句には、次のセマンティクスがあります。

    • whenNotMatchedBySource 節はdeleteupdateのアクションを指定できる。

    • whenNotMatchedBySource句には、省略可能な条件を設定できます。句条件が存在する場合、ターゲット行は、その行に対してその条件がtrueである場合にのみ変更されます。それ以外の場合、ターゲット行は変更されません。

    • 複数のwhenNotMatchedBySource句がある場合、それらは指定された順序で評価されます。最後の句を除くすべてのwhenNotMatchedBySource句には条件が必要です。

    • 定義上、whenNotMatchedBySource句には列値を取得するソース行がないため、ソース列を参照できません。変更する列ごとに、リテラルを指定するか、ターゲット列に対してアクション(SET target.deleted_count = target.deleted_count + 1など)を実行できます。

重要

  • ソースデータセットで複数の行が一致し、マージによってターゲット Delta テーブルの同じ行の更新が試行された場合、merge 操作が失敗する可能性があります。マージの SQL セマンティクスに従い、一致したターゲット行を更新するためにどのソース行を使用する必要があるかが不明瞭である場合、このような更新操作はあいまいとなります。ソーステーブルを前処理することで、複数一致が発生しないようにすることができます。

  • SQL VIEWにSQLMERGE操作を適用できるのは、ビューがCREATE VIEW viewName AS SELECT * FROM deltaTableとして定義されている場合のみです。

デルタテーブルへの書き込み時のデータ重複排除

一般的なETLの使用例は、ログをテーブルに追加してデルタテーブルにログを収集することです。ただし、多くの場合、ソースは重複したログレコードを生成する可能性があり、それらに対処するためにダウンストリームの重複排除手順が必要になります。mergeを使用すると、重複するレコードの挿入を回避できます。

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *
deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()
deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()
deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

新しいログを含むデータセットは、それ自体内で重複排除する必要があります。マージのSQLセマンティクスにより、新しいデータをテーブル内の既存のデータと照合して重複を排除しますが、新しいデータセット内に重複データがある場合は、そのデータが挿入されます。したがって、テーブルにマージする前に、新しいデータの重複を排除してください。

重複レコードが数日間しか取得されないことがわかっている場合は、テーブルを日付でパーティション分割し、照合するターゲットテーブルの日付範囲を指定することで、クエリーをさらに最適化できます。

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *
deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()
deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()
deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

これは、テーブル全体ではなく、過去7日間のログのみで重複を検索するため、前のコマンドよりも効率的です。さらに、この挿入専用マージを構造化ストリーミングと使用して、ログの継続的な重複排除を実行できます。

  • ストリーミング クエリでは、 foreachBatch でマージ操作を使用して、重複排除を使用してストリーミング データを Delta テーブルに継続的に書き込むことができます。 foreachBatchの詳細については、次の ストリーミングの例 を参照してください。

  • 別のストリーミングクエリーでは、この|Delatテーブルから重複排除されたデータを継続的に読み込むことができる。挿入のみのマージは、Deltaテーブルに新しいデータを追加するだけであるため、このようなことが可能である。

Delta Lake を使用した緩やかに変化するデータ (SCD) とチェンジデータキャプチャ (CDC)

Delta Live Tables は、SCD タイプ 1 とタイプ 2 の追跡と適用をネイティブにサポートしています。Delta Live Tables で APPLY CHANGES INTO を使用して、CDC フィードを処理するときに順不同のレコードが正しく処理されるようにします。 「Delta Live Tables の APPLY CHANGES API を使用した簡略化されたチェンジデータキャプチャ」を参照してください。

デルタテーブルをソースと増分同期する

Databricks SQLおよびDatabricks Runtime 12.1以降では、WHEN NOT MATCHED BY SOURCEを使用して任意の条件を作成し、テーブルの一部をアトミックに削除および置換できます。これは、最初のデータ入力から数日間レコードが変更または削除される可能性があるが、最終的には最終状態に落ち着くソーステーブルがある場合に特に便利です。

次のクエリは、このパターンを使用して、ソースから5日分のレコードを選択し、ターゲットの一致するレコードを更新し、ソースからターゲットに新しいレコードを挿入し、ターゲットの過去5日間の一致しないレコードをすべて削除することを示しています。

MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE

ソーステーブルとターゲットテーブルに同じブールフィルタを提供することにより、削除を含む変更をソーステーブルからターゲットテーブルに動的に伝播することができます。

このパターンは条件句なしで使用できますが、ターゲットテーブルを完全に書き換えることになり、コストがかかる可能性があります。