メインコンテンツまでスキップ

型の拡張

型の拡張が有効になっているテーブルでは、基になるデータ ファイルを書き換えることなく、列のデータ型をより広い型に変更できます。 列タイプを手動で変更することも、スキーマ進化を使用して列タイプを進化させることもできます。

重要

タイプワイドニングはDatabricks Runtime 15.4 LTS 以降で利用可能です。Databricks Runtime 15.4 LTS 以降では、型拡張が有効なテーブルを読み取ることができます。

型拡張には Delta Lake が必要です。 すべての Unity Catalog マネージドテーブルは、デフォルトで Delta Lake を使用します。

サポートされるタイプの変更

以下のルールに従って、型を拡張できます。

ソースタイプ

サポートされている拡張型

BYTE

SHORTINTBIGINTDECIMALDOUBLE

SHORT

intBIGINTDECIMALDOUBLE

INT

BIGINTDECIMALDOUBLE

BIGINT

DECIMAL

FLOAT

DOUBLE

DECIMAL

DECIMAL の、より高い精度とスケール

DATE

TIMESTAMP_NTZ

VOID

あらゆる種類

型の変更は、最上位の列、および構造体、マップ、配列内にネストされたフィールドでサポートされています。

注記

VOID 任意の型に対して、テーブルで型の拡張が有効になっている必要はありません。列VOIDの型を更新する操作は、追加設定なしで成功します。VOID 型の拡大は、Databricks Runtime 18.2以降で使用できます。

重要

Spark は、ある操作によって整数型が decimal または double に昇格され、ダウンストリームの取り込みによって値が整数カラムに書き戻される場合に、デフォルトで値の小数部を切り捨てます。割り当てポリシーの動作の詳細については、ストアの割り当てをご覧ください。

注記

任意の数値型を decimal に変更するとき、合計精度は開始精度以上である必要があります。スケールも増大させる場合、全体の精度は対応する分だけ増加させる必要があります。

byteshortint のタイプに対する最小ターゲットは decimal(10,0) です。long の最小ターゲットは decimal(20,0) です。

decimal(10,1)のフィールドに小数点以下2桁を追加したい場合は、最小ターゲットはdecimal(12,3)です。

型の拡張を有効にする

既存のテーブルに対して、テーブルプロパティdelta.enableTypeWideningtrueに設定することにより、型の拡張を有効にすることができます:

SQL
  ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')

テーブルの作成中に型の拡張を有効にすることもできます。

SQL
  CREATE TABLE T(c1 INT) TBLPROPERTIES('delta.enableTypeWidening' = 'true')
重要

型の拡張を有効にすると、テーブル機能 typeWideningが設定され、リーダーとライターのプロトコルがアップグレードされます。型の拡大が有効になっているテーブルと対話するには、Databricks Runtime 15.4 以降を使用する必要があります。外部クライアントもテーブルと対話する場合は、このテーブル機能をサポートしていることを確認します。Delta Lake 機能の互換性とプロトコルを参照してください。

型の変更を手動で適用する

ALTER COLUMN コマンドを使用して、手動でタイプを変更してください。

SQL
ALTER TABLE <table_name> ALTER COLUMN <col_name> TYPE <new_type>

この操作は、基になるデータファイルを書き換えることなく、テーブルスキーマを更新します。詳細については、ALTER TABLEを参照してください。

自動スキーマ進化での型の拡大

スキーマ進化は、型の拡張と連携して、ターゲットテーブルのデータ型を受信データの型と一致するように更新します。

注記

型の拡張を有効にしないと、スキーマ進化は常に、ターゲットテーブルの列型と一致するようにデータをダウンキャストしようとします。ターゲットテーブルのデータ型を自動的に拡大しない場合は、スキーマ進化が有効になっているワークロードを実行する前に、型の拡大を無効にします。

インジェスト中に列のデータ型を広げるためにスキーマ進化を使用するには、次の条件を満たす必要があります:

  • 書き込みコマンドは、自動スキーマ進化が有効になっている状態で実行されます。
  • ターゲットテーブルでは、型の拡張が有効になっています。
  • ソース列のタイプはターゲットカラムのタイプよりも幅が広いです。
  • 型拡張は型の変更に対応しています。

これらの条件をすべて満たさない型が一致しない場合、通常のスキーマ強制ルールに準拠します。See スキーマ強制.

次の例は、一般的な書き込み操作におけるスキーマ進化での型の拡大がどのように機能するかを示しています。

Python
# Create target table with INT column and source table with BIGINT column
spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")

# Example 1: Automatic type widening in saveAsTable()
spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")

# Example 2: Automatic type widening in MERGE INTO
from delta.tables import DeltaTable

source_df = spark.table("source_table")
target_table = DeltaTable.forName(spark, "target_table")

(target_table.alias("target")
.merge(source_df.alias("source"), "target.id = source.id")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)

Auto Loader

備考

プレビュー

Auto Loader の型ワイドニングのサポートは、パブリックプレビューで利用可能です。

Auto Loader は、自動スキーマ進化による型の拡張をサポートしています。型の拡張とスキーマ進化が有効になっているDelta LakeテーブルにAuto Loaderを使用してデータを取り込む場合、列の型は着信するデータに合わせて自動的に拡張されます。

Python
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)

Auto Loaderによる自動型拡張」を参照してください。また、ターゲットテーブルは型の拡張が有効になっている必要があります。型の拡張を有効にするを参照してください。

型の拡張テーブル機能を無効にする

プロパティを false に設定することで、有効なテーブルでの偶発的な型拡張を防ぐことができます。

SQL
  ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'false')

この設定により、テーブルへの将来の型変更は防止されますが、型拡張テーブル機能が削除されたり、以前の型変更が元に戻されたりすることはありません。

型拡張テーブル機能を完全に削除する必要がある場合は、次の例に示すように DROP FEATURE コマンドを使用できます。

SQL
 ALTER TABLE <table-name> DROP FEATURE 'typeWidening' [TRUNCATE HISTORY]
注記

Databricks Runtime 15.4 LTS を使用して型の拡大が有効になっているテーブルでは、代わりに機能 typeWidening-preview を削除する必要があります。

型の拡張を削除すると、Databricks は現在のテーブル スキーマに準拠しないすべてのデータ ファイルを書き換えます。Delta Lake テーブル機能の削除およびテーブルプロトコルのダウングレードを参照してください。

Delta Lake テーブルからのストリーミング

注記

構造化ストリーミングでの型拡張のサポートは、Databricks Runtime 16.4 LTS 以降で利用可能です。

型の拡張が有効なDelta Lakeテーブルからストリーミングする際は、ターゲットテーブルでmergeSchemaオプションを使用してスキーマ進化を有効にすることで、ストリーミングクエリの自動型拡張を構成できます。ターゲットテーブルは、型の拡張が有効になっている必要があります。型拡張を有効にするを参照してください。

Python
(spark.readStream
.table("delta_source_table")
.writeStream
.option("checkpointLocation", "/path/to/checkpointLocation")
.option("mergeSchema", "true")
.toTable("output_table")
)

mergeSchema が有効になっており、ターゲットテーブルで型の拡大が有効になっている場合:

  • 型変更は、ダウンストリームテーブルに手動介入なしで自動的に適用されます。
  • 新しい列が自動的にダウンストリームのテーブルスキーマに追加されます。

mergeSchemaが有効になっていない場合、値はspark.sql.storeAssignmentPolicyの設定に従って処理されます。デフォルトでは、値はターゲットカラム型に一致するようにダウンキャストされます。割り当てポリシーの動作に関する詳細情報については、ストア割り当てを参照してください。

手動の型の変更の確認

Delta Lakeテーブルからストリーミングする際に、型の変更を含む非追加スキーマ変更を追跡するために、スキーマ追跡場所を指定できます。スキーマの追跡場所の指定は、Databricks Runtime 18.0 以下で必須です。 Databricks Runtime 18.1 以降ではオプションです。

Python
checkpoint_path = "/path/to/checkpointLocation"

(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)

スキーマ追跡ロケーションを指定した後、型変更が検出されると、ストリームは追跡対象のスキーマを進化させ、その後停止します。その際、型の変更に対応するために必要な対応を行うことができます。例えば、ダウンストリームテーブルで型の拡張を有効にする、あるいはストリーミングクエリを更新するなどの対応です。

処理を再開するには、Spark 構成 spark.databricks.delta.streaming.allowSourceColumnTypeChange または DataFrame リーダー オプション allowSourceColumnTypeChange を設定します。

Python
checkpoint_path = "/path/to/checkpointLocation"

(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.option("allowSourceColumnTypeChange", "<delta_source_table_version>")
# alternatively to allow all future type changes for this stream:
# .option("allowSourceColumnTypeChange", "always")
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)

ストリームが停止すると、エラーメッセージにチェックポイントID <checkpoint_id> と Delta Lake ソーステーブルバージョン <delta_source_table_version> が表示されます。

ストリーミング Delta Lake のオプションの完全なリストについては、「Delta Lake」を参照してください。

Lakeflow Spark宣言型パイプライン

Lakeflow Spark宣言型パイプラインで、パイプラインレベルまたは個別のテーブルに対して、型拡大を有効にできます。型の拡張により、ストリーミングテーブルの完全な更新を必要とせずに、パイプラインの実行中に列のデータ型を自動的に広げることができます。マテリアライズドビューでの型変更は常に完全な再計算を引き起こし、ソーステーブルに型変更が適用されると、そのテーブルに依存するマテリアライズドビューは新しい型を反映するために完全な再計算が必要になります。

パイプライン全体で型の拡張を有効にする

パイプライン内のすべてのテーブルの型拡大を有効にするには、パイプライン構成pipelines.enableTypeWideningを設定します。

JSON
{
"configuration": {
"pipelines.enableTypeWidening": "true"
}
}

特定のテーブルで型の拡張を有効にする

個々のテーブルで型の拡張を有効にするには、テーブル プロパティ delta.enableTypeWidening を設定します。

Python
import dlt

@dlt.table(
table_properties={&quot;delta.enableTypeWidening&quot;: &quot;true&quot;}
)
def my_table():
return spark.readStream.table("source_table")

下流リーダーとの互換性

Databricks Runtime 15.4 LTS 以降では、型拡張が有効なテーブルを読み取ることができます。パイプラインで型拡張が有効になっているテーブルを、Databricks Runtime 14.3以下を使用しているリーダーから読み取り可能にしたい場合は、以下のいずれかの対応が必要です:

  • 型の拡張を無効にするには、delta.enableTypeWidening/pipelines.enableTypeWidening プロパティを削除するか、false に設定することで、テーブルの完全な更新をトリガーします。
  • テーブルでCompatibility Modeを有効にします。

OpenSharing

注記

OpenSharingにおける型拡張のサポートは、Databricks Runtime 16.1 以降で利用可能です。

Databricks-to-Databricks OpenSharing では、型拡張が有効な Delta Lake テーブルの共有がサポートされています。プロバイダーと受信者はDatabricks Runtime 16.1 以降である必要があります。

OpenSharing を使用して型拡張が有効になっている Delta Lake テーブルからチェンジデータフィードを読み取るには、応答形式を delta に設定する必要があります。

Scala
spark.read
.format("deltaSharing")
.option("responseFormat", "delta")
.option("readChangeFeed", "true")
.option("startingVersion", "<start version>")
.option("endingVersion", "<end version>")
.load("<table>")

型の変更を伴うチェンジデータフィードの読み取りはサポートされていません。代わりに、操作を2つの別々の読み取りに分割する必要があります。1つはタイプ変更を含むテーブルバージョンで終了し、もう1つはタイプ変更を含むバージョンで開始します。

制限事項:

Apache Iceberg 互換性

Apache Iceberg は、型拡大によってカバーされるすべての型変更をサポートしていません。Iceberg スキーマ進化を参照してください。特に、Databricks は次の種類の変更をサポートしていません。

  • byteshortintlong から decimal または double
  • 小数点以下の増加
  • date - timestampNTZ

Delta Lake テーブルで Iceberg 互換性を備えた UniForm を有効にすると、これらのいずれかのタイプの変更を適用するとエラーになります。Icebergクライアントを使用したDelta Lakeテーブルの読み取りを参照してください。

これらのサポートされていない型の変更のいずれかをDelta Lakeテーブルに適用した場合、2つの選択肢があります:

  1. Iceberg メタデータの再生成:型拡張テーブル機能なしで Iceberg メタデータを再生成するには、次のコマンドを使用します。

    SQL
    ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.universalFormat.config.icebergCompatVersion' = '<version>')

    これにより、互換性のない型変更を適用した後でも、互換性を維持できます。

  2. 型拡張テーブル機能を削除する:型の拡張テーブル機能を無効にするを参照してください。

型依存関数

一部の SQL 関数は、入力データ型に依存する結果を返します。例えば、hash 関数は、引数の型が異なる場合、同じ論理値に対して異なるハッシュ値を返します。hash(1::INT)hash(1::BIGINT) とは異なる結果を返します。

その他の型依存関数は次のとおりです: xxhash64bit_getbit_reversetypeof

これらの関数を使用するクエリで安定した結果を得るには、値を目的の型に明示的にキャストしてください。

Python
spark.read.table("table_name") \
.selectExpr("hash(CAST(column_name AS BIGINT))")

その他の制限事項

  • 型変更があるDelta Lakeテーブルからストリーミングする際に、SQLを使用してスキーマ追跡の場所を指定することはできません。
  • OpenSharing を使用して、型拡張が有効になっているテーブルを Databricks 以外のコンシューマーと共有することはできません。