Enable idempotent writes across jobs
Sometimes a job that writes data to a Delta table is restarted due to various reasons (for example, job encounters a failure). The failed job may or may not have written the data to Delta table before terminating. In the case where the data is written to the Delta table, the restarted job writes the same data to the Delta table which results in duplicate data.
To address this, Delta tables support the following
DataFrameWriter options to make the writes idempotent:
txnAppId: A unique string that you can pass on each
DataFramewrite. For example, this can be the name of the job.
txnVersion: A monotonically increasing number that acts as transaction version. This number needs to be unique for data that is being written to the Delta table(s). For example, this can be the epoch seconds of the instant when the query is attempted for the first time. Any subsequent restarts of the same job needs to have the same value for
The above combination of options needs to be unique for each new data that is being ingested into the Delta table and the
txnVersion needs to be higher than the last data that was ingested into the Delta table. For example:
Last successfully written data contains option values as
Next write of data should have
txnAppId = dailyETLand
txnVersionas at least
23424(one more than the last written data
Any attempt to write data with
txnAppId = dailyETLand
23422or less is ignored because the
txnVersionis less than the last recorded
txnVersionin the table.
Attempt to write data with
anotherETL:23424is successful writing data to the table as it contains a different
txnAppIdcompared to the same option value in last ingested data.
This solution assumes that the data being written to Delta table(s) in multiple retries of the job is the same. If a write attempt to a Delta table succeeds but due to some downstream failure there is a second write attempt with same txn options but different data, then that second write attempt will be ignored. This can cause unexpected results.
See the following code for an example:
app_id = ... # A unique string that is used as an application ID. version = ... # A monotonically increasing number that acts as transaction version. dataFrame.write.option("txnVersion", version).option("txnAppId", app_id).save(...)
val appId = ... // A unique string that is used as an application ID. version = ... // A monotonically increasing number that acts as transaction version. dataFrame.write.option("txnVersion", version).option("txnAppId", appId).save(...)