Table deletes, updates, and merges

Delta Lake supports several statements to facilitate deleting data from and updating data in Delta tables.

Delete from a table

You can remove data that matches a predicate from a Delta table. For instance, to delete all events from before 2017, you can run the following:

SQL

DELETE FROM events WHERE date < '2017-01-01'

DELETE FROM delta.`/data/events/` WHERE date < '2017-01-01'

Scala

Note

The Scala API is available in Databricks Runtime 6.0 and above.

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.delete("date < '2017-01-01'")        // predicate using SQL formatted string

import org.apache.spark.sql.functions._
import spark.implicits._

deltaTable.delete(col("date") < "2017-01-01")       // predicate using Spark SQL functions and implicits

Python

Note

The Python API is available in Databricks Runtime 6.1 and above.

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.delete("date < '2017-01-01'")        # predicate using SQL formatted string

deltaTable.delete(col("date") < "2017-01-01")   # predicate using Spark SQL functions

See Delta Lake API reference for more details.

Important

delete removes the data from the latest version of the Delta table but does not remove it from the physical storage until the old versions are explicitly vacuumed. See vacuum for more details.
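
For illustration, here is a minimal Python sketch of reclaiming that storage with vacuum, using the same example table path as above; treat it as a sketch rather than a tuned recipe.

from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, "/data/events/")

# Permanently remove files that are no longer referenced by the table and are
# older than the retention threshold (7 days by default); pass a number of
# hours to override the threshold.
deltaTable.vacuum()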

Tip

When possible, provide predicates on the partition columns for a partitioned Delta table as such predicates can significantly speed up the operation.

Update a table

You can update data that matches a predicate in a Delta table. For example, to fix a spelling mistake in the eventType, you can run the following:

SQL

UPDATE events SET eventType = 'click' WHERE eventType = 'clck'

UPDATE delta.`/data/events/` SET eventType = 'click' WHERE eventType = 'clck'

Scala

Note

The Scala API is available in Databricks Runtime 6.0 and above.

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.updateExpr(            // predicate and update expressions using SQL formatted string
  "eventType = 'clck'",
  Map("eventType" -> "'click'"))

import org.apache.spark.sql.functions._
import spark.implicits._

deltaTable.update(                // predicate using Spark SQL functions and implicits
  col("eventType") === "clck",
  Map("eventType" -> lit("click")))

Python

Note

The Python API is available in Databricks Runtime 6.1 and above.

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.update("eventType = 'clck'", { "eventType": "'click'" } )   # predicate using SQL formatted string

deltaTable.update(col("eventType") == "clck", { "eventType": lit("click") } )   # predicate using Spark SQL functions

See Delta Lake API reference for more details.

Tip

Similar to delete, update operations can get a significant speedup with predicates on partitions.

Upsert into a table using Merge

You can upsert data from an Apache Spark DataFrame into a Delta table using the merge operation. This operation is similar to the SQL MERGE command but has additional support for deletes and extra conditions in updates, inserts, and deletes.

Suppose you have a Spark DataFrame that contains new data for events with eventId. Some of these events may already be present in the events table. To merge the new data into the events table, you want to update the matching rows (that is, eventId already present) and insert the new rows (that is, eventId not present). You can run the following:

SQL

MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
  UPDATE SET events.data = updates.data
WHEN NOT MATCHED
  THEN INSERT (date, eventId, data) VALUES (date, eventId, data)

See the Merge Into (Delta Lake on Databricks) command for details.

Scala

Note

The Scala API is available in Databricks Runtime 6.0 and above.

import io.delta.tables._
import org.apache.spark.sql.functions._

val updatesDF = ...  // define the updates DataFrame[date, eventId, data]

DeltaTable.forPath(spark, "/data/events/")
  .as("events")
  .merge(
    updatesDF.as("updates"),
    "events.eventId = updates.eventId")
  .whenMatched
  .updateExpr(
    Map("data" -> "updates.data"))
  .whenNotMatched
  .insertExpr(
    Map(
      "date" -> "updates.date",
      "eventId" -> "updates.eventId",
      "data" -> "updates.data"))
  .execute()

Python

Note

The Python API is available in Databricks Runtime 6.1 and above.

from delta.tables import *

updatesDF = ...  # define the updates DataFrame[date, eventId, data]

deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.alias("events").merge(
    updatesDF.alias("updates"),
    "events.eventId = updates.eventId") \
  .whenMatchedUpdate(set = { "data" : "updates.data" } ) \
  .whenNotMatchedInsert(values =
    {
      "date": "updates.date",
      "eventId": "updates.eventId",
      "data": "updates.data"
    }
  ) \
  .execute()

Here is a detailed description of the merge programmatic operation; a combined sketch in Python follows the list.

  • There can be 1, 2, or 3 when[Matched | NotMatched] clauses. Of these, at most 2 can be whenMatched clauses, and at most 1 can be a whenNotMatched clause.
  • whenMatched clause:
    • Can have at most one update action and one delete action.
    • Can have an optional condition. However, if there are two clauses, then the first one must have a condition.
    • When there are two clauses and there are conditions (or the lack of) such that a row matches both clauses, the first clause/action is executed. In other words, the order of the clauses matters.
    • If none of the clauses matches a source-target row pair that satisfies the merge condition, the target rows are not updated.
    • To update all the columns of the target Delta table with the corresponding column of the source DataFrame, use whenMatched(...).updateAll(). This is equivalent to updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...)).
  • whenNotMatched clause:
    • Can have only the insert action, which can have an optional condition.
    • If this clause is not present, or if it is present but the non-matching source row does not satisfy the condition, the source row is not inserted.
    • To insert all the columns of the target Delta table with the corresponding column of the source DataFrame, use whenNotMatched(...).insertAll(). This is equivalent to insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...)).
  • See the Delta Lake API reference for details.
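
For illustration, here is a minimal Python sketch that exercises these rules on the events/updates example above. The assumption that a NULL updates.data marks a row to be removed is invented for this sketch; the point is the clause structure: two whenMatched clauses where the first must carry a condition, and a single whenNotMatched clause with an optional condition.

(DeltaTable.forPath(spark, "/data/events/").alias("events")
  .merge(
    updatesDF.alias("updates"),
    "events.eventId = updates.eventId")
  # First whenMatched clause: because a second whenMatched clause follows, it must have a condition.
  .whenMatchedUpdate(
    condition = "updates.data IS NOT NULL",
    set = { "data": "updates.data" })
  # Second whenMatched clause: matched rows whose new data is NULL are deleted instead.
  .whenMatchedDelete()
  # Single whenNotMatched clause: insert, with an optional extra condition.
  .whenNotMatchedInsertAll(condition = "updates.data IS NOT NULL")
  .execute())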

Important

A merge operation can fail if multiple rows of the source dataset match and attempt to update the same row of the target Delta table. By the SQL semantics of merge, such an update operation is ambiguous because it is unclear which source row should be used to update the matched target row. You can preprocess the source table to eliminate the possibility of multiple matches. See the change-data-capture example – it preprocesses the change dataset (that is, the source dataset) to retain only the latest change for each key before applying that change to the target Delta table.
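
For example, here is a minimal Python sketch of such preprocessing. The changes DataFrame and its eventId and timestamp columns are hypothetical names for this sketch; the idea is to keep only the latest change per key before running the merge.

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

# Keep only the most recent change per eventId so that each target row has at most one match
latestChanges = (changes
  .withColumn("rank", row_number().over(
    Window.partitionBy("eventId").orderBy(col("timestamp").desc())))
  .where("rank = 1")
  .drop("rank"))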

Tip

You should add as much information to the merge condition as possible, to reduce both the amount of work done and the chance of transaction conflicts. For example, suppose you have a table that is partitioned by country and date, and you want to use merge to update information for the last day, country by country. Adding the condition events.date = current_date() AND events.country = 'USA' makes the query faster because it looks for matches only in the relevant partitions.
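
For illustration, a minimal Python sketch of an upsert with those partition filters folded into the merge condition; the country and date columns come from the hypothetical partitioning described in this tip, and updatesDF is assumed to have matching columns.

(deltaTable.alias("events")
  .merge(
    updatesDF.alias("updates"),
    "events.eventId = updates.eventId AND " +
    "events.date = current_date() AND events.country = 'USA'")   # prunes to the relevant partitions
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute())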

Merge examples

Here are a few examples on how to use merge in different scenarios.

Data deduplication when writing into Delta tables

A common ETL use case is to collect logs into a Delta table by appending them to the table. However, the sources can often generate duplicate log records, and downstream deduplication steps are needed to take care of them. With merge, you can avoid inserting the duplicate records.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Note

  • The dataset containing the new logs needs to be deduplicated within itself. By the SQL semantics of merge, the new data is matched and deduplicated against the existing data in the table, but duplicate data within the new dataset will still get inserted. Hence, deduplicate the new data before merging it into the table (see the sketch after this note).

  • If you know that you may get duplicate records only for a few days, you can optimize your query further by partitioning the table by date, and then specifying the date range of the target table to match on.

    SQL

    MERGE INTO logs
    USING newDedupedLogs
    ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
    WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
      THEN INSERT *
    

    Scala

    deltaTable.as("logs").merge(
        newDedupedLogs.as("newDedupedLogs"),
        "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
      .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
      .insertAll()
      .execute()
    

    Python

    deltaTable.alias("logs").merge(
        newDedupedLogs.alias("newDedupedLogs"),
        "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
      .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
      .execute()
    

    This will be more efficient than the previous command because it looks for duplicates only in the last 7 days of logs, not the entire table.

  • You can use this insert-only merge with Structured Streaming to perform continuous deduplication of the logs.

    • In a streaming query, you can use merge operation in foreachBatch to continuously write any streaming data to a Delta table with deduplication. See the streaming example below for more information on foreachBatch.

    • In another streaming query, you can continuously read deduplicated data from this Delta table. This is possible because insert-only merge will only append new data to the Delta table.

      Note

      Insert-only merge is optimized to only append data in Databricks Runtime 6.2 and above. In earlier Databricks Runtime versions, writes from insert-only merge operations cannot be read as a stream.
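
As the first note above says, the new dataset must be deduplicated within itself before the merge. A minimal Python sketch, assuming a hypothetical raw newLogs DataFrame in which uniqueId alone identifies a log record; the result is the newDedupedLogs used in the examples above.

# Drop duplicates within the incoming batch so that merge sees at most one row per uniqueId
newDedupedLogs = newLogs.dropDuplicates(["uniqueId"])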

Slowly changing data (SCD) Type 2 operation into Delta tables

Another common operation is SCD Type 2, which maintains a history of all changes made to each key in a dimensional table. Such operations require updating existing rows to mark previous values of keys as old and inserting the new rows as the latest values. Given a source table with updates and the target table with the dimensional data, SCD Type 2 can be expressed with merge.

Here is a concrete example of maintaining the history of addresses for a customer along with the active date range of each address. When a customer’s address needs to be updated, you have to mark the previous address as not the current one, update its active date range, and add the new address as the current one.

SCD Type 2 using merge notebook
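
For illustration, here is a hedged Python sketch of the pattern the notebook demonstrates, not the notebook itself. It assumes a customers Delta table with columns customerId, address, current, effectiveDate, and endDate, and an updates DataFrame with customerId, address, and effectiveDate; all of these names and the table path are placeholders.

from delta.tables import DeltaTable

customersTable = DeltaTable.forPath(spark, "/data/customers/")

# Rows whose address changed: they will be inserted as the new current address.
# Staging them with a NULL mergeKey guarantees they never match an existing row.
newAddressesToInsert = (updates.alias("updates")
  .join(customersTable.toDF().alias("customers"), "customerId")
  .where("customers.current = true AND updates.address <> customers.address"))

stagedUpdates = (
  newAddressesToInsert.selectExpr("NULL as mergeKey", "updates.*")
  .union(updates.selectExpr("customerId as mergeKey", "*")))

(customersTable.alias("customers")
  .merge(
    stagedUpdates.alias("staged_updates"),
    "customers.customerId = mergeKey")
  # Expire the old current row when the address has changed
  .whenMatchedUpdate(
    condition = "customers.current = true AND customers.address <> staged_updates.address",
    set = {
      "current": "false",
      "endDate": "staged_updates.effectiveDate"
    })
  # Insert brand-new customers and the new current addresses staged with a NULL mergeKey
  .whenNotMatchedInsert(
    values = {
      "customerId": "staged_updates.customerId",
      "address": "staged_updates.address",
      "current": "true",
      "effectiveDate": "staged_updates.effectiveDate",
      "endDate": "null"
    })
  .execute())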

Write change data into a Delta table

Similar to SCD, another common use case, often called change data capture (CDC), is to apply all data changes generated from an external database into a Delta table. In other words, a set of updates, deletes, and inserts applied to an external table needs to be applied to a Delta table. You can do this using merge as follows.

Writing change data using MERGE notebook
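
For illustration, a minimal Python sketch of such a CDC merge. The latestChanges DataFrame (already reduced to one row per key, as the Important note above requires), its key and value columns, the deleted flag, and the table path are all placeholders for this sketch.

from delta.tables import DeltaTable

(DeltaTable.forPath(spark, "/data/target/").alias("t")
  .merge(
    latestChanges.alias("s"),
    "t.key = s.key")
  # Matched rows flagged as deleted in the source are removed from the target
  .whenMatchedDelete(condition = "s.deleted = true")
  # Remaining matches are treated as updates
  .whenMatchedUpdate(set = { "key": "s.key", "value": "s.value" })
  # New keys are inserted unless the change is a delete for a row that never existed
  .whenNotMatchedInsert(
    condition = "s.deleted = false",
    values = { "key": "s.key", "value": "s.value" })
  .execute())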

Upsert from streaming queries using foreachBatch

You can use a combination of merge and foreachBatch (see foreachBatch for more information) to write complex upserts from a streaming query into a Delta table; a minimal sketch follows the list below. For example:

  • Write streaming aggregates in Update Mode: This is much more efficient than Complete Mode.
  • Write a stream of database changes into a Delta table: The merge query for writing change data can be used in foreachBatch to continuously apply a stream of changes to a Delta table.
  • Write a stream of data into a Delta table with deduplication: The insert-only merge query for deduplication can be used in foreachBatch to continuously write data (with duplicates) to a Delta table with automatic deduplication.
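
For illustration, a minimal Python sketch of the first pattern, writing streaming aggregates in update mode; the aggregatesDF streaming DataFrame, its key column, and the target path are placeholders for this sketch.

from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, "/data/aggregates/")

def upsertToDelta(microBatchDF, batchId):
  # Upsert each micro-batch: update existing keys, insert new ones
  (deltaTable.alias("t")
    .merge(microBatchDF.alias("s"), "t.key = s.key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())

(aggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start())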

Note

  • SQL MERGE INTO in Python foreachBatch() is supported in Databricks Runtime 5.5 and above.

  • Make sure that your merge statement inside foreachBatch is idempotent as restarts of the streaming query can apply the operation on the same batch of data multiple times.
  • When merge is used in foreachBatch, the input data rate of the streaming query (reported through StreamingQueryProgress and visible in the notebook rate graph) may be reported as a multiple of the actual rate at which data is generated at the source. This is because merge reads the input data multiple times causing the input metrics to be multiplied. If this is a bottleneck, you can cache the batch DataFrame before merge and then uncache it after merge.

Writing streaming aggregates in update mode using merge and foreachBatch notebook