メインコンテンツまでスキップ

Delta Lake テーブル スキーマの更新

Delta Lakeを使用すると、テーブルのスキーマを更新できます。次の種類の変更がサポートされています:

  • 新しい列の追加(任意の位置)
  • 既存の列の並べ替え
  • 既存の列の名前を変更する

これらの変更は、DDLを使用して明示的に行うことも、DMLを使用して暗黙的に行うこともできます。

important

Deltaテーブルスキーマの更新は、すべての並列Delta書き込み操作と競合する操作です。

Delta テーブル スキーマを更新すると、そのテーブルから読み取ったストリームは終了します。 ストリームを続行する場合は、ストリームを再起動する必要があります。 推奨される方法については、 構造化ストリーミングの本番運用に関する考慮事項を参照してください。

スキーマを明示的に更新して列を追加します

SQL
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

デフォルトでは、NULL値の許容はtrueです。

ネストされたフィールドに列を追加するには、次のコマンドを使用します:

SQL
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

たとえば、ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)を実行する前のスキーマが次のようになっているとします:

- root
| - colA
| - colB
| +-field1
| +-field2

その後のスキーマは次のとおりです:

- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2
注記

ネストされた列の追加は、構造体に対してのみサポートされています。配列とマップはサポートされていません。

スキーマを明示的に更新して、列のコメントまたは順序を変更する

SQL
ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)

ネストされたフィールドの列を変更するには、次のコマンドを使用します。

SQL
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)

たとえば、ALTER TABLE boxes ALTER COLUMN colB.field2 FIRSTを実行する前のスキーマが次のようになっているとします:

- root
| - colA
| - colB
| +-field1
| +-field2

その後のスキーマは次のとおりです:

- root
| - colA
| - colB
| +-field2
| +-field1

スキーマを明示的に更新して列を置き換えます

SQL
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)

例えば、以下のようなDDLを実行する場合:

SQL
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)

前のスキーマが次の場合:

- root
| - colA
| - colB
| +-field1
| +-field2

その後のスキーマは次のとおりです:

- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA

スキーマを明示的に更新して列の名前を変更する

注記

この機能は、Databricks Runtime 10.4 LTS 以降で使用できます。

列の既存のデータを書き換えずに列の名前を変更するには、テーブルの列マッピングを有効にする必要があります。 「Delta Lake 列マッピングを使用した列の名前変更と削除」を参照してください。

列の名前を変更するには:

SQL
ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name

ネストされたフィールドの名前を変更するには:

SQL
ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field

たとえば、次のコマンドを実行するとします:

SQL
ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001

以前のスキーマが次の場合:

- root
| - colA
| - colB
| +-field1
| +-field2

その後のスキーマは次のようになります:

- root
| - colA
| - colB
| +-field001
| +-field2

「Delta Lake 列マッピングを使用した列の名前変更と削除」を参照してください。

スキーマを明示的に更新して列をドロップする

注記

この機能は、Databricks Runtime 11.3 LTS 以降で使用できます。

データファイルを書き換えることなく、メタデータのみの操作として列をドロップするには、テーブルの列マッピングを有効にする必要があります。 「Delta Lake 列マッピングを使用した列の名前変更と削除」を参照してください。

important

メタデータから列を削除しても、ファイル内の列の基になるデータは削除されません。 ドロップされた列データをパージするには、 REORG TABLE を使用してファイルを書き換えます。 その後、vacuum を使用して、ドロップされた列データを含むファイルを物理的に削除できます。

列を削除するには:

SQL
ALTER TABLE table_name DROP COLUMN col_name

複数の列を削除するには:

SQL
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)

スキーマを明示的に更新して列のタイプまたは名前を変更する

テーブルを書き換えることで、列のタイプや名前を変更したり、列を削除したりできます。これを行うには、overwriteSchemaオプションを使用します。

次の例は、列の型を変更する方法を示しています:

Python
(spark.read.table(...)
.withColumn("birthDate", col("birthDate").cast("date"))
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)

次の例は、列名の変更を示しています:

Python
(spark.read.table(...)
.withColumnRenamed("dateOfBirth", "birthDate")
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)

スキーマ進化の有効化

スキーマ進化を有効にするには、次のいずれかを実行します。

Databricks は、 Spark conf を設定するのではなく、書き込み操作ごとにスキーマ進化を有効にすることをお勧めします。

オプションまたは構文を使用して書き込み操作でスキーマ進化を有効にする場合、これは Spark conf よりも優先されます。

注記

INSERT INTOステートメントのスキーマ進化句はありません。

新しい列を追加するための書き込みのスキーマ進化を有効にする

ソースクエリには存在するが、ターゲットテーブルには存在しないカラムは、スキーマ進化が有効な場合、書き込みトランザクションの一部として自動的に追加されます。 「スキーマ進化の有効化」を参照してください。

新しい列を追加するときに大文字と小文字が保持されます。 新しい列は、テーブル スキーマの最後に追加されます。 追加の列が構造体内にある場合、それらはターゲット テーブルの構造体の末尾に追加されます。

次の例は、 Auto Loaderで mergeSchema オプションを使用する方法を示しています。 「Auto Loaderとは」を参照してください。

Python
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)

次の例は、バッチ書き込み操作で mergeSchema オプションを使用する方法を示しています。

Python
(spark.read
.table(source_table)
.write
.option("mergeSchema", "true")
.mode("append")
.saveAsTable("table_name")
)

Delta Lake マージの自動スキーマ進化

スキーマ進化により、ユーザーはマージ時にターゲットテーブルとソース・テーブル間のスキーマの不一致を解決することができます。以下の2つのケースに対応する:

  1. ソーステーブルの列がターゲットテーブルに存在しません。新しい列がターゲットスキーマに追加され、その値がソース値を使用して挿入または更新されます。
  2. ターゲットテーブルの列がソーステーブルに存在しません。ターゲットのスキーマは変更されません。追加のターゲット列の値は変更されないままになるか(UPDATEの場合)、NULLに設定されます(INSERTの場合)。

自動スキーマ進化を手動で有効にする必要があります。 「スキーマ進化の有効化」を参照してください。

注記

Databricks Runtime 12.2 LTS 以降では、ソース テーブルに存在する列と構造体フィールドを、挿入アクションまたは更新アクションで名前で指定できます。 Databricks Runtime 11.3 LTS 以前では、マージによるスキーマ進化に使用できるのは、INSERT * または UPDATE SET * アクションのみです。

Databricks Runtime 13.3 LTS 以降では、map<int, struct<a: int, b: int>> などのマップ内にネストされた構造体でスキーマ進化を使用できます。

マージのスキーマ進化構文

Databricks Runtime 15.2 以降では、SQL または Delta テーブル APIsを使用して、マージ ステートメントでスキーマ進化を指定できます。

SQL
MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE

スキーマ進化によるマージの操作例

ここでは、スキーマ進化を伴う場合と伴わない場合のmerge操作の影響の例をいくつか示します。

クエリー(SQL の場合)

スキーマ進化なしの動作(既定)

スキーマの進化に伴う動作

ターゲット列: key, value

ソース列: key, value, new_value

ターゲット列: key, old_value

ソース列: key, new_value

ターゲット列: key, old_value

ソース列: key, new_value

ターゲット列: key, old_value

ソース列: key, new_value

(1) この動作は、Databricks Runtime 12.2 LTS 以降で使用できます。Databricks Runtime 11.3 LTS 以下では、この条件でエラーが発生します。

Delta Lake マージで列を除外する

Databricks Runtime 12.2 LTS 以降では、マージ条件で EXCEPT 句を使用して列を明示的に除外できます。 EXCEPT キーワードの動作は、スキーマ進化が有効になっているかどうかによって異なります。

スキーマ進化が無効な場合、EXCEPT キーワードがターゲットテーブルの列のリストに適用され、UPDATE または INSERT アクションから列を除外できるようになります。除外された列は null に設定されます。

スキーマ進化が有効な場合、EXCEPT キーワードがソーステーブルの列のリストに適用され、スキーマ進化から列を除外することができます。ターゲットに存在しないソースの新しい列が EXCEPT 句にリストされている場合、その列はターゲットスキーマに追加されません。除外される列がターゲットに既に存在する場合、null に設定されます。

次の例は、構文を示しています。

クエリー(SQL の場合)

スキーマ進化なしの動作(既定)

スキーマの進化に伴う動作

ターゲット列: id, title, last_updated

ソース列: id, title, review, last_updated

ターゲット列: id, title, last_updated

ソース列: id, title, review, internal_count

スキーマ更新の NullType 列の扱い

ParquetはNullTypeをサポートしていないため、Deltaテーブルに書き込むときにNullType列はDataFrameから削除されますが、スキーマには引き続き保存されます。その列に対して別のデータ型を受け取ると、Delta Lakeはスキーマを新しいデータ型にマージします。Delta Lakeが既存の列に対してNullTypeを受け取った場合、古いスキーマは保持され、新しい列は書き込み中に削除されます。

NullType ストリーミングではサポートされていません。ストリーミングを使用する場合はスキーマを設定する必要があるため、これは非常にまれです。NullTypeは、ArrayTypeMapTypeなどの複合型にも受け入れられません。

テーブル スキーマの置換

デフォルトでは、テーブル内のデータを上書きしてもスキーマは上書きされません。replaceWhereを使用せずにmode("overwrite")を使用してテーブルを上書きする場合でも、書き込まれるデータのスキーマを上書きする必要がある場合があります。overwriteSchemaオプションをtrueに設定して、テーブルのスキーマとパーティションを置き換えます:

Python
df.write.option("overwriteSchema", "true")
important

動的パーティションの上書きを使用する場合、overwriteSchematrueとして指定することはできません。