Row-level concurrency

Row-level concurrency reduces conflicts between concurrent write operations by detecting changes at the row level and automatically resolving conflicts that occur when concurrent writes update or delete different rows in the same data file.

Requirements for row-level concurrency

Row-level concurrency requires Databricks Runtime 14.2 and above. Tables do not support row-level concurrency if they have partitions defined or do not have deletion vectors enabled.

Tables with partitions don't support row-level concurrency but can still avoid conflicts between OPTIMIZE and all other write operations when deletion vectors are enabled. See Limitations for row-level concurrency.

For Databricks Runtime versions before 14.2, see Row-level concurrency preview behavior (legacy).

note

MERGE INTO support for row-level concurrency requires Photon in Databricks Runtime 14.2. In Databricks Runtime 14.3 LTS and above, Photon is not required.

Conflict matrix with row-level concurrency

The following table shows which pairs of write operations can conflict in each isolation level with row-level concurrency enabled:

|  | INSERT (1) | UPDATE, DELETE, MERGE INTO | OPTIMIZE |
| --- | --- | --- | --- |
| INSERT | Cannot conflict |  |  |
| UPDATE, DELETE, MERGE INTO | Cannot conflict in WriteSerializable. Can conflict in Serializable when modifying the same row. | Can conflict when modifying the same row. |  |
| OPTIMIZE | Cannot conflict | Can conflict when ZORDER BY is used. Cannot conflict otherwise. | Can conflict when ZORDER BY is used. Cannot conflict otherwise. |

(1) All INSERT operations in this table describe append operations that do not read any data from the same table before committing. INSERT operations that contain subqueries reading the same table support the same concurrency as MERGE.

note
  • Tables with identity columns do not support concurrent transactions. See Use identity columns in Delta Lake.
  • REORG operations have isolation semantics identical to OPTIMIZE when rewriting data files. When you use REORG to apply an upgrade, table protocols change, which conflicts with all ongoing operations.
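
As a reading aid, the conflict matrix above can be condensed into a small lookup. This is an illustrative sketch, not a Delta Lake API: the operation labels and the `can_conflict` helper are hypothetical, with `"dml"` standing for UPDATE, DELETE, and MERGE INTO.

```python
# Sketch of the row-level concurrency conflict matrix (not a Delta Lake API).
# "insert" is a blind append; "dml" covers UPDATE, DELETE, and MERGE INTO;
# "optimize" and "optimize_zorder" are OPTIMIZE without/with ZORDER BY.

def can_conflict(a: str, b: str, *, same_row: bool = False,
                 serializable: bool = False) -> bool:
    """Whether two concurrent operations *can* conflict under row-level
    concurrency. serializable=False models the default WriteSerializable
    isolation level."""
    x, y = sorted((a, b))
    if (x, y) == ("insert", "insert"):
        return False
    if (x, y) == ("dml", "insert"):
        # Cannot conflict in WriteSerializable; can conflict in
        # Serializable when modifying the same row.
        return serializable and same_row
    if x == "insert" or y == "insert":
        # INSERT vs OPTIMIZE: cannot conflict.
        return False
    if (x, y) == ("dml", "dml"):
        return same_row
    # DML vs OPTIMIZE, or OPTIMIZE vs OPTIMIZE: conflicts only when
    # ZORDER BY is used by one of the operations.
    return "optimize_zorder" in (x, y)
```

Because the matrix is symmetric, the helper normalizes the pair with `sorted` so argument order does not matter.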

Write conflicts without row-level concurrency

The conflict behavior described in this section applies to tables that have partitions defined or do not have deletion vectors enabled, and to all tables on Databricks Runtime below 14.2.

Conflict matrix without row-level concurrency

The following table shows which pairs of write operations can conflict in each isolation level:

|  | INSERT (1) | UPDATE, DELETE, MERGE INTO | OPTIMIZE |
| --- | --- | --- | --- |
| INSERT | Cannot conflict |  |  |
| UPDATE, DELETE, MERGE INTO | Cannot conflict in WriteSerializable. Can conflict in Serializable. See Avoid conflicts using partitioning. | Can conflict in Serializable and WriteSerializable. See Avoid conflicts using partitioning. |  |
| OPTIMIZE | Cannot conflict | Cannot conflict in tables with deletion vectors enabled, unless ZORDER BY is used. Can conflict otherwise. | Cannot conflict in tables with deletion vectors enabled, unless ZORDER BY is used. Can conflict otherwise. |

(1) All INSERT operations in this table describe append operations that do not read any data from the same table before committing. INSERT operations that contain subqueries reading the same table support the same concurrency as MERGE.

note
  • Tables with identity columns do not support concurrent transactions. See Use identity columns in Delta Lake.
  • REORG operations have isolation semantics identical to OPTIMIZE when rewriting data files. When you use REORG to apply an upgrade, table protocols change, which conflicts with all ongoing operations.

Limitations for row-level concurrency

The following limitations apply to row-level concurrency. For the operations listed below, conflict resolution falls back to the normal concurrency behavior for write conflicts. See Write conflicts without row-level concurrency.

| Limitation | Description |
| --- | --- |
| Complex conditional clauses | Conditions on complex data types (structs, arrays, maps), non-deterministic expressions, subqueries, and correlated subqueries. |
| MERGE predicate requirement | In Databricks Runtime 14.2, MERGE commands must use an explicit predicate on the target table to filter rows matching the source table. |
| Performance tradeoff | Row-level conflict detection can increase total execution time. With many concurrent transactions, the writer prioritizes latency over conflict resolution. |

All limitations for deletion vectors also apply. See Limitations.

Avoid conflicts using partitioning

For all cases marked "can conflict" in the conflict matrices, a conflict occurs only if the two operations affect the same set of files. To make two sets of files disjoint, partition the table by the same columns used in operation conditions.

Example:

The commands UPDATE table WHERE date > '2010-01-01' ... and DELETE table WHERE date < '2010-01-01' conflict if the table is not partitioned by date, because both can attempt to modify the same files. Partitioning the table by date avoids the conflict.

note

Partitioning a table by a column with high cardinality can lead to performance issues due to the large number of subdirectories.
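
The date example can be modeled in a few lines to show why pruned file sets cannot overlap. This is a sketch of the idea, not of Delta Lake internals; the file names and partition layout are invented for illustration.

```python
# Invented file layout for a table partitioned by `date`. Partition pruning
# makes the file sets touched by the UPDATE and DELETE above disjoint.
FILES_BY_DATE = {
    "2009-12-31": {"part-000.parquet"},
    "2010-06-01": {"part-001.parquet", "part-002.parquet"},
}

def files_touched(predicate):
    """Files a command must rewrite after pruning partitions by its predicate."""
    return set().union(*(files for date, files in FILES_BY_DATE.items()
                         if predicate(date)))

# UPDATE ... WHERE date > '2010-01-01' vs DELETE ... WHERE date < '2010-01-01'
update_files = files_touched(lambda d: d > "2010-01-01")
delete_files = files_touched(lambda d: d < "2010-01-01")
```

Because every file belongs to exactly one partition, predicates that select disjoint partition ranges can never rewrite the same file.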

Example: Avoiding conflicts with explicit partition filters

Conflict exceptions are often thrown during concurrent DELETE, UPDATE, or MERGE operations that might read the same partition even when they update different partitions. Making the separation explicit in the operation condition avoids the conflict:

Scala
// Problem: the merge condition can scan the entire target table.
deltaTable.as("t")
  .merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

// Solution: explicit partition filters on the target table limit the scan
// to the partitions this merge writes.
deltaTable.as("t")
  .merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country" +
      s" AND t.date = '$date' AND t.country = '$country'")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

Conflict exceptions

When a transaction conflict occurs, you observe one of the following exceptions:

ConcurrentAppendException

This exception occurs when a concurrent operation adds files in the same partition (or anywhere in an unpartitioned table) that your operation reads. The file additions can be caused by INSERT, DELETE, UPDATE, or MERGE operations.

With the default WriteSerializable isolation level, files added by blind INSERT operations (operations that append data without reading any data) do not conflict with any operation. If the isolation level is Serializable, blind appends may conflict.

important

Blind appends can conflict in WriteSerializable mode if multiple concurrent DELETE, UPDATE, or MERGE operations might reference values inserted by blind appends. To avoid this:

  • Ensure concurrent DELETE, UPDATE, or MERGE operations do not read the appended data
  • Have at most one DELETE, UPDATE, or MERGE operation that can read the appended data
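
When the appended data cannot be kept out of concurrent readers, a common application-side mitigation is to rerun the whole operation after a conflict. This is a minimal sketch of that pattern, not a Databricks API; `ConflictError` stands in for exceptions such as ConcurrentAppendException.

```python
import time

class ConflictError(Exception):
    """Stand-in for conflict exceptions such as ConcurrentAppendException."""

def retry_on_conflict(op, max_attempts=3, backoff_s=0.5):
    """Run op(), retrying on ConflictError with a simple linear backoff.

    Retrying reruns the whole transaction, so op() must be safe to repeat
    (for example, an idempotent MERGE rather than a blind append).
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return op()
        except ConflictError:
            if attempt == max_attempts:
                raise
            time.sleep(backoff_s * attempt)
```

Retrying is appropriate only because Delta's optimistic concurrency rejects the losing transaction cleanly; the table is never left in a partially written state.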

ConcurrentDeleteReadException

This exception occurs when a concurrent operation deletes a file that your operation read. Common causes are DELETE, UPDATE, or MERGE operations that rewrite files.

ConcurrentDeleteDeleteException

This exception occurs when a concurrent operation deletes a file that your operation also deletes. This could be caused by two concurrent compaction operations rewriting the same files.

MetadataChangedException

This exception occurs when a concurrent transaction updates the metadata of a Delta table. Common causes are ALTER TABLE operations or writes that update the table schema.

ConcurrentTransactionException

This exception occurs if a streaming query using the same checkpoint location is started multiple times concurrently and tries to write to the Delta table at the same time. Never run two streaming queries with the same checkpoint location concurrently.

ProtocolChangedException

This exception can occur when:

  • Your Delta table is upgraded to a new protocol version (you might need to upgrade your Databricks Runtime)
  • Multiple writers are creating or replacing a table at the same time
  • Multiple writers are writing to an empty path at the same time

See Delta Lake feature compatibility and protocols.
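
Whether a failed write is worth retrying depends on which exception was raised. The split below is a rough application-side heuristic derived from the descriptions above, not official guidance; in particular, treating MetadataChangedException as retryable assumes the retried operation re-reads the updated table metadata.

```python
# Heuristic classification of the conflict exceptions described above
# (an assumption-laden sketch, not a Databricks API or recommendation).
RETRYABLE = {
    "ConcurrentAppendException",
    "ConcurrentDeleteReadException",
    "ConcurrentDeleteDeleteException",
    "MetadataChangedException",  # assumes the retry picks up the new schema
}
NOT_RETRYABLE = {
    "ConcurrentTransactionException",  # fix the duplicated checkpoint instead
    "ProtocolChangedException",        # may require a runtime/protocol upgrade
}

def is_retryable(exception_name: str) -> bool:
    """True if the conflict is a transient write race worth retrying."""
    return exception_name in RETRYABLE
```

Transient races between otherwise-correct writers resolve on retry; checkpoint misuse and protocol upgrades recur until an operator fixes the underlying configuration.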

Row-level concurrency preview behavior (legacy)

This section describes preview behaviors for row-level concurrency in Databricks Runtime 14.1 and below.

| Databricks Runtime version | Behavior |
| --- | --- |
| Databricks Runtime 13.3 LTS and above | Tables with liquid clustering automatically enable row-level concurrency. |
| Databricks Runtime 14.0 and 14.1 | Enable row-level concurrency for tables with deletion vectors using the configuration below. |
| Databricks Runtime 14.1 and below | Non-Photon compute supports row-level concurrency only for DELETE operations. |

To enable row-level concurrency in Databricks Runtime 14.0 and 14.1:

ini
spark.databricks.delta.rowLevelConcurrencyPreview = true

Row-level concurrency always requires deletion vectors.

Next steps