Delta Lake でデータを選択的に上書きする
Delta Lakeには、選択的上書きのための以下の明確なオプションがあります。
オプション | ユースケース | サポートされているコンピュートのタイプ | 最小バージョン |
|---|---|---|---|
| 述語に一致する行をアトミックに上書きします。 | すべてのコンピュートタイプ。 | Databricks Runtime 12.2 LTS以降におけるSQL。Databricks Runtime 9.1 LTS以降におけるPythonおよびScala。 |
| 動的データの上書き。指定されたデータセット内で、列の値の等価性比較に基づいて、指定された列に一致するすべての行を置き換えます。 | すべてのコンピュートタイプ。 | Databricks Runtime 16.3以降におけるSQL。Databricks Runtime 18.2以降におけるPythonおよびScala。 |
| ブール式による動的データの上書き。 複雑なマッチング条件や NULL を許容するマッチング条件を持つ置換に使用します (例: | すべてのコンピュートタイプ。 | Databricks Runtime 17.1以降におけるSQL。Databricks Runtime 18.2以降におけるPythonおよびScala。 |
| 従来の動的パーティション上書きは、書き込みによって新しいデータがコミットされる各パーティション内の既存のすべてのデータを上書きします。新規ワークロードには推奨しません。 | SQLはクラシックなコンピュートのみをサポートします。 PythonとScalaすべてのコンピュート型をサポートします。 | Databricks Runtime 11.3 LTS以降におけるSQL、Python、およびScala。 |
ほとんどのユースケースでは、Databricks はREPLACE USINGまたはREPLACE WHERE使用を推奨しています。複雑なマッチング条件やNULL安全なマッチング条件が必要な場合にのみ、 REPLACE ON使用してください。
各オプションの置換動作の詳細については、 INSERTを参照してください。DataFrameWriter Delta Lakeオプションの完全なリストについては、 Delta LakeとApache Icebergを参照してください。
ScalaとPythonでは、 replaceOnとreplaceUsing replaceWhere 、 partitionOverwriteMode 、またはoverwriteSchemaと組み合わせて使用することはできません。
空のソースクエリの場合、 REPLACE USINGとREPLACE ONはどちらもデータを削除しませんが、 REPLACE WHEREデータを削除する可能性があります。
データが誤って上書きされた場合は、 復元 を使用して変更を元に戻すことができます。
REPLACE WHERE
任意の式に一致するデータのみを選択的にREPLACE WHEREで上書きできます。
ベータ版
REPLACE WHERE実行時に増分更新のメリットを得るには、 Spark宣言型パイプライン (SDP) で REPLACE WHERE フローを使用します。 REPLACE WHERE フローを使用したバッチ処理を参照してください。
start_dateでパーティション分割されたターゲットテーブルの1月のイベントを、 replace_dataのデータでアトミックに置き換える:
- Python
- Scala
- SQL
(replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.saveAsTable("events")
)
replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.saveAsTable("events")
INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data
このサンプルコードは、 replace_dataデータを出力し、すべての行が述語に一致することを検証し、 overwriteセマンティクスを使用してアトミック置換を実行します。演算に含まれる値のいずれかが述語の範囲外である場合、この演算はデフォルトでエラーで失敗します。
従来のコンピュートでは、この動作を述語範囲内の値がoverwrite 、指定範囲外のレコードがinsertになるように変更するには、 spark.databricks.delta.replaceWhere.constraintCheck.enabledをfalseに設定して制約チェックを削除します。
- Python
- Scala
- SQL
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)
SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false
REPLACE WHERE いくつかの制限付きでboolean_expressionを受け入れます。SQL言語リファレンスのINSERTを参照してください。
空のソースクエリの場合、 REPLACE WHEREテーブルの行を削除する可能性があります。
従来の動作
レガシーreplaceWhereはクラシック コンピュートでのみ使用できます。 「クラシック コンピュートの概要」を参照してください。
replaceWhereの従来の動作を使用する場合、クエリはパーティション列に対してのみ述語に一致するデータを上書きします。以下のコマンドは、 dateでパーティション分割されたターゲットテーブルの1月を、 dfのデータでアトミックに置き換えます。
- Python
- Scala
(df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.saveAsTable("people10m")
)
df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.saveAsTable("people10m")
従来の動作を使用するには、 spark.databricks.delta.replaceWhere.dataColumns.enabled falseに設定してください。
- Python
- Scala
- SQL
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false
動的データの上書き
動的データ上書き機能は、指定されたキー列またはブール式に一致するデータを選択的に置き換え、その他のデータは変更しません。 パーティション化されたテーブル、パーティション化されていないテーブル、およびリキッドクラスタリングを使用したテーブルはすべてサポートされています。
動的パーティション上書きは、動的データ上書き動作のサブセットです。動的パーティション上書きは、書き込みによって新しいデータがコミットされる各パーティション内の既存のデータをすべて置き換え、その他のパーティションは変更せずに残します。パーティションテーブルのみがサポートされています。
REPLACE USING
SQLはDatabricks Runtime 16.3以降でサポートされています。Databricks Runtime 18.2以降では、PythonとScalaがサポートされています。Databricks Runtime 16.3から17.1までの動作の違いについては、 「従来の動作」を参照してください。
REPLACE USING Databricks SQL 、サーバレス コンピュート、およびクラシック コンピュートで動作する、コンピュートに依存しないアトミックな上書き動作を有効にします。 REPLACE USING Spark セッション構成を設定する必要はありません。
REPLACE USING 等号検定において、指定された列が等しいと判断された場合に、行を置き換えます。その他のデータはすべて変更されていません。
動的データ上書きをREPLACE USINGで使用してください。
- Python
- Scala
- SQL
(sourceDataDF.write
.mode("overwrite")
.option("replaceUsing", "event_id, start_date")
.saveAsTable("events")
)
sourceDataDF.write
.mode("overwrite")
.option("replaceUsing", "event_id, start_date")
.saveAsTable("events")
INSERT INTO TABLE events
REPLACE USING (event_id, start_date)
SELECT * FROM source_data
空のソースクエリの場合、 REPLACE USINGテーブルの行を削除しません。
複雑なマッチング条件やNULL値に安全なマッチング条件の場合は、代わりにREPLACE ON使用してください。REPLACE ON参照。
SQL言語リファレンスのINSERTを参照してください。
従来の動作
Databricks Runtime 16.3~17.1では、 REPLACE USING従来の動作を使用し、動的なパーティションの上書きのみを許可しますが、Databricks Runtime 17.2以降では動的なデータの上書きが許可されます。
REPLACE USING従来の動作については、以下の制約と動作に留意してください。
USING句でテーブルのパーティション列の完全なセットを指定する必要があります。- 書き込まれたデータが予期されるパーティションにのみ触れることを常に検証します。間違ったパーティションの 1 つの行が、意図せずにパーティション全体を上書きする可能性があります。
REPLACE ON
SQLはDatabricks Runtime 17.1以降でサポートされています。Databricks Runtime 18.2以降では、PythonとScalaがサポートされています。
REPLACE ON は、ユーザー定義の条件に一致する行を置き換えます。これは、指定された列が等号の下で等しいと比較された場合に行を置き換えるREPLACE USINGとは異なります。REPLACE USINGではサポートされていないマッチングロジックが必要な場合(例えば、 NULL値を等しいとみなす場合など)は、 REPLACE ON使用してください。
必要に応じて、 targetAliasオプションを使用して対象テーブルのエイリアスを指定し、 .as()または.alias() APIs使用して送信データのエイリアスを指定します。
SQL構文については、 INSERTを参照してください。
- Python
- Scala
- SQL
(sourceDataDF.alias("s")
.write
.mode("overwrite")
.option("targetAlias", "t")
.option("replaceOn", "s.event_id <=> t.event_id AND s.start_date <=> t.start_date")
.saveAsTable("events")
)
sourceDataDF.as("s")
.write
.mode("overwrite")
.option("targetAlias", "t")
.option("replaceOn", "s.event_id <=> t.event_id AND s.start_date <=> t.start_date")
.saveAsTable("events")
INSERT INTO TABLE events AS t
REPLACE ON (s.event_id <=> t.event_id AND s.start_date <=> t.start_date)
(SELECT * FROM source_data) AS s
空のソースクエリの場合、 REPLACE ONテーブルの行を削除しません。
動的パーティションはpartitionOverwriteMode (レガシー)で上書きされます
プレビュー
この機能は パブリック プレビュー段階です。
Databricks Runtime 11.3 LTS 以降では、上書きモード (SQL での INSERT OVERWRITE 、または df.write.mode("overwrite")を使用した DataFrame 書き込み) を使用して、パーティション分割されたテーブルの動的パーティション上書きがサポートされています。このタイプの上書きは、クラシックコンピュートでのみ使用でき、 Databricks SQL ウェアハウスやサーバレスコンピュートでは使用できません。
可能な場合は、パーティションの上書きINSERT OVERWRITE PARTITIONとspark.sql.sources.partitionOverwriteMode=dynamicの代わりにINSERT REPLACE USINGを使用してください。パーティションの上書きは、パーティションの変更時に古いデータを使用する可能性があります。
動的パーティション上書きモードを使用するには、Spark セッション構成spark.sql.sources.partitionOverwriteModeをdynamicに設定します。または、 DataFrameWriterオプションpartitionOverwriteModeをdynamicに設定することもできます。クエリ固有のオプションが存在する場合、そのオプションはセッション構成で定義されたモードを上書きします。spark.sql.sources.partitionOverwriteModeのデフォルト値はstaticです。
次の例ではpartitionOverwriteModeを使用しています。
- SQL
- Python
- Scala
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;
(df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
)
df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
partitionOverwriteModeには、次の制約と動作に留意してください。
overwriteSchemaをtrueに設定することはできません。- 同じ
DataFrameWriter操作でpartitionOverwriteModeとreplaceWhereの両方を指定することはできません。 DataFrameWriterオプションを使用してreplaceWhere条件を指定すると、Delta Lake はその条件を適用して、上書きするデータを制御します。このオプションは、partitionOverwriteModeセッションレベルの設定よりも優先されます。- 書き込まれたデータが予期されるパーティションにのみ触れることを常に検証します。間違ったパーティションの 1 つの行が、意図せずにパーティション全体を上書きする可能性があります。