メインコンテンツまでスキップ

型の拡張

備考

プレビュー

この機能は、Databricks Runtime 15.4 LTS 以降で パブリック プレビュー 段階です。

型の拡張が有効になっているテーブルでは、基になるデータ ファイルを書き換えることなく、列のデータ型をより広い型に変更できます。 列タイプを手動で変更することも、スキーマ進化を使用して列タイプを進化させることもできます。

important

型の拡大は、Databricks Runtime 15.4 LTS 以降で使用できます。拡大処理が有効になっているテーブルは、Databricks Runtime 15.4 LTS 以降でのみ読み取ることができます。

型拡張には Delta Lake が必要です。 すべての Unity Catalog マネージドテーブルは、デフォルトで Delta Lake を使用します。

サポートされているタイプの変更

タイプの幅は、次のルールに従って変更できます。

元の型

サポートされている拡張型

byte

shortintlongdecimaldouble

short

intlongdecimaldouble

int

longdecimaldouble

long

decimal

float

double

decimal

decimal の、より高い精度とスケール

date

timestampNTZ

整数値を誤って 10 進数に昇格させないようにするには、byteshortint、または long から decimal または doubleへの型の変更 を手動でコミット する必要があります。Integer型を decimal または doubleに昇格させる場合、ダウンストリームのインジェストでこの値を整数列に書き戻すと、 Spark は値の小数部をデフォルトによって切り捨てます。

注記

数値タイプを decimalに変更する場合、合計精度は開始精度以上である必要があります。 スケールも大きくする場合は、合計精度を対応する量だけ増やす必要があります。

byteshortint タイプの最小ターゲットは decimal(10,0) です。 long の最小目標は decimal(20,0) です。

decimal(10,1)を持つフィールドに小数点以下 2 桁を追加する場合、最小ターゲットは decimal(12,3)です。

型の変更は、構造体、マップ、および配列内にネストされた最上位の列とフィールドでサポートされています。

型の拡張を有効にする

既存のテーブルでタイプ拡大を有効にするには、delta.enableTypeWidening table プロパティを次のように設定しますtrue

SQL
  ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')

テーブルの作成中に型の拡張を有効にすることもできます。

SQL
  CREATE TABLE T(c1 INT) TBLPROPERTIES('delta.enableTypeWidening' = 'true')
important

型の拡大を有効にすると、テーブル機能 typeWideningが設定され、リーダーとライターのプロトコルがアップグレードされます。Databricks Runtime 15.4 以降を使用して、型の拡大が有効になっているテーブルを操作する必要があります。外部クライアントもテーブルと対話する場合は、このテーブル機能をサポートしていることを確認します。「Delta Lake 機能の互換性とプロトコル」を参照してください。

タイプ変更の手動適用

ALTER COLUMNコマンドを使用して、タイプを手動で変更します。

SQL
ALTER TABLE <table_name> ALTER COLUMN <col_name> TYPE <new_type>

この操作は、基になるデータファイルを書き換えずにテーブルスキーマを更新します。

自動スキーマ進化によるタイプの幅を広げる

スキーマの進化は、型の拡大と連携して、受信データの型と一致するようにターゲット テーブルのデータ型を更新します。

注記

型の拡大を有効にしないと、スキーマ進化は常に、ターゲット テーブルの列型と一致するようにデータをダウンキャストしようとします。 ターゲットテーブルのデータ型を自動的に拡大しない場合は、スキーマ進化が有効になっているワークロードを実行する前に、型の拡大を無効にします。

スキーマ進化を使用してインジェスト中に列のデータ型を広げるには、次の条件を満たす必要があります。

  • write コマンドは、自動スキーマ進化が有効になっている状態で実行されます。
  • ターゲット テーブルでタイプの拡大が有効になっています。
  • ソース カラム タイプは、ターゲット カラム タイプよりも幅が広くなっています。
  • タイプワイドニングはタイプチェンジをサポートします。
  • タイプ変更が byteshortint、または long から decimal または doubleのいずれでもない。これらの型の変更は、整数から小数への偶発的な昇格を避けるために、ALTER TABLE を使用して手動でのみ適用できます。

これらの条件をすべて満たさない型の不一致は、通常のスキーマ強制ルールに従います。 スキーマ強制を参照してください。

タイプ拡幅処理テーブル機能を無効にする

有効なテーブルで誤って型が拡大されるのを防ぐには、プロパティを次のように設定しますfalse

SQL
  ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'false')

この設定により、テーブルに対する今後のタイプの変更は防止されますが、タイプ拡張テーブル機能や変更されたタイプの取り消しは削除されません。

型拡張テーブル機能を完全に削除する必要がある場合は、次の例に示すように DROP FEATURE コマンドを使用できます。

SQL
 ALTER TABLE <table-name> DROP FEATURE 'typeWidening' [TRUNCATE HISTORY]
注記

Databricks Runtime 15.4 LTS を使用して型の拡大を有効にしたテーブルでは、代わりに機能 typeWidening-preview を削除する必要があります。

型の拡大を削除すると、現在のテーブル スキーマに準拠していないすべてのデータ ファイルが書き換えられます。「Delta Lake テーブル機能の削除」および「テーブル プロトコルのダウングレード」を参照してください。

Delta テーブルからのストリーミング

注記

構造化ストリーミングでの型の拡大のサポートは、Databricks Runtime 16.3 以降で使用できます。

Delta テーブルからストリーミングする場合、型の変更は、 列マッピングを使用して列の名前を変更したり削除したりするのと同様に、非加法スキーマの変更として扱われます。

スキーマ追跡の場所を指定して、型の変更が適用された Delta Lake テーブルからのストリーミングを有効にすることができます。

データソースに対して読み取られる各ストリーミングには、独自の schemaTrackingLocation を指定する必要があります。 指定した schemaTrackingLocation は、ストリーミング書き込みのターゲットテーブルの checkpointLocation に指定されたディレクトリに含まれている必要があります。

注記

複数のソース Delta テーブルのデータを組み合わせるストリーミング ワークロードの場合は、ソース テーブルの checkpointLocation に一意のディレクトリを指定する必要があります。

オプション schemaTrackingLocation は、次のコード例に示すように、スキーマ追跡のパスを指定するために使用されます。

Python
checkpoint_path = "/path/to/checkpointLocation"

(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)

スキーマ追跡場所を指定した後、ストリームは、型の変更が検出されるたびに追跡対象のスキーマを進化させ、停止します。その時点で、ダウンストリームテーブルで型の拡大を有効にしたり、ストリーミングクエリを更新したりするなど、型の変更を処理するために必要なアクションを実行できます。

処理を再開するには、Spark 構成 spark.databricks.delta.streaming.allowSourceColumnTypeChange または DataFrame リーダー オプションを次のように設定します allowSourceColumnTypeChange

Python
checkpoint_path = "/path/to/checkpointLocation"

(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.option("allowSourceColumnTypeChange", "<delta_source_table_version>")
# alternatively to allow all future type changes for this stream:
# .option("allowSourceColumnTypeChange", "always")
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)

チェックポイント ID <checkpoint_id> と Delta Lake ソーステーブルのバージョン <delta_source_table_version> は、ストリームが停止するとエラーメッセージに表示されます。

Delta Sharing

注記

Delta Sharing での型の拡大のサポートは、Databricks Runtime 16.1 以降で使用できます。

タイプ拡大拡大を有効にした Delta Lake テーブルの共有は、 Databricks-to-Databricks Delta Sharingでサポートされています。 プロバイダーと受信者は、Databricks Runtime 16.1 以降を使用している必要があります。

を使用してタイプ拡大が有効になっている テーブルからチェンジデータフィードを読み取るには、応答形式をDelta LakeDelta Sharing に設定する必要があります。delta

Scala
spark.read
.format("deltaSharing")
.option("responseFormat", "delta")
.option("readChangeFeed", "true")
.option("startingVersion", "<start version>")
.option("endingVersion", "<end version>")
.load("<table>")

タイプ変更間でのチェンジデータフィードの読み取りはサポートされていません。 代わりに、オペレーションを 2 つの別々の読み取りに分割し、1 つはタイプ変更を含むテーブルバージョンで終了し、もう 1 つはタイプ変更を含むバージョンで開始する必要があります。

制限

Iceberg の互換性

Iceberg 、型の拡大の対象となるすべての型の変更をサポートしているわけではありません Iceberg スキーマの進化を参照してください。 特に、Databricks では次の型の変更はサポートされていません。

  • byteshortintdecimal``longまたは double
  • 小数点以下のスケールの増加
  • date - timestampNTZ

Delta Lake テーブルで UniForm with Iceberg 互換性 が有効になっている場合、これらの型の変更のいずれかを適用するとエラーが発生します。

これらのサポートされていない型の変更のいずれかを Delta Lake テーブルに適用すると、テーブルで Uniform with Iceberg 互換性 を有効にするとエラーが発生します。このエラーを解決するには、 タイプ ワイド テーブル フィーチャを削除する必要があります。

その他の制限事項

  • タイプが変更された Delta Lake テーブルからのストリーミング時に、SQL を使用したスキーマ追跡場所の提供はサポートされていません。
  • Delta Sharing を使用して、Databricks 以外のコンシューマーで型の拡大が有効になっているテーブルを共有することはサポートされていません。