The isolation level of a table defines the degree to which a transaction must be isolated from modifications made by concurrent transactions. Delta Lake on Databricks supports two isolation levels: Serializable and WriteSerializable.
Serializable: The strongest isolation level. It ensures that committed write operations and all reads are Serializable. Operations are allowed as long as there exists a serial sequence of executing them one-at-a-time that generates the same outcome as that seen in the table. For the write operations, the serial sequence is exactly the same as that seen in the table’s history.
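To make "there exists a serial sequence" concrete, here is a small, hypothetical Python sketch (my own illustration, not Delta Lake code) that checks whether an observed table state can be explained by some serial execution of a set of operations:

```python
from itertools import permutations

def is_serializable(initial_state, operations, observed_state):
    """Return True if some serial order of `operations`, applied one at a
    time to `initial_state`, produces `observed_state`."""
    for order in permutations(operations):
        state = initial_state
        for op in order:
            state = op(state)
        if state == observed_state:
            return True
    return False

# Two concurrent writers: one appends a row, one deletes rows below a threshold.
append_zero = lambda rows: rows + [0]
delete_small = lambda rows: [r for r in rows if r >= 2]

# [2, 3, 0] is explained by the serial order (delete_small, append_zero).
assert is_serializable([1, 2, 3], [append_zero, delete_small], [2, 3, 0])

# No serial order yields [1, 2, 3, 0] once the delete has been applied.
assert not is_serializable([1, 2, 3], [append_zero, delete_small], [1, 2, 3, 0])
```

A Serializable table guarantees that the committed state always passes this kind of check against the operations in its history.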
WriteSerializable (Default): A weaker isolation level than Serializable. It ensures only that the write operations (that is, not the reads) are serializable. However, this is still stronger than Snapshot isolation. WriteSerializable is the default isolation level because it provides a good balance of data consistency and availability for most common operations.
In this mode, the content of the Delta table may differ from what the sequence of operations in the table history would suggest. This is because this mode allows certain pairs of concurrent writes (say, operations X and Y) to proceed such that the result is as if Y was performed before X (that is, the two are serializable with respect to each other) even though the history shows that Y was committed after X. To disallow this reordering, set the table isolation level to Serializable, which causes such transactions to fail.
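The permitted reordering can be simulated in plain Python. This is a toy model under my own assumptions (not Delta internals): transaction Y is a DELETE that snapshots the table before a concurrent INSERT (transaction X) commits, so even though Y commits after X, the outcome matches the serial order Y-then-X.

```python
# History order: X (INSERT) commits first, then Y (DELETE) commits.
table = [(1, 'a'), (2, 'b'), (3, 'c')]

# Y starts and snapshots the table before X commits.
y_snapshot = list(table)
delete_pred = lambda row: row[0] <= 4   # Y will delete rows with id <= 4

# X commits: it inserts a row that happens to match Y's delete predicate.
table = table + [(4, 'd')]

# Y commits: it removes only the rows it saw in its snapshot.
to_delete = {row for row in y_snapshot if delete_pred(row)}
table = [row for row in table if row not in to_delete]

# X's row survives, even though it matches the delete predicate.
assert table == [(4, 'd')]

# Replaying the history order serially (X then Y) would have deleted (4, 'd');
# the outcome instead matches the serial order Y-then-X.
serial_x_then_y = [r for r in [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')]
                   if not delete_pred(r)]
assert serial_x_then_y == []
serial_y_then_x = [r for r in [(1, 'a'), (2, 'b'), (3, 'c')]
                   if not delete_pred(r)] + [(4, 'd')]
assert table == serial_y_then_x
```

Under Serializable, the commit of Y in this scenario would instead fail with a conflict.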
You set the isolation level using the ALTER TABLE command:
```sql
ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = <level-name>)
```
For example, to change the isolation level from the default WriteSerializable to Serializable, run:
```sql
ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')
```
The following table describes which pairs of write operations can conflict in each isolation level.
|  | INSERT | UPDATE, DELETE, MERGE INTO | OPTIMIZE |
| --- | --- | --- | --- |
| INSERT | Cannot conflict |  |  |
| UPDATE, DELETE, MERGE INTO | Can conflict in Serializable, cannot conflict in WriteSerializable | Can conflict in Serializable and WriteSerializable |  |
| OPTIMIZE | Cannot conflict | Can conflict in Serializable and WriteSerializable | Can conflict in Serializable and WriteSerializable |
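The matrix above can be encoded as a small lookup helper. This is purely an illustrative sketch (the structure and names are mine, not a Delta Lake API):

```python
# Conflict matrix keyed by (operation, operation); values are the isolation
# levels under which that pair of concurrent writes can conflict.
CONFLICTS = {
    ('INSERT', 'INSERT'): set(),
    ('UPDATE/DELETE/MERGE', 'INSERT'): {'Serializable'},
    ('UPDATE/DELETE/MERGE', 'UPDATE/DELETE/MERGE'):
        {'Serializable', 'WriteSerializable'},
    ('OPTIMIZE', 'INSERT'): set(),
    ('OPTIMIZE', 'UPDATE/DELETE/MERGE'): {'Serializable', 'WriteSerializable'},
    ('OPTIMIZE', 'OPTIMIZE'): {'Serializable', 'WriteSerializable'},
}

def can_conflict(op_a, op_b, isolation_level):
    """True if op_a and op_b can conflict at the given isolation level.
    The matrix is symmetric, so look up the pair in either order."""
    levels = CONFLICTS.get((op_a, op_b)) or CONFLICTS.get((op_b, op_a), set())
    return isolation_level in levels

assert can_conflict('UPDATE/DELETE/MERGE', 'INSERT', 'Serializable')
assert not can_conflict('UPDATE/DELETE/MERGE', 'INSERT', 'WriteSerializable')
assert can_conflict('OPTIMIZE', 'OPTIMIZE', 'WriteSerializable')
```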
In all cases marked "can conflict", whether the two operations actually conflict depends on whether they operate on the same set of files. You can make the two sets of files disjoint by partitioning the table by the same columns used in the conditions of the operations. For example, the two commands UPDATE table WHERE date > '2010-01-01' ... and DELETE table WHERE date < '2010-01-01' conflict if the table is not partitioned by date, because both can attempt to modify the same set of files. Partitioning the table by date avoids the conflict. Hence, partitioning a table by the columns commonly used in the conditions of its operations can reduce conflicts significantly. However, partitioning a table by a high-cardinality column can lead to other performance issues due to the large number of subdirectories.
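The "same set of files" rule can be sketched in Python. This is a toy model (the file layout and helper names are assumptions, not Delta internals) showing how partitioning by date makes the file sets touched by the two commands disjoint:

```python
from datetime import date

# Toy model: each file covers a set of dates. In an unpartitioned table a
# file can mix dates on both sides of the predicate boundary; in a table
# partitioned by date, each file belongs to exactly one date.
unpartitioned_files = {
    'part-0': {date(2009, 12, 31), date(2010, 6, 1)},  # mixes both ranges
    'part-1': {date(2011, 1, 1)},
}
partitioned_files = {
    'date=2009-12-31/part-0': {date(2009, 12, 31)},
    'date=2010-06-01/part-0': {date(2010, 6, 1)},
    'date=2011-01-01/part-0': {date(2011, 1, 1)},
}

def files_touched(files, pred):
    """Files whose contents may match the predicate (the op may rewrite them)."""
    return {name for name, dates in files.items() if any(pred(d) for d in dates)}

update_pred = lambda d: d > date(2010, 1, 1)   # UPDATE ... WHERE date > '2010-01-01'
delete_pred = lambda d: d < date(2010, 1, 1)   # DELETE ... WHERE date < '2010-01-01'

# Unpartitioned: both operations touch 'part-0', so they can conflict.
assert (files_touched(unpartitioned_files, update_pred)
        & files_touched(unpartitioned_files, delete_pred)) == {'part-0'}

# Partitioned by date: the touched file sets are disjoint, so no conflict.
assert (files_touched(partitioned_files, update_pred)
        & files_touched(partitioned_files, delete_pred)) == set()
```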
When a transaction conflict occurs, you will observe one of the following exceptions.
ConcurrentAppendException: Thrown when a transaction updates records in a partition and a concurrent transaction also updates records in the same partition. When the isolation level is set to Serializable, any append to a partition directory that your transaction reads causes this error. When the isolation level is set to WriteSerializable, your transaction conflicts only if there was an update (that is, files were rewritten, for example by a MERGE) to a partition your operation was reading.

This exception is typically thrown during concurrent MERGE operations. While the concurrent operations may be physically working on separate partition directories, they may still conflict unless this separation is explicit in the MERGE condition. Consider the following example.
```python
date = dbutils.widgets.get('date')
country = dbutils.widgets.get('country')

# Read the json data for a given country at a given date
spark.read.json('/source/path/date=%s/country=%s' % (date, country)) \
  .createOrReplaceTempView('source_table')

# Target 'delta_table' is also partitioned by date and country
spark.sql("""
  MERGE INTO delta_table d
  USING source_table s
  ON s.user_id = d.user_id AND s.date = d.date AND s.country = d.country
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""")
```
Suppose you run the above Python notebook as a Databricks job and schedule runs for different dates or countries concurrently. Since each job works on an independent partition of the target Delta table, you don't expect any conflicts. However, the condition is not explicit enough, and the jobs may conflict. Instead, you can rewrite the statement as:
```python
# Note how the date and country are filled in for the target table.
spark.sql("""
  MERGE INTO delta_table d
  USING source_table s
  ON s.user_id = d.user_id AND d.date = '%s' AND d.country = '%s'
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""" % (date, country))
```
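Why the literal values help can be sketched abstractly: they let the conflict checker narrow the set of target partitions the MERGE may touch, so concurrent jobs for different (date, country) pairs operate on disjoint files. This is a rough illustration under my own assumptions, not Delta's actual pruning logic:

```python
def target_partitions(all_partitions, condition_literals):
    """Partitions a MERGE may touch, given literal constraints on partition
    columns extracted from its ON condition (empty dict = no pruning)."""
    return [
        p for p in all_partitions
        if all(p.get(col) == val for col, val in condition_literals.items())
    ]

partitions = [
    {'date': '2010-01-01', 'country': 'US'},
    {'date': '2010-01-01', 'country': 'FR'},
    {'date': '2010-01-02', 'country': 'US'},
]

# s.date = d.date AND s.country = d.country: no literals on the target, so
# every partition may be touched -- concurrent jobs overlap and can conflict.
assert target_partitions(partitions, {}) == partitions

# d.date = '2010-01-01' AND d.country = 'US': pruned to a single partition,
# so jobs for different (date, country) pairs touch disjoint files.
assert target_partitions(partitions,
                         {'date': '2010-01-01', 'country': 'US'}) == [
    {'date': '2010-01-01', 'country': 'US'}
]
```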
- ConcurrentDeleteReadException: A concurrent transaction deleted a file that your transaction read. A common cause is a MERGE operation that rewrites files.
- ConcurrentDeleteDeleteException: A concurrent transaction deleted a file that your transaction also deletes. This can be caused by two concurrent OPTIMIZE jobs compacting the same files.
- MetadataChangedException: A concurrent transaction updated the metadata of the Delta table. Common causes are ALTER TABLE operations or writes to the Delta table that update its schema.
- ConcurrentTransactionException: A streaming query using the same checkpoint location was started multiple times concurrently and tried to write to the Delta table at the same time. Never run two streaming queries that use the same checkpoint location at the same time.
- ProtocolChangedException: The Delta table was upgraded to a new protocol version. You may need to upgrade your cluster version for future transactions to succeed. See table versions for more details.