Delta Lakeテーブルのスキーマを更新する
Delta Lakeを使用すると、テーブルのスキーマを更新できます。次の種類の変更がサポートされています:
新しい列の追加(任意の位置)
既存の列の並べ替え
既存の列の名前を変更する
これらの変更は、DDLを使用して明示的に行うことも、DMLを使用して暗黙的に行うこともできます。
重要
Deltaテーブル スキーマの更新は、すべてのDelta書き込み操作と競合する操作です。
デルタテーブルスキーマを更新すると、そのテーブルから読み取るストリームが終了します。ストリームを継続したい場合は、ストリームを再起動する必要があります。 推奨される方法については、「構造化ストリーミングの制作上の考慮点」を参照してください。
スキーマを明示的に更新して列を追加する
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
デフォルトでは、NULL値の許容はtrue
です。
ネストされたフィールドに列を追加するには、次のコマンドを使用します:
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
たとえば、ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)
を実行する前のスキーマが次のようになっているとします:
- root
| - colA
| - colB
| +-field1
| +-field2
その後のスキーマは次のとおりです:
- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2
注
ネストされた列の追加は、構造体に対してのみサポートされています。配列とマップはサポートされていません。
スキーマを明示的に更新して列のコメントまたは順序を変更する
ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)
ネストされたフィールドの列を変更するには、次のコマンドを使用します。
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)
たとえば、ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST
を実行する前のスキーマが次のようになっているとします:
- root
| - colA
| - colB
| +-field1
| +-field2
その後のスキーマは次のとおりです:
- root
| - colA
| - colB
| +-field2
| +-field1
スキーマを明示的に更新して列を置き換える
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
例えば、以下のようなDDLを実行する場合:
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
前のスキーマが次の場合:
- root
| - colA
| - colB
| +-field1
| +-field2
その後のスキーマは次のとおりです:
- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA
スキーマを明示的に更新して列の名前を変更する
注
この機能は、Databricks Runtime 10.4 LTS 以降で利用できます。
列の既存のデータを書き換えずに列の名前を変更するには、テーブルの列マッピングを有効にする必要があります。 「Delta Lake 列マッピングを使用した列の名前変更と削除」を参照してください。
列の名前を変更するには:
ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name
ネストされたフィールドの名前を変更するには:
ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field
たとえば、次のコマンドを実行するとします:
ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001
以前のスキーマが次の場合:
- root
| - colA
| - colB
| +-field1
| +-field2
その後のスキーマは次のようになります:
- root
| - colA
| - colB
| +-field001
| +-field2
「Delta Lake 列マッピングを使用した列の名前変更と削除」を参照してください。
スキーマを明示的に更新して列を削除する
注
この機能は、Databricks Runtime 11.3 LTS 以降で利用できます。
データ ファイルを書き換えずにメタデータのみの操作として列を削除するには、テーブルの列マッピングを有効にする必要があります。 「Delta Lake 列マッピングを使用した列の名前変更と削除」を参照してください。
重要
メタデータから列を削除しても、ファイル内の列の基礎となるデータは削除されません。削除された列データを消去するには、REORG TABLEを使ってファイルを書き換えます。その後、VACUUMを使用して、ドロップされた列データを含むファイルを物理的に削除することができます。
列を削除するには:
ALTER TABLE table_name DROP COLUMN col_name
複数の列を削除するには:
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)
明示的にスキーマを更新して列の型や名前を変更する
テーブルを書き換えることで、列のタイプや名前を変更したり、列を削除したりできます。これを行うには、overwriteSchema
オプションを使用します。
次の例は、列の型を変更する方法を示しています:
(spark.read.table(...)
.withColumn("birthDate", col("birthDate").cast("date"))
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
次の例は、列名の変更を示しています:
(spark.read.table(...)
.withColumnRenamed("dateOfBirth", "birthDate")
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
休暇の進化を有効にする
次のいずれかの方法で、訪問者進化を有効にできます。
.option("mergeSchema", "true")
を Spark DataFramewrite
またはwriteStream
操作に設定します。 新しい列を追加するには、書き込みのスキーマ進化を有効にするを参照してください。MERGE WITH SCHEMA EVOLUTION
構文を使用します。merge のスキーマ進化構文を参照してください。現在の SparkSession の Spark conf
spark.databricks.delta.schema.autoMerge.enabled
をtrue
に設定します。
Databricks Spark conf を設定するのではなく、書き込み操作ごとに ゲスト進化 を有効にすることをお勧めします。
書き込み操作でユーザー進化を有効にするためにオプションまたは構文を使用すると、これがSpark conf よりも優先されます。
注
INSERT INTO
ステートメントにはスキーマ進化句がありません。
新しい列を追加するための書き込みのスキーマ進化を有効にする
ソース クエリには存在するがターゲット テーブルには存在しない列は、訪問者進化が有効になっている場合、書き込みトランザクションの一部として自動的に追加されます。 「スキーマ進化を有効にする」を参照してください。
大文字と小文字は、新しい列を追加するときに保持されます。 新しい列がテーブル スキーマの末尾に追加されます。 追加の列が構造体内にある場合は、ターゲット表の構造体の末尾に追加されます。
次の例は、 Auto Loaderで mergeSchema
オプションを使用する方法を示しています。 「Auto Loaderとは」を参照してください。
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)
次の例は、バッチ書き込み操作でmergeSchema
オプションを使用する方法を示しています。
(spark.read
.table(source_table)
.write
.option("mergeSchema", "true")
.mode("append")
.saveAsTable("table_name")
)
Delta Lakeマージのための自動スキーマ進化
スキーマ進化により、ユーザーはマージ時にターゲットテーブルとソース・テーブル間のスキーマの不一致を解決することができます。以下の2つのケースに対応する:
ソーステーブルの列がターゲットテーブルに存在しません。新しい列がターゲットスキーマに追加され、その値がソース値を使用して挿入または更新されます。
ターゲットテーブルの列がソーステーブルに存在しません。ターゲットのスキーマは変更されません。追加のターゲット列の値は変更されないままになるか(
UPDATE
の場合)、NULL
に設定されます(INSERT
の場合)。
自動スキーマ進化を手動で有効にする必要があります。 「スキーマ進化を有効にする」を参照してください。
注
Databricks Runtime 12.2 LTS 以降では、挿入アクションまたは更新アクションで、ソース テーブルに存在する列と構造体フィールドを名前で指定できます。 Databricks Runtime 11.3 LTS以下では、マージによる訪問者進化に使用できるのは INSERT *
または UPDATE SET *
アクションのみです。
Databricks Runtime 13.3 LTS以降では、map<int, struct<a: int, b: int>>
など、マップ内にネストされた構造体で訪問者進化を使用できます。
マージのためのスキーマ進化構文
Databricks Runtime 15.2 以降では、 SQLまたはDeltaテーブルAPIsを使用して、マージ ステートメントで訪問者進化を指定できます。
MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
from delta.tables import *
(targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
import io.delta.tables._
targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
スキーマ進化を伴うマージ操作の例
ここでは、スキーマ進化を伴う場合と伴わない場合のmerge
操作の影響の例をいくつか示します。
列 |
クエリー(SQL の場合) |
スキーマ進化なしの動作(既定) |
スキーマの進化に伴う動作 |
---|---|---|---|
ターゲット列: ソース列: |
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
|
テーブルのスキーマは変更されません。列 |
テーブルスキーマが |
ターゲット列: ソース列: |
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
|
|
テーブルスキーマが |
ターゲット列: ソース列: |
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET new_value = s.new_value
|
|
テーブル スキーマが |
ターゲット列: ソース列: |
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN NOT MATCHED
THEN INSERT (key, new_value) VALUES (s.key, s.new_value)
|
|
テーブル スキーマが |
(1)この動作は Databricks Runtime 12.2 LTS 以上で利用できます。Databricks Runtime 11.3 LTS 以下ではこの条件ではエラーが発生します。
Delta Lake マージでの列の除外
Databricks Runtime 12.2 LTS 以降では、マージ条件でEXCEPT
句を使用して列を明示的に除外できます。 EXCEPT
キーワードの動作は、訪問者進化が有効になっているかどうかによって異なります。
スキーマ進化が無効な場合、EXCEPT
キーワードがターゲットテーブルの列のリストに適用され、UPDATE
または INSERT
アクションから列を除外できるようになります。除外された列は null
に設定されます。
スキーマ進化が有効な場合、EXCEPT
キーワードがソーステーブルの列のリストに適用され、スキーマ進化から列を除外することができます。ターゲットに存在しないソースの新しい列が EXCEPT
句にリストされている場合、その列はターゲットスキーマに追加されません。除外される列がターゲットに既に存在する場合、null
に設定されます。
次の例は、構文を示しています。
列 |
クエリー(SQL の場合) |
スキーマ進化なしの動作(既定) |
スキーマの進化に伴う動作 |
---|---|---|---|
ターゲット列: ソース列: |
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated)
|
一致した行は、 |
一致した行は、 |
ターゲット列: ソース列: |
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated, internal_count)
|
|
一致した行は、 |
スキーマ更新でNullType
列を扱う
ParquetはNullType
をサポートしていないため、Deltaテーブルに書き込むときにNullType
列はDataFrameから削除されますが、スキーマには引き続き保存されます。その列に対して別のデータ型を受け取ると、Delta Lakeはスキーマを新しいデータ型にマージします。Delta Lakeが既存の列に対してNullType
を受け取った場合、古いスキーマは保持され、新しい列は書き込み中に削除されます。
NullType
ストリーミングではサポートされていません。ストリーミングを使用する場合はスキーマを設定する必要があるため、これは非常にまれです。NullType
は、ArrayType
やMapType
などの複合型にも受け入れられません。
テーブルスキーマを置き換える
デフォルトでは、テーブル内のデータを上書きしてもスキーマは上書きされません。replaceWhere
を使用せずにmode("overwrite")
を使用してテーブルを上書きする場合でも、書き込まれるデータのスキーマを上書きする必要がある場合があります。overwriteSchema
オプションをtrue
に設定して、テーブルのスキーマとパーティションを置き換えます:
df.write.option("overwriteSchema", "true")
重要
動的パーティションの上書きを使用する場合、overwriteSchema
をtrue
として指定することはできません。