型の拡張
プレビュー
この機能は、Databricks Runtime 15.4 LTS 以降で パブリック プレビュー 段階です。
型の拡張が有効になっているテーブルでは、基になるデータ ファイルを書き換えることなく、列のデータ型をより広い型に変更できます。 列タイプを手動で変更することも、スキーマ進化を使用して列タイプを進化させることもできます。
型の拡大は、Databricks Runtime 15.4 LTS 以降で使用できます。拡大処理が有効になっているテーブルは、Databricks Runtime 15.4 LTS 以降でのみ読み取ることができます。
型拡張には Delta Lake が必要です。 すべての Unity Catalog マネージドテーブルは、デフォルトで Delta Lake を使用します。
サポートされているタイプの変更
タイプの幅は、次のルールに従って変更できます。
元の型 | サポートされている拡張型 |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
型の変更は、構造体、マップ、および配列内にネストされた最上位の列とフィールドでサポートされています。
Spark 、操作によって Integer 型がdecimalまたはdoubleに昇格され、下流の取り込みによって値が整数列に書き戻される場合、値の小数部分を default で切り捨てます。 割り当てポリシーの動作の詳細については、 「ストアの割り当て」を参照してください。
数値タイプを decimalに変更する場合、合計精度は開始精度以上である必要があります。 スケールも大きくする場合は、合計精度を対応する量だけ増やす必要があります。
byte、short、int タイプの最小ターゲットは decimal(10,0) です。 long の最小目標は decimal(20,0) です。
decimal(10,1)を持つフィールドに小数点以下 2 桁を追加する場合、最小ターゲットは decimal(12,3)です。
型の拡張を有効にする
既存のテーブルでタイプ拡大を有効にするには、delta.enableTypeWidening table プロパティを次のように設定しますtrue
ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
テーブルの作成中に型の拡張を有効にすることもできます。
CREATE TABLE T(c1 INT) TBLPROPERTIES('delta.enableTypeWidening' = 'true')
型の拡大を有効にすると、テーブル機能 typeWideningが設定され、リーダーとライターのプロトコルがアップグレードされます。Databricks Runtime 15.4 以降を使用して、型の拡大が有効になっているテーブルを操作する必要があります。外部クライアントもテーブルと対話する場合は、このテーブル機能をサポートしていることを確認します。「Delta Lake 機能の互換性とプロトコル」を参照してください。
タイプ変更の手動適用
ALTER COLUMNコマンドを使用して、タイプを手動で変更します。
ALTER TABLE <table_name> ALTER COLUMN <col_name> TYPE <new_type>
この操作は、基になるデータ ファイルを書き換えずにテーブル スキーマを更新します。詳細については、 ALTER TABLE を参照してください。
自動スキーマ進化によるタイプの幅を広げる
スキーマの進化は、型の拡大と連携して、受信データの型と一致するようにターゲット テーブルのデータ型を更新します。
型の拡大を有効にしないと、スキーマ進化は常に、ターゲット テーブルの列型と一致するようにデータをダウンキャストしようとします。 ターゲットテーブルのデータ型を自動的に拡大しない場合は、スキーマ進化が有効になっているワークロードを実行する前に、型の拡大を無効にします。
スキーマ進化を使用してインジェスト中に列のデータ型を広げるには、次の条件を満たす必要があります。
- write コマンドは、自動スキーマ進化が有効になっている状態で実行されます。
- ターゲット テーブルでタイプの拡大が有効になっています。
- ソース カラム タイプは、ターゲット カラム タイプよりも幅が広くなっています。
- タイプワイドニングはタイプチェンジをサポートします。
これらの条件をすべて満たさない型の不一致は、通常のスキーマ強制ルールに従います。 スキーマ強制を参照してください。
次の例は、一般的な書き込み操作で型拡張がスキーマ進化でどのように機能するかを示しています。
- Python
- Scala
- SQL
# Create target table with INT column and source table with BIGINT column
spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")
# Example 1: Automatic type widening in saveAsTable()
spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")
# Example 2: Automatic type widening in MERGE INTO
from delta.tables import DeltaTable
source_df = spark.table("source_table")
target_table = DeltaTable.forName(spark, "target_table")
(target_table.alias("target")
.merge(source_df.alias("source"), "target.id = source.id")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
// Create target table with INT column and source table with BIGINT column
spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")
// Example 1: Automatic type widening in saveAsTable()
spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")
// Example 2: Automatic type widening in MERGE INTO
import io.delta.tables.DeltaTable
val sourceDf = spark.table("source_table")
val targetTable = DeltaTable.forName(spark, "target_table")
targetTable.alias("target")
.merge(sourceDf.alias("source"), "target.id = source.id")
.withSchemaEvolution()
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
-- Create target table with INT column and source table with BIGINT column
CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true');
CREATE TABLE source_table (id BIGINT, data STRING);
-- Example 1: Automatic type widening in INSERT INTO
---- Enable schema evolution
SET spark.databricks.delta.schema.autoMerge.enabled = true;
---- Insert data with BIGINT value column - automatically widens INT to BIGINT
INSERT INTO target_table SELECT * FROM source_table;
-- Example 2: Automatic type widening in MERGE INTO
MERGE WITH SCHEMA EVOLUTION INTO target_table
USING source_table
ON target_table.id = source_table.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
タイプ拡幅処理テーブル機能を無効にする
有効なテーブルで誤って型が拡大されるのを防ぐには、プロパティを次のように設定しますfalse
ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'false')
この設定により、テーブルに対する今後の型の変更は防止されますが、型拡張テーブル機能は削除されず、以前の型の変更も元に戻されません。
型拡張テーブル機能を完全に削除する必要がある場合は、次の例に示すように DROP FEATURE コマンドを使用できます。
ALTER TABLE <table-name> DROP FEATURE 'typeWidening' [TRUNCATE HISTORY]
Databricks Runtime 15.4 LTS を使用して型の拡大を有効にしたテーブルでは、代わりに機能 typeWidening-preview を削除する必要があります。
型の拡張を削除すると、Databricks は現在のテーブル スキーマに準拠していないすべてのデータ ファイルを書き換えます。「Delta Lake テーブル機能の削除とテーブル プロトコルのダウングレード」を参照してください。
Delta テーブルからのストリーミング
構造化ストリーミングでの型の拡張のサポートは、Databricks Runtime 16.4 LTS 以降で利用できます。
型拡張を有効にしてDeltaテーブルからストリーミングする場合、ターゲット テーブルでmergeSchemaオプションを使用してスキーマ進化を有効にすることで、ストリーミング クエリの自動型拡張を構成できます。 ターゲット テーブルでは、型の拡張が有効になっている必要があります。「型拡張を有効にする」を参照してください。
- Python
- Scala
(spark.readStream
.table("delta_source_table")
.writeStream
.option("checkpointLocation", "/path/to/checkpointLocation")
.option("mergeSchema", "true")
.toTable("output_table")
)
spark.readStream
.table("delta_source_table")
.writeStream
.option("checkpointLocation", "/path/to/checkpointLocation")
.option("mergeSchema", "true")
.toTable("output_table")
mergeSchemaが有効で、ターゲット テーブルで型の拡張が有効になっている場合:
- タイプの変更は、手動による介入を必要とせずに、ダウンストリーム テーブルに自動的に適用されます。
- 新しい列は、ダウンストリーム テーブル スキーマに自動的に追加されます。
mergeSchemaを有効にしないと、値はspark.sql.storeAssignmentPolicy設定に従って処理され、目標カラム タイプに一致するように値が意図的にダウンキャストされます。 割り当てポリシーの動作の詳細については、 「ストア割り当て」を参照してください。
手動タイプ変更の確認
Delta テーブルからストリーミングする場合、型の変更を含む非加算的なスキーマの変更を追跡するためのスキーマ追跡場所を提供できます。スキーマ追跡場所の指定は、Databricks Runtime 18.0 以下では必須であり、Databricks Runtime 18.1 以上ではオプションです。
- Python
- Scala
checkpoint_path = "/path/to/checkpointLocation"
(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)
val checkpointPath = "/path/to/checkpointLocation"
spark.readStream
.option("schemaTrackingLocation", checkpointPath)
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpointPath)
.toTable("output_table")
スキーマ追跡場所を指定した後、ストリームは型の変更を検出すると追跡対象のスキーマを展開し、停止します。その時点で、ダウンストリーム テーブルでの型の拡張を有効にしたり、ストリーミング クエリを更新したりするなど、型の変更を処理するために必要なアクションを実行できます。
処理を再開するには、Spark 構成 spark.databricks.delta.streaming.allowSourceColumnTypeChange または DataFrame リーダー オプションを次のように設定します allowSourceColumnTypeChange。
- Python
- Scala
- SQL
checkpoint_path = "/path/to/checkpointLocation"
(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.option("allowSourceColumnTypeChange", "<delta_source_table_version>")
# alternatively to allow all future type changes for this stream:
# .option("allowSourceColumnTypeChange", "always")
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)
val checkpointPath = "/path/to/checkpointLocation"
spark.readStream
.option("schemaTrackingLocation", checkpointPath)
.option("allowSourceColumnTypeChange", "<delta_source_table_version>")
// alternatively to allow all future type changes for this stream:
// .option("allowSourceColumnTypeChange", "always")
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpointPath)
.toTable("output_table")
-- To unblock for this particular stream just for this series of schema change(s):
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange.ckpt_<checkpoint_id> = "<delta_source_table_version>"
-- To unblock for this particular stream:
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "<delta_source_table_version>"
-- To unblock for all streams:
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "always"
チェックポイント ID <checkpoint_id> と Delta Lake ソーステーブルのバージョン <delta_source_table_version> は、ストリームが停止するとエラーメッセージに表示されます。
Lakeflow Spark宣言型パイプライン
LakeFlow Spark宣言型パイプラインでのタイプ拡張のサポートは、PREVIEW チャンネルで利用できます。
LakeFlow Spark宣言型パイプラインのタイプ拡張をパイプライン レベルまたは個々のテーブルで有効にできます。 型の拡張により、ストリーミング テーブルを完全に更新することなく、パイプラインの実行中に列の型を自動的に拡張できます。 マテリアライズドビューでの型の変更は常に完全な再計算をトリガーし、型の変更がソース テーブルに適用されると、そのテーブルに依存するマテリアライズドビューは新しい型を反映するために完全な再計算を必要とします。
パイプライン全体の型拡張を有効にする
パイプライン内のすべてのテーブルに対して型の拡張を有効にするには、パイプライン構成pipelines.enableTypeWideningを設定します。
- JSON
- YAML
{
"configuration": {
"pipelines.enableTypeWidening": "true"
}
}
configuration:
pipelines.enableTypeWidening: 'true'
特定のテーブルの型拡張を有効にする
テーブルプロパティdelta.enableTypeWideningを設定することで、個々のテーブルの型拡張を有効にすることもできます。
- Python
- SQL
import dlt
@dlt.table(
table_properties={"delta.enableTypeWidening": "true"}
)
def my_table():
return spark.readStream.table("source_table")
CREATE OR REFRESH STREAMING TABLE my_table
TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
AS SELECT * FROM source_table
下流リーダーとの互換性
型の拡張が有効になっているテーブルは、Databricks Runtime 15.4 LTS 以降でのみ読み取ることができます。パイプラインで型の拡張が有効になっているテーブルを、Databricks Runtime 14.3 以前のリーダーで読み取り可能にするには、次のいずれかを行う必要があります。
- プロパティ
delta.enableTypeWidening/pipelines.enableTypeWideningを削除するか false に設定して型の拡張を無効にし、テーブルの完全な更新をトリガーします。 - テーブルで互換Modeを有効にします。
Delta Sharing
Delta Sharing での型の拡大のサポートは、Databricks Runtime 16.1 以降で使用できます。
タイプ拡大拡大を有効にした Delta Lake テーブルの共有は、 Databricks-to-Databricks Delta Sharingでサポートされています。 プロバイダーと受信者は、Databricks Runtime 16.1 以降を使用している必要があります。
を使用してタイプ拡大が有効になっている テーブルからチェンジデータフィードを読み取るには、応答形式をDelta LakeDelta Sharing に設定する必要があります。delta
spark.read
.format("deltaSharing")
.option("responseFormat", "delta")
.option("readChangeFeed", "true")
.option("startingVersion", "<start version>")
.option("endingVersion", "<end version>")
.load("<table>")
タイプ変更間でのチェンジデータフィードの読み取りはサポートされていません。 代わりに、オペレーションを 2 つの別々の読み取りに分割し、1 つはタイプ変更を含むテーブルバージョンで終了し、もう 1 つはタイプ変更を含むバージョンで開始する必要があります。
制限
Apache Iceberg の互換性
Apache Iceberg、型の拡大に適用されるすべての型の変更がサポートされているわけではありません。スキーマの進化Icebergを参照してください。特に、Databricks では次の型の変更はサポートされていません。
byte、short、int、decimal``longまたはdouble- 小数点以下のスケールの増加
date-timestampNTZ
Delta Lake テーブルで Iceberg 互換性のある UniForm を有効にすると、これらの型変更のいずれかを適用するとエラーが発生します。Iceberg クライアントで Delta テーブルを読み取るを参照してください。
これらのサポートされていない型の変更のいずれかを Delta Lake テーブルに適用する場合、次の 2 つのオプションがあります。
-
Iceberg メタデータの再生成: 型拡張テーブル機能を使用せずに Iceberg メタデータを再生成するには、次のコマンドを使用します。
SQLALTER TABLE <table-name> SET TBLPROPERTIES ('delta.universalFormat.config.icebergCompatVersion' = '<version>')これにより、互換性のないタイプの変更を適用した後でも、均一な互換性を維持できます。
-
型拡張テーブル機能を削除します。 「型拡張テーブル機能を無効にする」を参照してください。
型依存関数
一部の SQL 関数は、入力データ型に応じて結果を返します。たとえば、 hash関数は、引数の型が異なる場合、同じ論理値に対して異なるハッシュ値を返します。 hash(1::INT) hash(1::BIGINT)とは異なる結果を返します。
その他の型依存関数は次のとおりです: xxhash64 、 bit_get 、 bit_reverse 、 typeof 。
これらの関数を使用するクエリで安定した結果を得るには、値を目的の型に明示的にキャストします。
- Python
- Scala
- SQL
spark.read.table("table_name") \
.selectExpr("hash(CAST(column_name AS BIGINT))")
spark.read.table("main.johan_lasperas.dlt_type_widening_bronze2")
.selectExpr("hash(CAST(a AS BIGINT))")
-- Use explicit casting for stable hash values
SELECT hash(CAST(column_name AS BIGINT)) FROM table_name
その他の制限事項
- 型が変更された Delta Lake テーブルからストリーミングする場合、SQL を使用してスキーマ追跡の場所を指定することはできません。
- Delta Sharing を使用して、型の拡張が有効になっているテーブルを、Databricks 以外のコンシューマーと共有することはできません。