Selectively overwrite data with Delta Lake

Databricks leverages Delta Lake functionality to support two distinct options for selective overwrites:

  • The replaceWhere option atomically replaces all records that match a given predicate.

  • You can replace directories of data based on how tables are partitioned using dynamic partition overwrites.

Arbitrary selective overwrite with replaceWhere

You can selectively overwrite only the data that matches an arbitrary expression. This feature is available with DataFrames in Databricks Runtime 9.1 LTS and above and supported in SQL in Databricks Runtime 12.0 and above.

The following command atomically replaces events in January in the target table, which is partitioned by start_date, with the data in replace_data:

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

This sample code writes out the data in replace_data, validates that all of it matches the predicate, and performs an atomic replacement. To write data that does not all match the predicate while still replacing the matching rows in the target table, disable the constraint check by setting spark.databricks.delta.replaceWhere.constraintCheck.enabled to false:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false
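The validate-then-replace behavior described above can be illustrated with a small in-memory model. This is only a sketch, not Delta Lake's implementation: rows are plain dictionaries, the names replace_where and in_january are hypothetical, and it assumes that with the constraint check disabled every incoming row is written as-is while only target rows matching the predicate are removed.

```python
from typing import Callable, Dict, List

Row = Dict[str, str]


def replace_where(
    target: List[Row],
    new_rows: List[Row],
    predicate: Callable[[Row], bool],
    constraint_check: bool = True,
) -> List[Row]:
    """Toy model of replaceWhere semantics (hypothetical helper, not a Delta API)."""
    if constraint_check:
        # With the check enabled, incoming rows outside the predicate fail the write.
        offending = [row for row in new_rows if not predicate(row)]
        if offending:
            raise ValueError(f"{len(offending)} row(s) do not match the predicate")
    # Keep target rows outside the predicate, then add every incoming row.
    kept = [row for row in target if not predicate(row)]
    return kept + list(new_rows)


def in_january(row: Row) -> bool:
    # ISO-8601 date strings compare correctly as plain strings.
    return row["start_date"] >= "2017-01-01" and row["end_date"] <= "2017-01-31"


events = [
    {"start_date": "2017-01-05", "end_date": "2017-01-06"},
    {"start_date": "2017-02-01", "end_date": "2017-02-02"},
]
replacement = [{"start_date": "2017-01-10", "end_date": "2017-01-11"}]

# The January row is replaced; the February row is left untouched.
updated = replace_where(events, replacement, in_january)
```

In the real feature the removal and the write happen in a single atomic commit; the model only captures which rows survive.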

In Databricks Runtime 9.0 and below, replaceWhere overwrites data matching a predicate over partition columns only. The following command atomically replaces the month of January in the target table, which is partitioned by date, with the data in df:

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")

In Databricks Runtime 9.1 and above, if you want to fall back to the old behavior, you can disable the spark.databricks.delta.replaceWhere.dataColumns.enabled flag:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

Dynamic partition overwrites

Preview

This feature is in Public Preview.

Databricks Runtime 11.1 and above supports dynamic partition overwrite mode for partitioned tables. For tables with multiple partition columns, Databricks Runtime 12.0 and below supports dynamic partition overwrites only if all partition columns are of the same data type.

In dynamic partition overwrite mode, a write overwrites all existing data in each logical partition for which it commits new data. Existing logical partitions for which the write contains no data remain unchanged. This mode applies only when data is written in overwrite mode: either INSERT OVERWRITE in SQL, or a DataFrame write with df.write.mode("overwrite").

Configure dynamic partition overwrite mode by setting the Spark session configuration spark.sql.sources.partitionOverwriteMode to dynamic. You can also enable this by setting the DataFrameWriter option partitionOverwriteMode to dynamic. If present, the query-specific option overrides the mode defined in the session configuration. The default for partitionOverwriteMode is static.

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
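To make the difference between the two modes concrete, the following sketch models a partitioned table as a list of dictionaries. It is an illustration under stated assumptions, not Delta Lake code: the function name overwrite and the sample column names are hypothetical, and static mode is assumed to replace the entire table when no replaceWhere predicate is given.

```python
from typing import Dict, List

Row = Dict[str, str]


def overwrite(
    target: List[Row],
    new_rows: List[Row],
    partition_col: str,
    mode: str = "static",
) -> List[Row]:
    """Toy model of an overwrite-mode write (hypothetical helper, not a Delta API)."""
    if mode == "static":
        # Assumption: a static overwrite without a predicate replaces all data.
        return list(new_rows)
    if mode != "dynamic":
        raise ValueError(f"unknown partitionOverwriteMode: {mode!r}")
    # Dynamic: only partitions that receive new data are replaced.
    touched = {row[partition_col] for row in new_rows}
    kept = [row for row in target if row[partition_col] not in touched]
    return kept + list(new_rows)


people = [
    {"country": "US", "name": "Ann"},
    {"country": "US", "name": "Bob"},
    {"country": "FR", "name": "Luc"},
]
more_people = [{"country": "US", "name": "Cy"}]

# Dynamic mode replaces both existing US rows and leaves the FR partition alone.
dynamic_result = overwrite(people, more_people, "country", mode="dynamic")
# Static mode leaves only the newly written row.
static_result = overwrite(people, more_people, "country", mode="static")
```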

Note

Dynamic partition overwrite conflicts with the option replaceWhere for partitioned tables.

  • If dynamic partition overwrite is enabled in the Spark session configuration, and replaceWhere is provided as a DataFrameWriter option, then Delta Lake overwrites the data according to the replaceWhere expression (query-specific options override session configurations).

  • You’ll receive an error if the DataFrameWriter options have both dynamic partition overwrite and replaceWhere enabled.
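The precedence rules in this note can be summarized as a small decision function. This is a sketch of the rules as stated here, not code from Delta Lake; resolve_overwrite and its parameter names are hypothetical, and mode values are assumed to be the lowercase strings "static" and "dynamic".

```python
from typing import Optional


def resolve_overwrite(
    session_mode: str = "static",
    writer_mode: Optional[str] = None,
    replace_where: Optional[str] = None,
) -> str:
    """Return which overwrite behavior applies (hypothetical helper).

    session_mode  -- value of spark.sql.sources.partitionOverwriteMode
    writer_mode   -- DataFrameWriter option partitionOverwriteMode, if set
    replace_where -- DataFrameWriter option replaceWhere, if set
    """
    if replace_where is not None and writer_mode == "dynamic":
        # Both writer options together are rejected.
        raise ValueError("replaceWhere cannot be combined with dynamic partition overwrite")
    if replace_where is not None:
        # A query-specific replaceWhere overrides the session configuration.
        return "replaceWhere"
    # Otherwise the writer option, when present, overrides the session setting.
    return writer_mode if writer_mode is not None else session_mode


# Session is dynamic, but the writer supplies replaceWhere: the predicate is used.
chosen = resolve_overwrite(session_mode="dynamic", replace_where="start_date >= '2017-01-01'")
```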

Important

Validate that the data written with dynamic partition overwrite touches only the expected partitions. A single row in the incorrect partition can lead to unintentionally overwriting an entire partition. We recommend using replaceWhere to specify which data to overwrite.
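One way to run such a validation is to compare the partition values present in the incoming data against the set you expect to touch before writing. The sketch below works on plain dictionaries so that it is self-contained; unexpected_partitions and the sample column names are hypothetical. With a DataFrame, the same set of values could presumably be obtained from a distinct query over the partition columns.

```python
from typing import Dict, Iterable, List, Set

Row = Dict[str, str]


def unexpected_partitions(
    rows: Iterable[Row],
    partition_col: str,
    expected: Iterable[str],
) -> Set[str]:
    """Return partition values in rows that are not in the expected set."""
    touched = {row[partition_col] for row in rows}
    return touched - set(expected)


incoming: List[Row] = [
    {"country": "US", "name": "Cy"},
    {"country": "US", "name": "Dee"},
    {"country": "FR", "name": "Zoe"},  # stray row: would overwrite the whole FR partition
]

stray = unexpected_partitions(incoming, "country", expected={"US"})
if stray:
    # Stop before writing rather than silently replacing extra partitions.
    print(f"Refusing to write: unexpected partitions {sorted(stray)}")
```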

If a partition has been accidentally overwritten, you can use restore to undo the change.
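As a rough illustration, a restore to an earlier table version is issued with a RESTORE TABLE ... TO VERSION AS OF statement. The helper below only builds that statement as a string; restore_statement is a hypothetical name, and the version number in the usage line is a placeholder that you would look up in the table history (for example with DESCRIBE HISTORY).

```python
def restore_statement(table_name: str, version: int) -> str:
    """Build a RESTORE statement for a Delta table (hypothetical helper)."""
    if version < 0:
        raise ValueError("version must be non-negative")
    return f"RESTORE TABLE {table_name} TO VERSION AS OF {version}"


# Version 3 is a placeholder, not a value taken from a real table history.
statement = restore_statement("default.people10m", 3)
# In a notebook with an active session this could be run as: spark.sql(statement)
```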