Databricks での分離レベルと書き込みの競合
テーブルの独立性レベルは、並列操作によって行われた変更からトランザクションを分離する度合いを定義します。 Databricks での書き込みの競合は、分離レベルによって異なります。
Delta Lake では ACID読み取りと書き込みの間のトランザクションが保証されます。 これは、次のことを意味します。
- 複数のクラスターにまたがる複数のライターは、テーブルパーティションを同時に変更できます。 ライターには、テーブルの一貫性のあるスナップショット・ビューが表示され、書き込みはシリアル・オーダーで行われます。
- 閲覧者には、ジョブ中にテーブルが変更された場合でも、Databricks ジョブが開始されたテーブルの一貫したスナップショット ビューが引き続き表示されます。
「Databricks の ACID 保証とは」を参照してください。
Databricks 、デフォルトですべてのテーブルにデルタレイクを使用します。 この記事では、Databricks での Delta Lake の動作について説明します。
メタデータを変更すると、すべての並列書き込み操作が失敗します。 これらの操作には、テーブルプロトコル、テーブルプロパティ、またはデータスキーマの変更が含まれます。
ストリーミング読み取りは、テーブルのメタデータを変更するコミットを検出すると失敗します。 ストリームを続行する場合は、ストリームを再起動する必要があります。 推奨される方法については、 構造化ストリーミングの本番運用に関する考慮事項を参照してください。
メタデータを変更するクエリの例を次に示します。
-- Set a table property.
ALTER TABLE table-name SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')
-- Enable a feature using a table property and update the table protocol.
ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);
-- Drop a table feature.
ALTER TABLE table_name DROP FEATURE deletionVectors;
-- Upgrade to UniForm.
REORG TABLE table_name APPLY (UPGRADE UNIFORM(ICEBERG_COMPAT_VERSION=2));
-- Update the table schema.
ALTER TABLE table_name ADD COLUMNS (col_name STRING);
行レベルの同時実行性との書き込みの競合
行レベルの同時実行性は、行レベルで変更を検出し、並列書き込みが同じデータ ファイル内の異なる行を更新または削除するときに発生する競合を自動的に解決することで、並列書き込み操作間の競合を減らします。
行レベルのコンカレンシーは、Databricks Runtime 14.2 以降で一般公開されています。 行レベルの同時実行性は、次の条件でデフォルトによってサポートされます。
- 削除ベクトルが有効でパーティション分割されていないテーブル。
- リキッドクラスタリングを持つテーブル (削除ベクトルを無効にしている場合を除く)。
パーティションを持つテーブルは、行レベルの同時実行性をサポートしていませんが、削除ベクトルが有効になっている場合は、 OPTIMIZE
と他のすべての書き込み操作との間の競合を回避できます。 行レベルの同時実行性の制限を参照してください。
その他の Databricks Runtime バージョンについては、「 行レベルのコンカレンシー プレビュー動作 (レガシ)」を参照してください。
MERGE INTO
行レベルの同時実行性のサポートには、Databricks Runtime 14.2 の Photon が必要です。 Databricks Runtime 14.3 LTS 以降では、Photon は必要ありません。
次の表は、ロー・レベルの同時実行性が有効になっている各 独立性レベルで 競合する可能性のある書き込み操作のペアを示しています。
ID カラムを持つテーブルは、並列トランザクションをサポートしません。 「Delta Lake での ID 列の使用」を参照してください。
インサート (1) | UPDATE、削除、 MERGE INTO | OPTIMIZE | |
---|---|---|---|
INSERT | 競合することはできません | ||
UPDATE、削除、 MERGE INTO | WriteSerializable で競合することはできません。 同じ行を変更するときに Serializable で競合する可能性があります。 行レベルの同時実行性の制限を参照してください。 | 同じ行を変更すると競合する可能性があります。 行レベルの同時実行性の制限を参照してください。 | |
OPTIMIZE | 競合することはできません |
|
|
(1) 上記の表のすべての INSERT
操作は、コミットする前に同じテーブルからデータを読み取らない追加操作を示しています。 同じテーブルを読み取るサブクエリを含む INSERT
操作は、MERGE
と同じ同時実行性をサポートします。
REORG
操作の分離セマンティクスは、削除ベクトルに記録された変更を反映するようにデータ ファイルを書き換える場合の OPTIMIZE
と同じです。 REORG
を使用してアップグレードを適用すると、テーブルプロトコルが変更され、進行中のすべてのオペレーションと競合します。
行レベルの同時実行なしで競合を書き込んでください
次の表は、各 独立性レベルで競合する可能性のある書き込み操作のペアを示しています。
テーブルでパーティションが定義されている場合、または削除ベクトルが有効になっていない場合、行レベルの同時実行性はサポートされません。 Databricks Runtime 14.2 以降は、行レベルのコンカレンシーに必要です。
ID カラムを持つテーブルは、並列トランザクションをサポートしません。 「Delta Lake での ID 列の使用」を参照してください。
インサート (1) | UPDATE、削除、 MERGE INTO | OPTIMIZE | |
---|---|---|---|
INSERT | 競合することはできません | ||
UPDATE、削除、 MERGE INTO | WriteSerializable で競合することはできません。 シリアル化可能では競合する可能性があります。 パーティションとの競合の回避を参照してください。 | Serializable と WriteSerializable で競合する可能性があります。 パーティションとの競合の回避を参照してください。 | |
OPTIMIZE | 競合することはできません | 削除ベクトルが有効になっているテーブルと競合することはできません ( | 削除ベクトルが有効になっているテーブルと競合することはできません ( |
(1) 上記の表のすべての INSERT
操作は、コミットする前に同じテーブルからデータを読み取らない追加操作を示しています。 同じテーブルを読み取るサブクエリを含む INSERT
操作は、MERGE
と同じ同時実行性をサポートします。
REORG
操作の分離セマンティクスは、削除ベクトルに記録された変更を反映するようにデータ ファイルを書き換える場合の OPTIMIZE
と同じです。 REORG
を使用してアップグレードを適用すると、テーブルプロトコルが変更され、進行中のすべてのオペレーションと競合します。
行レベルの同時実行の制限事項
行レベルの同時実行性には、いくつかの制限が適用されます。 次の操作では、競合の解決は、Databricks での書き込み競合の通常のコンカレンシーに従います。 行レベルの同時実行性を使用しない書き込み競合を参照してください。
-
次のような複雑な条件句を持つコマンド。
- 構造体、配列、マップなどの複雑なデータ型の条件。
- 非決定論的な式とサブクエリを使用する条件。
- 相関サブクエリを含む条件。
-
Databricks Runtime 14.2 では、
MERGE
コマンドでターゲット テーブルで明示的な述語を使用して、ソース テーブルに一致する行をフィルター処理する必要があります。 マージ解決の場合、フィルターは並列操作のフィルター条件に基づいて競合する可能性のある行のみをスキャンします。
行レベルの競合検出により、合計実行時間が長くなる可能性があります。 多数の並列トランザクションの場合、ライターは競合の解決よりも待機時間を優先し、競合が発生する可能性があります。
削除ベクトルに関するすべての制限も適用されます。 制限事項を参照してください。
Delta Lake がテーブルを読み取らずにコミットするのはいつですか?
Delta Lake INSERT
または追加操作では、次の条件が満たされている場合、コミット前にテーブルの状態が読み取られません。
- ロジックは
INSERT
SQLロジックまたは追加モードを使用して表現されます。 - ロジックには、書き込み操作の対象となるテーブルを参照するサブクエリや条件は含まれていません。
他のコミットと同様に、 Delta Lake はトランザクション ログのメタデータを使用してコミット時にテーブルのバージョンを検証して解決しますが、テーブルのバージョンは実際には読み取られません。
多くの一般的なパターンでは、 MERGE
操作を使用して、テーブルの条件に基づいてデータを挿入します。 INSERT
ステートメントを使用してこのロジックを書き換えることも可能ですが、条件式がターゲット テーブルの列を参照する場合、これらのステートメントには MERGE
と同じ同時実行性の制限があります。
直列化可能分離レベルと直列化可能分離レベルの書き込み
テーブルの独立性レベルは、並列トランザクションによって行われた変更からトランザクションを分離する度合いを定義します。 Databricks 上の Delta Lake では、Serializable と WriteSerializable の 2 つの分離レベルがサポートされています。
-
Serializable : 最も強力な分離レベル。 これにより、コミットされた書き込み操作とすべての読み取りが シリアル化可能になります。 操作は、テーブルに表示されるものと同じ結果を生成する一度に 1 つずつ実行するシリアル シーケンスが存在する限り、許可されます。 書き込み操作の場合、シリアルシーケンスはテーブルの履歴に表示されるものとまったく同じです。
-
WriteSerializable (デフォルト): Serializable よりも弱い分離レベル。 書き込み操作 (つまり、読み取りではない) がシリアル化可能であることのみが保証されます。 ただし、これは スナップショット ・アイソレーションよりも強力です。 WriteSerializable は、最も一般的な操作でデータの一貫性と可用性のバランスが優れているため、デフォルトの独立性レベルです。
このモードでは、Delta テーブルの内容は、テーブル履歴に表示される一連の操作から予想される内容と異なる場合があります。 これは、このモードでは、特定の並列書き込みのペア (たとえば、操作 X と Y) を続行できるため、履歴では Y が X の後にコミットされたことが履歴で示されていても、結果は Y が X の前に実行された (つまり、それらの間でシリアル化可能) ことになるためです。 この並べ替えを禁止するには、 テーブルの独立性レベル を Serializable に設定して、これらのトランザクションが失敗するようにします。
読み取り操作では、常にスナップショット・アイソレーションが使用されます。 書き込み独立性レベルは、履歴によれば「存在しなかった」テーブルのスナップショットをリーダーが表示できるかどうかを決定します。
Serializable レベルでは、リーダーには常に履歴に準拠するテーブルのみが表示されます。 WriteSerializable レベルでは、リーダーは Delta ログに存在しないテーブルを参照できます。
たとえば、実行時間の長い削除である txn1 と、txn1 によって削除されたデータを挿入する txn2 について考えてみます。 txn2 と txn1 が完了し、履歴にその順序で記録されます。 履歴によると、txn2に挿入されたデータはテーブルに存在してはなりません。 Serializable レベルの場合、txn2 によって挿入されたデータをリーダーが見ることはありません。 ただし、WriteSerializable レベルでは、ある時点で txn2 によって挿入されたデータをリーダーが認識できます。
各独立性レベルで互いに競合する可能性のある操作のタイプと、発生する可能性のあるエラーの詳細については、 パーティション分割と不整合なコマンド条件を使用して競合を回避するを参照してください。
独立性レベルを設定する
独立性レベルは、 ALTER TABLE
コマンドを使用して設定します。
ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = <level-name>)
ここで、 <level-name>
は Serializable
または WriteSerializable
です。
たとえば、独立性レベルをデフォルトの WriteSerializable
から Serializable
に変更するには、次のコマンドを実行します。
ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')
パーティション分割と不整合なコマンド条件を使用して競合を回避
「競合する可能性があります」とマークされたすべての場合において、2 つの操作が競合するかどうかは、同じファイル セットを操作するかどうかによって異なります。 2 つのファイル・セットを切り離すには、操作の条件で使用される列と同じ列でテーブルをパーティション分割します。 たとえば、テーブルが日付でパーティション化されていない場合、 UPDATE table WHERE date > '2010-01-01' ...
と DELETE table WHERE date < '2010-01-01'
の 2 つのコマンドは競合します。これは、どちらも同じファイル セットの変更を試みる可能性があるためです。 テーブルを date
でパーティション分割すると、競合を回避できます。 したがって、コマンドで一般的に使用される条件に従ってテーブルをパーティション分割すると、競合を大幅に減らすことができます。 ただし、カーディナリティの高い列でテーブルをパーティション分割すると、サブディレクトリの数が多いため、他のパフォーマンスの問題が発生する可能性があります。
競合の例外
トランザクションの競合が発生すると、次のいずれかの例外が発生します。
ConcurrentAppendException (同時追加例外)
この例外は、並列操作で、操作が読み取るのと同じパーティション (またはパーティション分割されていないテーブル内の任意の場所) にファイルを追加する場合に発生します。 ファイルの追加は、 INSERT
、 DELETE
、 UPDATE
、または MERGE
操作によって発生する可能性があります。
デフォルトの独立性レベルが WriteSerializable
の場合、 ブラインド INSERT
操作 (つまり、データを読み取らずにデータを追加する操作) によって追加されたファイルは、同じパーティション (またはパーティション化されていないテーブル内の任意の場所) に触れていても、どの操作とも競合しません。独立性レベルが Serializable
に設定されている場合、ブラインド追加が競合する可能性があります。
重要 : WriteSerializable
モードでは、 DELETE
、 UPDATE
、または MERGE
操作を実行している複数の並列トランザクションが、ブラインド追加によって挿入された値を参照する可能性がある場合、ブラインド追加が競合する可能性があります。 この競合を回避するには、次のいずれかの操作を行います。
- 並列
DELETE
、UPDATE
、またはMERGE
操作が、追加されたデータを読み取らないようにしてください。 - 追加されたデータを読み取ることができる
DELETE
、UPDATE
、またはMERGE
操作は、最大で 1 つまでです。
この例外は、並列 DELETE
、 UPDATE
、または MERGE
操作中にスローされることがよくあります。 並列操作は物理的に異なるパーティションディレクトリを更新している可能性がありますが、そのうちの1つが他のパーティションを同時に更新するのと同じパーティションを読み取る可能性があるため、競合が発生します。 これを回避するには、操作条件で分離を明示的にします。 次の例を考えてみます。
// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
source.as("s"),
"s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
上記のコードを異なる日付または国で同時に実行するとします。 各ジョブはターゲット Delta テーブル上の独立したパーティションで動作しているため、競合は予想されません。 ただし、この条件は十分に明示されておらず、テーブル全体をスキャンでき、他のパーティションを更新する並列操作と競合する可能性があります。 代わりに、次の例に示すように、ステートメントを書き換えて、特定の日付と国をマージ条件に追加できます。
// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
source.as("s"),
"s.user_id = t.user_id AND s.date = t.date AND s.country = t.country AND t.date = '" + <date> + "' AND t.country = '" + <country> + "'")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
このオペレーションは、異なる日付や国で同時に安全に実行できるようになりました。
ConcurrentDeleteReadException (コンカレントデリート読み取り例外)
この例外は、操作が読み取ったファイルを並列操作によって削除された場合に発生します。 一般的な原因は、ファイルを書き換える DELETE
、 UPDATE
、または MERGE
操作です。
同時削除/削除例外
この例外は、並列操作によって削除されたファイルのうち、その操作によっても削除される場合に発生します。 これは、2 つの並列圧縮操作が同じファイルを再書き込みすることによって発生する可能性があります。
メタデータ変更例外
この例外は、並列トランザクションが Delta テーブルのメタデータを更新するときに発生します。 一般的な原因は、テーブルのスキーマを更新する Delta テーブルへの ALTER TABLE
操作または書き込みです。
同時トランザクション例外
同じチェックポイントの場所を使用するストリーミング クエリが同時に複数回開始され、同時に Delta テーブルへの書き込みを試みた場合。 2 つのストリーミング クエリで同じチェックポイントの場所を使用し、同時に実行しないでください。
プロトコルが変更されました。例外
この例外は、次の場合に発生する可能性があります。
- Delta テーブルが新しいプロトコル バージョンにアップグレードされたとき。 今後の運用を成功させるには、 Databricks Runtimeをアップグレードする必要がある場合があります。
- 複数のライターが同時にテーブルを作成または置換している場合。
- 複数のライターが同時に空のパスに書き込んでいる場合。
詳細については、「 Databricks で Delta Lake 機能の互換性を管理する方法」 を参照してください。
行レベルのコンカレンシー プレビュー動作 (レガシ)
このセクションでは、Databricks Runtime 14.1 以下での行レベルのコンカレンシーのプレビュー動作について説明します。 行レベルの同時実行性には、常に削除ベクトルが必要です。
Databricks Runtime 13.3 LTS 以降では、リキッドクラスタリングが有効になっているテーブルで、行レベルの同時実行性が自動的に有効になります。
Databricks Runtime 14.0 および 14.1 では、クラスタ化またはSparkSessionに次の設定を設定することで、削除ベクトルを持つテーブルの行レベルの同時実行を有効にできます。
spark.databricks.delta.rowLevelConcurrencyPreview = true
Databricks Runtime 14.1以下では、Photonコンピュート以外のコンピュートは、DELETE
操作の行レベルの同時実行のみをサポートします。