Delta Lake でデータを選択的に上書きする
Delta Lakeには、選択的上書きのための以下の明確なオプションがあります。
replaceWhereオプションは、指定された述語に一致するすべてのレコードをアトミックに置き換えます。- 動的パーティションの上書きを使用して、テーブルのパーティション分割方法に基づいてデータのディレクトリを置き換えることができます。
ほとんどの操作では、DatabricksはreplaceWhereを使用して上書きするデータを指定することを推奨します。
データが誤って上書きされた場合は、 復元 を使用して変更を元に戻すことができます。
任意かつ選択的な上書き: replaceWhere
任意の式に一致するデータのみを選択的に上書きできます。
SQL には Databricks Runtime 12.2 LTS 以降が必要です。
例えば、 start_dateでパーティション分割されたターゲットテーブルの1月のイベントを、 replace_dataのデータでアトミックに置き換えます。
- Python
- Scala
- SQL
(replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.saveAsTable("events")
)
replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.saveAsTable("events")
INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data
このサンプル コードでは、 replace_dataでデータを書き出し、すべての行が述語と一致することを検証し、 overwrite セマンティクスを使用してアトミック置換を実行します。 操作内のいずれかの値が制約の範囲外にある場合、この操作はデフォルトでエラーで失敗します。
この動作を変更して、述部範囲内の値を overwrite し、指定した範囲外のレコード insert ことができます。 これを行うには、次のいずれかの設定を使用して spark.databricks.delta.replaceWhere.constraintCheck.enabled を false に設定し、制約チェックを無効にします。
- Python
- Scala
- SQL
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)
SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false
REPLACE WHERE いくつかの制限付きでboolean_expressionを受け入れます。SQL言語リファレンスのINSERTを参照してください。
従来の動作
replaceWhereの従来の動作を使用する場合、クエリはパーティション列に対してのみ述語に一致するデータを上書きします。以下のコマンドは、 dateでパーティション分割されたターゲットテーブルの1月を、 dfのデータでアトミックに置き換えます。
- Python
- Scala
(df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.saveAsTable("people10m")
)
df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.saveAsTable("people10m")
従来の動作を使用するには、 spark.databricks.delta.replaceWhere.dataColumns.enabledフラグをfalseに設定してください。
- Python
- Scala
- SQL
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false
動的パーティションの上書き
動的パーティション上書きは、書き込みによって新しいデータがコミットされたパーティションのみを更新し、他のパーティションは変更しません。
Databricksは、動的パーティションの上書きをトリガーするためにREPLACE USINGを推奨しています。このモードは、 Databricks SQL 、サーバレス コンピュート、クラシック コンピュートを含むすべてのコンピュート タイプで動作し、 Sparkセッション構成を設定する必要はありません。 動的パーティションの上書きについては、 REPLACE USINGを参照してください。
partitionOverwriteMode これは動的パーティション上書きのレガシー モードであり、クラシック コンピュートを使用してSparkセッション構成を設定する必要があります。 Databricks SQLやサーバレス コンピュートではサポートされていません。 動的パーティションの上書きについては、 partitionOverwriteMode (レガシー) を参照してください。
以下のセクションでは、各モードの使用方法を説明します。
動的パーティションの上書き REPLACE USING
Databricks Runtime 16.3以降では、 REPLACE USINGを使用したテーブルの動的パーティション上書きがサポートされています。Sparkセッション構成を設定する必要なく、すべてのコンピュート タイプにわたってデータを選択的に上書きできます。 REPLACE USING Databricks SQL 、サーバレス コンピュート、およびクラシック コンピュートで動作する、コンピュートに依存しないアトミックな上書き動作を有効にします。
REPLACE USING 受信データの対象となるパーティションのみを上書きします。他のすべてのパーティションは変更されません。
REPLACE USING SQLでのみサポートされています。詳細については、SQL言語リファレンスのINSERTを参照してください。
以下の例ではREPLACE USINGを使用しています。
INSERT INTO TABLE events
REPLACE USING (event_id, start_date)
SELECT * FROM source_data
動的パーティションの上書きについては、次の制約と動作に留意してください。
USING句でテーブルのパーティション列の完全なセットを指定する必要があります。- 書き込まれたデータが予期されるパーティションにのみ触れることを常に検証します。間違ったパーティションの 1 つの行が、意図せずにパーティション全体を上書きする可能性があります。
NULL``REPLACE USING値を等しく扱うなど、がサポートするものよりもカスタマイズ可能なマッチング ロジックが必要な場合は、代わりに補完的なREPLACE ONを使用します。詳細については、 INSERT を参照してください。
動的パーティションの上書き partitionOverwriteMode (レガシ)
プレビュー
この機能は パブリック プレビュー段階です。
Databricks Runtime 11.3 LTS 以降では、上書きモード (SQL での INSERT OVERWRITE 、または df.write.mode("overwrite")を使用した DataFrame 書き込み) を使用して、パーティション分割されたテーブルの動的パーティション上書きがサポートされています。このタイプの上書きは、クラシックコンピュートでのみ使用でき、 Databricks SQL ウェアハウスやサーバレスコンピュートでは使用できません。
可能な場合は、パーティションの上書きINSERT OVERWRITE PARTITIONとspark.sql.sources.partitionOverwriteMode=dynamicの代わりにINSERT REPLACE USINGを使用してください。パーティションの上書きは、パーティションの変更時に古いデータを使用する可能性があります。
動的パーティション上書きモードを使用するには、Spark セッション構成spark.sql.sources.partitionOverwriteModeをdynamicに設定します。または、 DataFrameWriterオプションpartitionOverwriteModeをdynamicに設定することもできます。クエリ固有のオプションが存在する場合、そのオプションはセッション構成で定義されたモードを上書きします。spark.sql.sources.partitionOverwriteModeのデフォルト値はstaticです。
次の例ではpartitionOverwriteModeを使用しています。
- SQL
- Python
- Scala
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;
(df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
)
df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
partitionOverwriteModeには、次の制約と動作に留意してください。
overwriteSchemaをtrueに設定することはできません。- 同じ
DataFrameWriter操作でpartitionOverwriteModeとreplaceWhereの両方を指定することはできません。 DataFrameWriterオプションを使用してreplaceWhere条件を指定すると、Delta Lake はその条件を適用して、上書きするデータを制御します。このオプションは、partitionOverwriteModeセッションレベルの設定よりも優先されます。- 書き込まれたデータが予期されるパーティションにのみ触れることを常に検証します。間違ったパーティションの 1 つの行が、意図せずにパーティション全体を上書きする可能性があります。