テーブルスキーマの更新
テーブルはスキーマ進化をサポートしており、データ要件の変化に合わせてテーブル構造の変更ができるようになります。次の変更の種類がサポートされています。
- 新しい列の追加(任意の位置)
- 既存の列の並べ替え
- 既存の列の名前を変更する
- 既存の列の型の拡張、自動スキーマ進化による型の拡張を参照してください。
これらの変更は、DDLを使用して明示的に行うことも、DMLを使用して暗黙的に行うこともできます。
スキーマの更新は、すべての並列書き込み操作と競合します。Databricks は、書き込みの競合を回避するためにスキーマの変更を調整することをお勧めします。
テーブルスキーマを更新すると、そのテーブルから読み取るストリームが終了します。処理を続行するには、「構造化ストリーミングの本番運用の考慮事項」に記載されている方法を使用してストリームを再開してください。
スキーマを明示的に更新して列を追加する
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
デフォルトでは、NULL値の許容はtrueです。
ネストされたフィールドに列を追加するには、次のコマンドを使用します:
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
たとえば、ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)を実行する前のスキーマが次のようになっているとします:
- root
| - colA
| - colB
| +-field1
| +-field2
その後のスキーマは次のとおりです:
- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2
ネストされた列の追加は、構造体に対してのみサポートされています。配列とマップはサポートされていません。
スキーマを明示的に更新して列のコメントまたは順序を変更する
ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)
ネストされたフィールドの列を変更するには、次のコマンドを使用します。
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)
たとえば、ALTER TABLE boxes ALTER COLUMN colB.field2 FIRSTを実行する前のスキーマが次のようになっているとします:
- root
| - colA
| - colB
| +-field1
| +-field2
その後のスキーマは次のとおりです:
- root
| - colA
| - colB
| +-field2
| +-field1
スキーマを明示的に更新して列を置き換える
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
例えば、以下のようなDDLを実行する場合:
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
前のスキーマが次の場合:
- root
| - colA
| - colB
| +-field1
| +-field2
その後のスキーマは次のとおりです:
- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA
スキーマを明示的に更新して列の名前を変更する
この機能は Databricks Runtime 10.4 LTS 以降でご利用いただけます。
列の既存のデータを書き換えずに列の名前を変更するには、テーブルの列マッピングを有効にする必要があります。Delta Lake 列マッピングを使用した列の名前変更と削除を参照してください。
列の名前を変更するには:
ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name
ネストされたフィールドの名前を変更するには:
ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field
たとえば、次のコマンドを実行するとします:
ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001
以前のスキーマが次の場合:
- root
| - colA
| - colB
| +-field1
| +-field2
その後のスキーマは次のようになります:
- root
| - colA
| - colB
| +-field001
| +-field2
Delta Lake 列マッピングを使用した列の名前変更と削除を参照してください。
スキーマを明示的に更新して列を削除する
この機能はDatabricks Runtime 11.3 LTS以降でご利用いただけます。
データファイルを書き換えることなく、メタデータのみの操作として列をドロップするには、テーブルの列マッピングを有効にする必要があります。 Delta Lake 列マッピングを使用した列の名前変更と削除を参照してください。
メタデータから列を削除しても、ファイル内の列の基礎となるデータは削除されません。削除された列データを消去するには、REORG TABLEを使用してファイルを書き換えます。その後、VACUUMを使用して、ドロップされた列データを含むファイルを物理的に削除することができます。
列を削除するには:
ALTER TABLE table_name DROP COLUMN col_name
複数の列を削除するには:
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)
スキーマを明示的に更新して列の型や名前を変更する
テーブルを書き換えることで、列のタイプや名前を変更したり、列を削除したりできます。これを行うには、overwriteSchemaオプションを使用します。
次の例は、列の型を変更する方法を示しています:
(spark.read.table(...)
.withColumn("birthDate", col("birthDate").cast("date"))
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
次の例は、列名の変更を示しています:
(spark.read.table(...)
.withColumnRenamed("dateOfBirth", "birthDate")
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
スキーマ進化を有効化
次のいずれかの方法を使用して、スキーマ進化を有効にします:
INSERT WITH SCHEMA EVOLUTION構文を使用:INSERTステートメントに対応します。SQL構文にWITH SCHEMA EVOLUTIONを含めてください。MERGE WITH SCHEMA EVOLUTION構文を使用:MERGEステートメントに対応します。SQL構文にWITH SCHEMA EVOLUTIONを含めるか、Databricks APIで.withSchemaEvolution()を使用します。mergeSchemaオプションを設定する: バッチ書き込みとストリーミング書き込みの両方に対応しています。.option("mergeSchema", "true")を個別の書き込み操作に設定します。- Spark構成 (レガシ) を設定: SparkSession全体に対して
spark.databricks.delta.schema.autoMerge.enabledをtrueに設定します。本番運用での使用はお勧めしません。
Databricksでは、Spark設定を直接設定するのではなく、WITH SCHEMA EVOLUTION 構文または mergeSchema オプションを使用して、各書き込み操作でスキーマ進化を有効にすることを推奨しています。
書き込み操作でスキーマ進化を有効にするためにオプションまたは構文を使用する場合、これがSpark構成よりも優先されます。
新しい列を追加するには、書き込みのスキーマ進化の有効化
ソースクエリには存在するが、ターゲットテーブルには存在しないカラムは、スキーマ進化が有効な場合、書き込みトランザクションの一部として自動的に追加されます。 スキーマ進化の有効化を参照してください。
大文字と小文字は、新しい列を追加するときに保持されます。新しい列がテーブルスキーマの最後尾に追加されます。追加の列が構造体に含まれている場合、ターゲットテーブル内の構造体の末尾に追加されます。
INSERT のスキーマ進化構文
INSERT ステートメントで WITH SCHEMA EVOLUTION 句を使用すると、スキーマ進化を有効にできます。
INSERT WITH SCHEMA EVOLUTION INTO target_table
SELECT * FROM source_table
source_table に対するクエリが、ターゲットテーブルに存在しないカラムを返す場合、それらのカラムは自動的に target_table スキーマに追加されます。既存の行には、新しい列に対してNULLの値が付与されます。
DataFrame API を使用したスキーマ進化による INSERT
次の例は、バッチ書き込み操作でのmergeSchemaオプションの使用方法を示しています。
- Python
- Scala
(spark.read
.table("source_table")
.write
.option("mergeSchema", "true")
.mode("append")
.saveAsTable("target_table")
)
spark.read
.table("source_table")
.write
.option("mergeSchema", "true")
.mode("append")
.saveAsTable("target_table")
ストリーミングにおけるスキーマ進化を伴うINSERT
次の例は、Auto Loader で mergeSchema オプションの使用方法を示しています。「Auto Loader とは何ですか?」を参照してください。
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)
マージのための自動スキーマ進化
スキーマ進化により、マージ時にターゲットテーブルとソーステーブル間のスキーマの不一致を解決することができます。以下の2つのケースに対応する:
-
ソーステーブルに列が存在しますが、ターゲットテーブルには存在せず、挿入または更新アクションの割り当てにおいて名前で指定されています。あるいは、
UPDATE SET *またはINSERT *のアクションが存在します。その列はターゲットスキーマに追加され、その値はソースの対応する列から入力されます。
-
これは、マージソース内の列名と構造がターゲットの割り当てと完全に一致する場合にのみ適用されます。
-
新しい列はソーススキーマに存在する必要があります。アクション句で新しい列を割り当てても、その列は定義されません。
これらの例ではスキーマ進化が可能です:
SQL-- The column newcol is present in the source but not in the target. It will be added to the target.
UPDATE SET target.newcol = source.newcol
-- The field newfield doesn't exist in struct column somestruct of the target. It will be added to that struct column.
UPDATE SET target.somestruct.newfield = source.somestruct.newfield
-- The column newcol is present in the source but not in the target.
-- It will be added to the target.
UPDATE SET target.newcol = source.newcol + 1
-- Any columns and nested fields in the source that don't exist in target will be added to the target.
UPDATE SET *
INSERT *列
newcolがsourceスキーマに存在しない場合、これらの例ではスキーマ進化はトリガーされません。SQLUPDATE SET target.newcol = source.someothercol
UPDATE SET target.newcol = source.x + source.y
UPDATE SET target.newcol = source.output.newcol -
-
ターゲットテーブルには列が存在しますが、ソーステーブルには存在しません。
ターゲットスキーマは変更されません。次の列:
-
UPDATE SET *の場合、変更されないままになります。 -
INSERT *向けにNULLに設定されています。 -
アクション句で割り当てられている場合でも、依然として明示的に変更できます。
例えば:
SQLUPDATE SET * -- The target columns that are not in the source are left unchanged.
INSERT * -- The target columns that are not in the source are set to NULL.
UPDATE SET target.onlyintarget = 5 -- The target column is explicitly updated.
UPDATE SET target.onlyintarget = source.someothercol -- The target column is explicitly updated from some other source column. -
自動スキーマ進化を手動で有効にする必要があります。 スキーマ進化の有効化を参照してください。
Databricks Runtime 12.2 LTS 以降では、ソーステーブルに存在する列および構造体フィールドは、挿入または更新アクションで名前で指定できます。Databricks Runtime 11.3 LTS以前では、マージによるスキーマ進化に使用できるのはINSERT *またはUPDATE SET *アクションのみです。
Databricks Runtime 13.3 LTS以降では、map<int, struct<a: int, b: int>>のようなマップ内にネストされた構造体でのスキーマ進化を使用できます。
マージのためのスキーマ進化構文
Databricks Runtime 15.4 LTS 以降では、SQL またはテーブル APIs を使用して、マージ ステートメントでスキーマ進化を指定できます。
- SQL
- Python
- Scala
MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
from delta.tables import *
(targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
import io.delta.tables._
targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
スキーマ進化を伴うマージ操作の例
ここでは、スキーマ進化を伴う場合と伴わない場合のmerge操作の影響の例をいくつか示します。
列 | クエリー(SQL の場合) | スキーマ進化なしの動作(既定) | スキーマの進化に伴う動作 |
|---|---|---|---|
ターゲット列: ソース列: | SQL | テーブルのスキーマは変更されません。列 | テーブルスキーマが |
ターゲット列: ソース列: | SQL |
| テーブルスキーマが |
ターゲット列: ソース列: | SQL |
| テーブルスキーマが |
ターゲット列: ソース列: | SQL |
| テーブルスキーマが |
(1)この動作はDatabricks Runtime 12.2 LTS以降で利用できます。Databricks Runtime 11.3 LTS以前では、この状況でエラーが発生します。
マージでの列の除外
Databricks Runtime 12.2 LTS 以降では、マージ条件で EXCEPT 句を使用することで、列を明示的に除外できます。EXCEPT キーワードの動作は、スキーマ進化が有効かどうかによって異なります。
スキーマ進化が無効な場合、EXCEPT キーワードがターゲットテーブルの列のリストに適用され、UPDATE または INSERT アクションから列を除外できるようになります。除外された列は null に設定されます。
スキーマ進化が有効な場合、EXCEPT キーワードがソーステーブルの列のリストに適用され、スキーマ進化から列を除外することができます。ターゲットに存在しないソースの新しい列が EXCEPT 句にリストされている場合、その列はターゲットスキーマに追加されません。除外される列がターゲットに既に存在する場合、null に設定されます。
次の例は、構文を示しています。
列 | クエリー(SQL の場合) | スキーマ進化なしの動作(既定) | スキーマの進化に伴う動作 |
|---|---|---|---|
ターゲット列: ソース列: | SQL | 一致した行は、 | 一致した行は、 |
ターゲット列: ソース列: | SQL |
| 一致した行は、 |
Spark 設定によるスキーマ進化の有効化 (レガシ)
現在のSparkSessionにおけるすべての書き込み操作に対してスキーマ進化を有効にするには、Spark構成 spark.databricks.delta.schema.autoMerge.enabled を true に設定できます。
- Python
- Scala
- SQL
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", True)
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", true)
SET spark.databricks.delta.schema.autoMerge.enabled=true
このアプローチは本番運用での使用はお勧めしません。代わりに、書き込み操作ごとにスキーマ進化を有効化します。
INSERTとバッチ/ストリーミング書き込みには、.option("mergeSchema", "true")またはINSERT WITH SCHEMA EVOLUTIONを使用します。MERGEステートメントの場合は、MERGE WITH SCHEMA EVOLUTIONを使用します。
セッション全体の設定を行うと、複数の操作にわたって意図しないスキーマの変更が生じる可能性があり、どの操作がスキーマを進化させているのかを把握することが難しくなります。
書き込み操作でスキーマ進化を有効にするためにオプションまたは構文を使用する場合、これがSpark構成よりも優先されます。
テーブルスキーマを置き換える
デフォルトでは、テーブル内のデータを上書きしてもスキーマは上書きされません。replaceWhereを使用せずにmode("overwrite")を使用してテーブルを上書きする場合でも、書き込まれるデータのスキーマを上書きする必要がある場合があります。overwriteSchemaオプションをtrueに設定して、テーブルのスキーマとパーティションを置き換えます:
df.write.option("overwriteSchema", "true")
動的パーティションの上書きを使用する場合、overwriteSchemaをtrueとして指定することはできません。