型の拡張
型の拡張が有効になっているテーブルでは、基になるデータ ファイルを書き換えることなく、列のデータ型をより広い型に変更できます。 列タイプを手動で変更することも、スキーマ進化を使用して列タイプを進化させることもできます。
型の拡大は、Databricks Runtime 15.4 LTS 以降で使用できます。拡大処理が有効になっているテーブルは、Databricks Runtime 15.4 LTS 以降でのみ読み取ることができます。
型拡張には Delta Lake が必要です。 すべての Unity Catalog マネージドテーブルは、デフォルトで Delta Lake を使用します。
サポートされているタイプの変更
タイプの幅は、次のルールに従って変更できます。
元の型 | サポートされている拡張型 |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
型の変更は、構造体、マップ、および配列内にネストされた最上位の列とフィールドでサポートされています。
Spark 、操作によって Integer 型がdecimalまたはdoubleに昇格され、下流の取り込みによって値が整数列に書き戻される場合、値の小数部分を default で切り捨てます。 割り当てポリシーの動作の詳細については、 「ストアの割り当て」を参照してください。
数値タイプを decimalに変更する場合、合計精度は開始精度以上である必要があります。 スケールも大きくする場合は、合計精度を対応する量だけ増やす必要があります。
byte、short、int タイプの最小ターゲットは decimal(10,0) です。 long の最小目標は decimal(20,0) です。
decimal(10,1)を持つフィールドに小数点以下 2 桁を追加する場合、最小ターゲットは decimal(12,3)です。
型の拡張を有効にする
既存のテーブルでタイプ拡大を有効にするには、delta.enableTypeWidening table プロパティを次のように設定しますtrue
ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
テーブルの作成中に型の拡張を有効にすることもできます。
CREATE TABLE T(c1 INT) TBLPROPERTIES('delta.enableTypeWidening' = 'true')
型の拡大を有効にすると、テーブル機能 typeWideningが設定され、リーダーとライターのプロトコルがアップグレードされます。Databricks Runtime 15.4 以降を使用して、型の拡大が有効になっているテーブルを操作する必要があります。外部クライアントもテーブルと対話する場合は、このテーブル機能をサポートしていることを確認します。「Delta Lake 機能の互換性とプロトコル」を参照してください。
タイプ変更の手動適用
ALTER COLUMNコマンドを使用して、タイプを手動で変更します。
ALTER TABLE <table_name> ALTER COLUMN <col_name> TYPE <new_type>
この操作は、基になるデータ ファイルを書き換えずにテーブル スキーマを更新します。詳細については、 ALTER TABLE を参照してください。
自動スキーマ進化によるタイプの幅を広げる
スキーマの進化は、型の拡大と連携して、受信データの型と一致するようにターゲット テーブルのデータ型を更新します。
型の拡大を有効にしないと、スキーマ進化は常に、ターゲット テーブルの列型と一致するようにデータをダウンキャストしようとします。 ターゲットテーブルのデータ型を自動的に拡大しない場合は、スキーマ進化が有効になっているワークロードを実行する前に、型の拡大を無効にします。
スキーマ進化を使用してインジェスト中に列のデータ型を広げるには、次の条件を満たす必要があります。
- write コマンドは、自動スキーマ進化が有効になっている状態で実行されます。
- ターゲット テーブルでタイプの拡大が有効になっています。
- ソース カラム タイプは、ターゲット カラム タイプよりも幅が広くなっています。
- タイプワイドニングはタイプチェンジをサポートします。
これらの条件をすべて満たさない型の不一致は、通常のスキーマ強制ルールに従います。 スキーマ強制を参照してください。
次の例は、一般的な書き込み操作で型拡張がスキーマ進化でどのように機能するかを示しています。
- Python
- Scala
- SQL
# Create target table with INT column and source table with BIGINT column
spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")
# Example 1: Automatic type widening in saveAsTable()
spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")
# Example 2: Automatic type widening in MERGE INTO
from delta.tables import DeltaTable
source_df = spark.table("source_table")
target_table = DeltaTable.forName(spark, "target_table")
(target_table.alias("target")
.merge(source_df.alias("source"), "target.id = source.id")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
// Create target table with INT column and source table with BIGINT column
spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")
// Example 1: Automatic type widening in saveAsTable()
spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")
// Example 2: Automatic type widening in MERGE INTO
import io.delta.tables.DeltaTable
val sourceDf = spark.table("source_table")
val targetTable = DeltaTable.forName(spark, "target_table")
targetTable.alias("target")
.merge(sourceDf.alias("source"), "target.id = source.id")
.withSchemaEvolution()
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
-- Create target table with INT column and source table with BIGINT column
CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true');
CREATE TABLE source_table (id BIGINT, data STRING);
-- Example 1: Automatic type widening in INSERT INTO
---- Insert data with BIGINT value column - automatically widens INT to BIGINT
INSERT WITH SCHEMA EVOLUTION INTO target_table SELECT * FROM source_table;
-- Example 2: Automatic type widening in MERGE INTO
MERGE WITH SCHEMA EVOLUTION INTO target_table
USING source_table
ON target_table.id = source_table.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
Auto Loader
プレビュー
Auto Loader の型拡張サポートはパブリックプレビュー版です。
Auto Loaderは、自動的なスキーマ進化による型拡張をサポートしています。Auto Loader使用して、型拡張と 進化が有効になっているDeltaテーブルにデータを取り込む場合、列の型は受信データに合わせて自動的に拡張されます。
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)
Auto Loaderによる自動型幅調整を参照してください。 また、対象テーブルでは型拡張が有効になっている必要があります。型の拡張を有効にするを参照してください。
タイプ拡幅処理テーブル機能を無効にする
有効なテーブルで誤って型が拡大されるのを防ぐには、プロパティを次のように設定しますfalse
ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'false')
この設定により、テーブルに対する今後の型の変更は防止されますが、型拡張テーブル機能は削除されず、以前の型の変更も元に戻されません。
型拡張テーブル機能を完全に削除する必要がある場合は、次の例に示すように DROP FEATURE コマンドを使用できます。
ALTER TABLE <table-name> DROP FEATURE 'typeWidening' [TRUNCATE HISTORY]
Databricks Runtime 15.4 LTS を使用して型の拡大を有効にしたテーブルでは、代わりに機能 typeWidening-preview を削除する必要があります。
型の拡張を削除すると、Databricks は現在のテーブル スキーマに準拠していないすべてのデータ ファイルを書き換えます。「Delta Lake テーブル機能の削除とテーブル プロトコルのダウングレード」を参照してください。
Delta テーブルからのストリーミング
構造化ストリーミングでの型の拡張のサポートは、Databricks Runtime 16.4 LTS 以降で利用できます。
型拡張を有効にしてDeltaテーブルからストリーミングする場合、ターゲット テーブルでmergeSchemaオプションを使用してスキーマ進化を有効にすることで、ストリーミング クエリの自動型拡張を構成できます。 ターゲット テーブルでは、型の拡張が有効になっている必要があります。「型拡張を有効にする」を参照してください。
- Python
- Scala
(spark.readStream
.table("delta_source_table")
.writeStream
.option("checkpointLocation", "/path/to/checkpointLocation")
.option("mergeSchema", "true")
.toTable("output_table")
)
spark.readStream
.table("delta_source_table")
.writeStream
.option("checkpointLocation", "/path/to/checkpointLocation")
.option("mergeSchema", "true")
.toTable("output_table")
mergeSchemaが有効で、ターゲット テーブルで型の拡張が有効になっている場合:
- タイプの変更は、手動による介入を必要とせずに、ダウンストリーム テーブルに自動的に適用されます。
- 新しい列は、ダウンストリーム テーブル スキーマに自動的に追加されます。
mergeSchemaを有効にしないと、値はspark.sql.storeAssignmentPolicy設定に従って処理され、目標カラム タイプに一致するように値が意図的にダウンキャストされます。 割り当てポリシーの動作の詳細については、 「ストア割り当て」を参照してください。
手動タイプ変更の確認
Delta テーブルからストリーミングする場合、型の変更を含む非加算的なスキーマの変更を追跡するためのスキーマ追跡場所を提供できます。スキーマ追跡場所の指定は、Databricks Runtime 18.0 以下では必須であり、Databricks Runtime 18.1 以上ではオプションです。
- Python
- Scala
checkpoint_path = "/path/to/checkpointLocation"
(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)
val checkpointPath = "/path/to/checkpointLocation"
spark.readStream
.option("schemaTrackingLocation", checkpointPath)
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpointPath)
.toTable("output_table")
スキーマ追跡場所を指定した後、ストリームは型の変更を検出すると追跡対象のスキーマを展開し、停止します。その時点で、ダウンストリーム テーブルでの型の拡張を有効にしたり、ストリーミング クエリを更新したりするなど、型の変更を処理するために必要なアクションを実行できます。
処理を再開するには、Spark 構成 spark.databricks.delta.streaming.allowSourceColumnTypeChange または DataFrame リーダー オプションを次のように設定します allowSourceColumnTypeChange。
- Python
- Scala
- SQL
checkpoint_path = "/path/to/checkpointLocation"
(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.option("allowSourceColumnTypeChange", "<delta_source_table_version>")
# alternatively to allow all future type changes for this stream:
# .option("allowSourceColumnTypeChange", "always")
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)
val checkpointPath = "/path/to/checkpointLocation"
spark.readStream
.option("schemaTrackingLocation", checkpointPath)
.option("allowSourceColumnTypeChange", "<delta_source_table_version>")
// alternatively to allow all future type changes for this stream:
// .option("allowSourceColumnTypeChange", "always")
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpointPath)
.toTable("output_table")
-- To unblock for this particular stream just for this series of schema change(s):
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange.ckpt_<checkpoint_id> = "<delta_source_table_version>"
-- To unblock for this particular stream:
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "<delta_source_table_version>"
-- To unblock for all streams:
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "always"
チェックポイント ID <checkpoint_id> と Delta Lake ソーステーブルのバージョン <delta_source_table_version> は、ストリームが停止するとエラーメッセージに表示されます。
Lakeflow Spark宣言型パイプライン
LakeFlow Spark宣言型パイプラインのタイプ拡張をパイプライン レベルまたは個々のテーブルで有効にできます。 型の拡張により、ストリーミング テーブルを完全に更新することなく、パイプラインの実行中に列の型を自動的に拡張できます。 マテリアライズドビューでの型の変更は常に完全な再計算をトリガーし、型の変更がソース テーブルに適用されると、そのテーブルに依存するマテリアライズドビューは新しい型を反映するために完全な再計算を必要とします。
パイプライン全体の型拡張を有効にする
パイプライン内のすべてのテーブルに対して型の拡張を有効にするには、パイプライン構成pipelines.enableTypeWideningを設定します。
- JSON
- YAML
{
"configuration": {
"pipelines.enableTypeWidening": "true"
}
}
configuration:
pipelines.enableTypeWidening: 'true'
特定のテーブルの型拡張を有効にする
テーブルプロパティdelta.enableTypeWideningを設定することで、個々のテーブルの型拡張を有効にすることもできます。
- Python
- SQL
import dlt
@dlt.table(
table_properties={"delta.enableTypeWidening": "true"}
)
def my_table():
return spark.readStream.table("source_table")
CREATE OR REFRESH STREAMING TABLE my_table
TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
AS SELECT * FROM source_table
下流リーダーとの互換性
型の拡張が有効になっているテーブルは、Databricks Runtime 15.4 LTS 以降でのみ読み取ることができます。パイプラインで型の拡張が有効になっているテーブルを、Databricks Runtime 14.3 以前のリーダーで読み取り可能にするには、次のいずれかを行う必要があります。
- プロパティ
delta.enableTypeWidening/pipelines.enableTypeWideningを削除するか false に設定して型の拡張を無効にし、テーブルの完全な更新をトリガーします。 - テーブルで互換Modeを有効にします。
Delta Sharing
Delta Sharing での型の拡大のサポートは、Databricks Runtime 16.1 以降で使用できます。
タイプ拡大拡大を有効にした Delta Lake テーブルの共有は、 Databricks-to-Databricks Delta Sharingでサポートされています。 プロバイダーと受信者は、Databricks Runtime 16.1 以降を使用している必要があります。
を使用してタイプ拡大が有効になっている テーブルからチェンジデータフィードを読み取るには、応答形式をDelta LakeDelta Sharing に設定する必要があります。delta
spark.read
.format("deltaSharing")
.option("responseFormat", "delta")
.option("readChangeFeed", "true")
.option("startingVersion", "<start version>")
.option("endingVersion", "<end version>")
.load("<table>")
タイプ変更間でのチェンジデータフィードの読み取りはサポートされていません。 代わりに、オペレーションを 2 つの別々の読み取りに分割し、1 つはタイプ変更を含むテーブルバージョンで終了し、もう 1 つはタイプ変更を含むバージョンで開始する必要があります。
制限
Apache Iceberg の互換性
Apache Iceberg、型の拡大に適用されるすべての型の変更がサポートされているわけではありません。スキーマの進化Icebergを参照してください。特に、Databricks では次の型の変更はサポートされていません。
byte、short、int、decimal``longまたはdouble- 小数点以下のスケールの増加
date-timestampNTZ
Delta Lake テーブルで Iceberg 互換性のある UniForm を有効にすると、これらの型変更のいずれかを適用するとエラーが発生します。Iceberg クライアントで Delta テーブルを読み取るを参照してください。
これらのサポートされていない型の変更のいずれかを Delta Lake テーブルに適用する場合、次の 2 つのオプションがあります。
-
Iceberg メタデータの再生成: 型拡張テーブル機能を使用せずに Iceberg メタデータを再生成するには、次のコマンドを使用します。
SQLALTER TABLE <table-name> SET TBLPROPERTIES ('delta.universalFormat.config.icebergCompatVersion' = '<version>')これにより、互換性のないタイプの変更を適用した後でも、均一な互換性を維持できます。
-
型拡張テーブル機能を削除します。 「型拡張テーブル機能を無効にする」を参照してください。
型依存関数
一部の SQL 関数は、入力データ型に応じて結果を返します。たとえば、 hash関数は、引数の型が異なる場合、同じ論理値に対して異なるハッシュ値を返します。 hash(1::INT) hash(1::BIGINT)とは異なる結果を返します。
その他の型依存関数は次のとおりです: xxhash64 、 bit_get 、 bit_reverse 、 typeof 。
これらの関数を使用するクエリで安定した結果を得るには、値を目的の型に明示的にキャストします。
- Python
- Scala
- SQL
spark.read.table("table_name") \
.selectExpr("hash(CAST(column_name AS BIGINT))")
spark.read.table("main.johan_lasperas.dlt_type_widening_bronze2")
.selectExpr("hash(CAST(a AS BIGINT))")
-- Use explicit casting for stable hash values
SELECT hash(CAST(column_name AS BIGINT)) FROM table_name
その他の制限事項
- 型が変更された Delta Lake テーブルからストリーミングする場合、SQL を使用してスキーマ追跡の場所を指定することはできません。
- Delta Sharing を使用して、型の拡張が有効になっているテーブルを、Databricks 以外のコンシューマーと共有することはできません。