Delta Lake でデータを選択的に上書きする
DatabricksはDelta Lake機能を利用して、選択的上書きのための2つの異なるオプションをサポートします:
replaceWhere
オプションは、指定された述語に一致するすべてのレコードをアトミックに置き換えます。- 動的パーティションの上書きを使用して、テーブルのパーティション分割方法に基づいてデータのディレクトリを置き換えることができます。
ほとんどの操作では、DatabricksはreplaceWhere
を使用して上書きするデータを指定することを推奨します。
データが誤って上書きされた場合は、 復元 を使用して変更を元に戻すことができます。
任意かつ選択的な上書き: replaceWhere
任意の式に一致するデータのみを選択的に上書きできます。
SQL には Databricks Runtime 12.2 LTS 以降が必要です。
次のコマンドは、start_date
でパーティショニングされたターゲットテーブルの1月のイベントをreplace_data
のデータにアトミックに置き換える:
- Python
- Scala
- SQL
(replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
)
replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data
このサンプル コードでは、 replace_data
でデータを書き出し、すべての行が述語と一致することを検証し、 overwrite
セマンティクスを使用してアトミック置換を実行します。 操作内のいずれかの値が制約の範囲外にある場合、この操作はデフォルトでエラーで失敗します。
この動作を変更して、述部範囲内の値を overwrite
し、指定した範囲外のレコード insert
ことができます。 これを行うには、次のいずれかの設定を使用して spark.databricks.delta.replaceWhere.constraintCheck.enabled
を false に設定し、制約チェックを無効にします。
- Python
- Scala
- SQL
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)
SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false
従来の動作
従来のデフォルトの動作では replaceWhere
述語に一致するデータをパーティション列のみ上書きする必要がありました。 このレガシ モデルでは、次のコマンドは、 date
によってパーティション分割されたターゲット テーブルの 1 月 1 月を df
のデータにアトミックに置き換えます。
- Python
- Scala
(df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
)
df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
以前の動作に戻す場合は、 spark.databricks.delta.replaceWhere.dataColumns.enabled
フラグを無効にすることができます。
- Python
- Scala
- SQL
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false
動的パーティションの上書き
動的パーティション上書きは、書き込みによって新しいデータがコミットされるパーティションのみが更新されます。これらのパーティション内の既存のデータはすべて上書きされ、他のデータは変更されません。
Databricks では、次の 2 つのアプローチがサポートされています。
REPLACE USING
(推奨) - Databricks SQLウェアハウス、サーバレス コンピュート、クラシックコンピュートなど、すべてのコンピュートタイプで動作します。Spark セッション構成を設定する必要はありません。partitionOverwriteMode
(レガシー) - クラシックコンピュートとSparkセッション構成の設定が必要です。Databricks SQL またはサーバレス コンピュートではサポートされていません。
以下のセクションでは、各アプローチの使用方法を示します。
動的パーティションの上書き REPLACE USING
プレビュー
この機能は パブリック プレビュー段階です。
Databricks Runtime 16.3 以降では、 REPLACE USING
を使用したパーティション分割テーブルの動的パーティション上書きがサポートされています。この方法では、 Spark セッション設定を設定することなく、すべてのコンピュートタイプでデータを選択的に上書きできます。 REPLACE USING
は、 Databricks SQL ウェアハウス、サーバレスコンピュート、およびクラシックコンピュートで機能するコンピュートに依存しないアトミック上書き動作を可能にします。
REPLACE USING
受信データの対象となるパーティションのみを上書きします。他のすべてのパーティションは変更されません。
次の例は、動的パーティション上書きを REPLACE USING
で使用する方法を示しています。現在、SQLのみを使用でき、PythonやScalaは使用できません。詳細については、SQL 言語リファレンスの INSERT を参照してください。
INSERT INTO TABLE events
REPLACE USING (event_id, start_date)
SELECT * FROM source_data
動的パーティションの上書きについては、次の制約と動作に留意してください。
USING
句でテーブルのパーティション列の完全なセットを指定する必要があります。- 書き込まれたデータが予期されるパーティションにのみ触れることを常に検証します。間違ったパーティションの 1 つの行が、意図せずにパーティション全体を上書きする可能性があります。
動的パーティションの上書き partitionOverwriteMode
(レガシ)
プレビュー
この機能は パブリック プレビュー段階です。
Databricks Runtime 11.3 LTS 以降では、上書きモード (SQL での INSERT OVERWRITE
、または df.write.mode("overwrite")
を使用した DataFrame 書き込み) を使用して、パーティション分割されたテーブルの動的パーティション上書きがサポートされています。このタイプの上書きは、クラシックコンピュートでのみ使用でき、 Databricks SQL ウェアハウスやサーバレスコンピュートでは使用できません。
Spark セッション設定 spark.sql.sources.partitionOverwriteMode
を dynamic
に設定して、動的パーティション上書きモードを設定します。または、 DataFrameWriter
オプション partitionOverwriteMode
を dynamic
に設定することもできます。クエリ固有のオプションが存在する場合、セッション構成で定義されているモードがオーバーライドされます。spark.sql.sources.partitionOverwriteMode
のデフォルトは static
です。
次の例は、 partitionOverwriteMode
の使用を示しています。
- SQL
- Python
- Scala
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;
(df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
)
df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
partitionOverwriteMode
には、次の制約と動作に留意してください。
overwriteSchema
をtrue
に設定することはできません。- 同じ
DataFrameWriter
操作でpartitionOverwriteMode
とreplaceWhere
の両方を指定することはできません。 DataFrameWriter
オプションを使用してreplaceWhere
条件を指定すると、Delta Lake はその条件を適用して、上書きするデータを制御します。このオプションは、partitionOverwriteMode
セッションレベルの設定よりも優先されます。- 書き込まれたデータが予期されるパーティションにのみ触れることを常に検証します。間違ったパーティションの 1 つの行が、意図せずにパーティション全体を上書きする可能性があります。