メインコンテンツまでスキップ

型の拡張

Databricks Runtime 15.4 LTS 以降の Delta Lake テーブルで利用可能な型の拡大により、データファイルを書き換えることなく、列のデータ型をより広い型に変更できます。

すべてのUnity Catalogマネージドテーブルは、デフォルトでDelta Lakeを使用します。Delta LakeおよびApache IcebergのUnity Catalog マネージドテーブルを参照してください。

注記

型の拡張を有効にすると、リーダーとライターのプロトコルがアップグレードされます。これは、外部のDelta Lakeクライアントとの互換性に影響を及ぼす可能性があります。Delta Lake 機能の互換性とプロトコルを参照してください。

型拡張が有効なテーブルは、Databricks Runtime 15.4 LTS 以降でのみ読み取ることができます。

サポートされるタイプの変更

以下のルールに従って、型を拡張できます。

ソースタイプ

サポートされている拡張型

BYTE

SHORTINTBIGINTDECIMALDOUBLE

SHORT

INTBIGINTDECIMALDOUBLE

INT

BIGINTDECIMALDOUBLE

BIGINT

DECIMAL

FLOAT

DOUBLE

DECIMAL

DECIMAL の、より高い精度とスケール

DATE

TIMESTAMP_NTZ

VOID

あらゆる種類

型の変更は、最上位の列、および構造体、マップ、配列内にネストされたフィールドでサポートされています。

注記

VOID 任意の型に対して、テーブルで型の拡張が有効になっている必要はありません。列VOIDの型を更新する操作は、追加設定なしで成功します。VOID 型の拡大は、Databricks Runtime 18.2以降で使用できます。

Decimal の動作

Spark は、ある操作によって整数型が decimal または double に昇格され、ダウンストリームの取り込みによって値が整数カラムに書き戻される場合に、デフォルトで値の小数部を切り捨てます。割り当てポリシーの動作の詳細については、ストアの割り当てをご覧ください。

任意の数値型を decimal に変更するとき、合計精度は開始精度以上である必要があります。スケールも増大させる場合、全体の精度は対応する分だけ増加させる必要があります。

byteshortint のタイプに対する最小ターゲットは decimal(10,0) です。long の最小ターゲットは decimal(20,0) です。

decimal(10,1)のフィールドに小数点以下2桁を追加したい場合は、最小ターゲットはdecimal(12,3)です。

型の拡張を有効にする

注記

型の拡張を有効にすると、リーダーとライターのプロトコルがアップグレードされます。これは、外部のDelta Lakeクライアントとの互換性に影響を及ぼす可能性があります。Delta Lake 機能の互換性とプロトコルを参照してください。

既存のテーブルに対して、テーブルプロパティdelta.enableTypeWideningtrueに設定することにより、型の拡張を有効にすることができます:

SQL
  ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')

テーブルの作成中に型の拡張を有効にすることもできます。

SQL
  CREATE TABLE T(c1 INT) TBLPROPERTIES('delta.enableTypeWidening' = 'true')

型の変更を手動で適用する

ALTER COLUMN コマンドを使用して、手動でタイプを変更してください。

SQL
ALTER TABLE <table_name> ALTER COLUMN <col_name> TYPE <new_type>

この操作は、基になるデータファイルを書き換えることなく、テーブルスキーマを更新します。詳細については、ALTER TABLEを参照してください。

自動スキーマ進化での型の拡大

型の拡大を伴うスキーマ進化を使用して、ターゲットテーブルのデータ型を着信データの型と一致するように更新します。

注記

型の拡張を有効にしないと、スキーマ進化は常に、ターゲットテーブルの列型と一致するようにデータをダウンキャストしようとします。ターゲットテーブルのデータ型を自動的に拡大したくない場合は、スキーマ進化が有効になっているワークロードを実行する前に、型の拡大を無効にする必要があります。

インジェスト中に列のデータ型を広げるためにスキーマ進化を使用するには、次の条件を満たす必要があります:

  • 書き込みコマンドは、自動スキーマ進化が有効になっている状態で実行されます。
  • ターゲットテーブルでは、型の拡張が有効になっています。
  • ソース列のタイプはターゲットカラムのタイプよりも幅が広いです。
  • 型拡張は型の変更に対応しています。

これらの条件をすべて満たさない型が一致しない場合、通常のスキーマ強制ルールに準拠します。See スキーマ強制.

次の例は、型の拡大がスキーマ進化とどのように連携するかを示しています。

INT列を持つターゲットテーブルとBIGINT列を持つソーステーブルを作成します:

Python
spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")

スキーマ進化でsaveAsTable()を使用し、追加時にINT列を自動的にBIGINTにワイド化する:

Python
spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")

スキーマ進化でMERGE INTOを使用します。

Python
from delta.tables import DeltaTable

source_df = spark.table("source_table")
target_table = DeltaTable.forName(spark, "target_table")

(target_table.alias("target")
.merge(source_df.alias("source"), "target.id = source.id")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)

Auto Loader

備考

プレビュー

Auto Loader の型ワイドニングのサポートは、パブリックプレビューで利用可能です。

Auto Loader は、自動スキーマ進化による型の拡張をサポートしています。型の拡張とスキーマ進化が有効になっているDelta LakeテーブルにAuto Loaderを使用してデータを取り込む場合、列の型は着信するデータに合わせて自動的に拡張されます。

Python
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)

Auto Loaderによる自動型拡張」を参照してください。また、ターゲットテーブルは型の拡張が有効になっている必要があります。型の拡張を有効にするを参照してください。

型の拡張テーブル機能を無効にする

プロパティを false に設定することで、有効なテーブルでの偶発的な型拡張を防ぐことができます。

SQL
  ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'false')

この設定により、テーブルへの将来の型変更は防止されますが、型拡張テーブル機能が削除されたり、以前の型変更が元に戻されたりすることはありません。

型拡張テーブル機能を完全に削除する必要がある場合は、次の例に示すように DROP FEATURE コマンドを使用できます。

SQL
 ALTER TABLE <table-name> DROP FEATURE 'typeWidening' [TRUNCATE HISTORY]
注記

Databricks Runtime 15.4 LTSを使用して型拡張を有効にしたテーブルでは、代わりに機能typeWidening-previewを削除する必要があります。

型の拡張を削除すると、Databricks は現在のテーブル スキーマに準拠しないすべてのデータ ファイルを書き換えます。Delta Lake テーブル機能の削除およびテーブルプロトコルのダウングレードを参照してください。

Delta Lake テーブルからのストリーミング

構造化ストリーミングでの型拡張のサポートは、Databricks Runtime 16.4 LTS 以降で利用可能です。

型の拡張が有効なDelta Lakeテーブルからストリーミングする際は、ターゲットテーブルでmergeSchemaオプションを使用してスキーマ進化を有効にすることで、ストリーミングクエリの自動型拡張を構成できます。ターゲットテーブルは、型の拡張が有効になっている必要があります。型拡張を有効にするを参照してください。

Python
(spark.readStream
.table("delta_source_table")
.writeStream
.option("checkpointLocation", "/path/to/checkpointLocation")
.option("mergeSchema", "true")
.toTable("output_table")
)

mergeSchema が有効になっており、ターゲットテーブルで型の拡大が有効になっている場合:

  • 型変更は、ダウンストリームテーブルに手動介入なしで自動的に適用されます。
  • 新しい列が自動的にダウンストリームのテーブルスキーマに追加されます。

mergeSchemaが有効になっていない場合、値はspark.sql.storeAssignmentPolicyの設定に従って処理されます。デフォルトでは、値はターゲットカラム型に一致するようにダウンキャストされます。割り当てポリシーの動作に関する詳細情報については、ストア割り当てを参照してください。

ストリームにおける型の変更を処理する

Delta Lakeテーブルからストリーミングする際に、型の変更を含む非追加スキーマ変更を追跡するために、スキーマ追跡場所を指定できます。スキーマの追跡場所の指定は、Databricks Runtime 18.0 以下で必須です。 Databricks Runtime 18.1 以降ではオプションです。

SQL を使用して schemaTrackingLocation を設定することはできません。サポートされていない機能を参照してください。

schemaTrackingLocation ストリーミングチェックポイントと同じパス内の場所に設定する必要があります。例えば:

Python
checkpoint_path = "/path/to/checkpointLocation"

(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)

スキーマトラッキングの場所を設定すると、ストリームは型変更を検出したときに追跡されたスキーマを進化させ、停止します。その際、ダウンストリームテーブルでの型拡張の有効化やストリーミングクエリの更新など、型変更を処理する必要があります。

処理を再開するには、Spark 構成 spark.databricks.delta.streaming.allowSourceColumnTypeChange または DataFrame リーダーオプション allowSourceColumnTypeChange を、次の例のように設定してください。

Python
checkpoint_path = "/path/to/checkpointLocation"

(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.option("allowSourceColumnTypeChange", "<delta_source_table_version>")
# alternatively to allow all future type changes for this stream:
# .option("allowSourceColumnTypeChange", "always")
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)

ストリームが停止すると、エラーメッセージが表示され、チェックポイント ID <checkpoint_id> と Delta Lake ソーステーブルバージョン <delta_source_table_version> が表示されます。

ストリーミング Delta Lake のオプションの完全なリストについては、「Delta Lake」を参照してください。

Lakeflow Spark宣言型パイプライン

Lakeflow Spark宣言型パイプラインで、パイプラインレベルまたは個別のテーブルに対して、型拡大を有効にできます。型の拡張により、ストリーミングテーブルの完全な更新を必要とせずに、パイプラインの実行中に列のデータ型を自動的に広げることができます。マテリアライズドビューでの型変更は常に完全な再計算を引き起こし、ソーステーブルに型変更が適用されると、そのテーブルに依存するマテリアライズドビューは新しい型を反映するために完全な再計算が必要になります。

パイプライン全体で型の拡張を有効にする

パイプライン内のすべてのテーブルの型拡大を有効にするには、パイプライン構成pipelines.enableTypeWideningを設定します。

JSON
{
"configuration": {
"pipelines.enableTypeWidening": "true"
}
}

特定のテーブルで型の拡張を有効にする

個々のテーブルで型の拡張を有効にするには、テーブル プロパティ delta.enableTypeWidening を設定します。

Python
import dlt

@dlt.table(
table_properties={&quot;delta.enableTypeWidening&quot;: &quot;true&quot;}
)
def my_table():
return spark.readStream.table("source_table")

下流リーダーとの互換性

Databricks Runtime 15.4 LTS 以降では、型拡張が有効なテーブルを読み取ることができます。パイプラインで型拡張が有効になっているテーブルを、Databricks Runtime 14.3以下を使用しているリーダーから読み取り可能にしたい場合は、以下のいずれかの対応が必要です:

  • プロパティ delta.enableTypeWidening/pipelines.enableTypeWidening を削除するか、false に設定して型の拡張機能をオフにし、テーブルの完全更新をトリガーします。
  • テーブルでCompatibility Modeを有効にします。

OpenSharing

注記

OpenSharingにおける型拡張のサポートは、Databricks Runtime 16.1 以降で利用可能です。

Databricks-to-Databricks OpenSharing では、型拡張が有効な Delta Lake テーブルの共有がサポートされています。プロバイダーと受信者はDatabricks Runtime 16.1 以降である必要があります。

型拡張が有効になっている OpenSharing を使用して Delta Lake テーブルからチェンジデータフィードを読み取るには、レスポンス形式を delta に設定する必要があります:

Scala
spark.read
.format("deltaSharing")
.option("responseFormat", "delta")
.option("readChangeFeed", "true")
.option("startingVersion", "<start version>")
.option("endingVersion", "<end version>")
.load("<table>")

型の変更をまたいだチェンジデータフィードの読み取りはサポートされていません。代わりに、操作を2つの別々の読み取りに分割する必要があります。1つはタイプ変更を含むテーブルバージョンで終了し、もう1つはタイプ変更を含むバージョンで開始します。

制限事項:

Apache Iceberg 互換性

Apache Icebergは、型拡張でカバーされるすべての型変更をサポートしていません。Icebergスキーマ進化を参照してください。

サポートされていない型変更は次のとおりです。

  • byteshortintlong から decimal または double
  • 小数点以下の増加
  • date - timestampNTZ

Delta LakeテーブルでIceberg互換性を備えたUniFormを有効にすると、上記のいずれかのタイプ変更を適用した際にエラーが発生します。「UniForm を使用して Iceberg クライアントで Delta Lake テーブルを読み取る」を参照してください。

これらのサポートされていない型の変更のいずれかをDelta Lakeテーブルに適用した場合、2つの選択肢があります:

  • Iceberg メタデータの再生成:型拡張テーブル機能なしで Iceberg メタデータを再生成するには、次のコマンドを使用します。

    SQL
    ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.universalFormat.config.icebergCompatVersion' = '<version>')

    これにより、互換性のない型変更を適用した後でも、互換性を維持できます。

  • 型拡張テーブル機能を削除する:型の拡張テーブル機能を無効にするを参照してください。

型依存関数

一部の SQL 関数は、入力データ型に依存する結果を返します。例えば、hash 関数は、引数の型が異なる場合、同じ論理値に対して異なるハッシュ値を返します。hash(1::INT)hash(1::BIGINT) とは異なる結果を返します。

その他の型依存関数は次のとおりです: xxhash64bit_getbit_reversetypeof

これらの関数を使用するクエリで安定した結果を得るには、値を目的の型に明示的にキャストする必要があります。

Python
spark.read.table("table_name") \
.selectExpr("hash(CAST(column_name AS BIGINT))")

サポートされていない機能

  • 型変更を伴うDelta Lakeテーブルからストリーミングする際に、SQL を使用してスキーマ追跡の場所を設定することはできません。
  • OpenSharing を使用して、型拡張が有効になっているテーブルを Databricks 以外のコンシューマーと共有することはできません。