型の拡張
型の拡張が有効になっているテーブルでは、基になるデータ ファイルを書き換えることなく、列のデータ型をより広い型に変更できます。 列タイプを手動で変更することも、スキーマ進化を使用して列タイプを進化させることもできます。
タイプワイドニングはDatabricks Runtime 15.4 LTS 以降で利用可能です。Databricks Runtime 15.4 LTS 以降では、型拡張が有効なテーブルを読み取ることができます。
型拡張には Delta Lake が必要です。 すべての Unity Catalog マネージドテーブルは、デフォルトで Delta Lake を使用します。
サポートされるタイプの変更
以下のルールに従って、型を拡張できます。
ソースタイプ | サポートされている拡張型 |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| あらゆる種類 |
型の変更は、最上位の列、および構造体、マップ、配列内にネストされたフィールドでサポートされています。
VOID 任意の型に対して、テーブルで型の拡張が有効になっている必要はありません。列VOIDの型を更新する操作は、追加設定なしで成功します。VOID 型の拡大は、Databricks Runtime 18.2以降で使用できます。
Spark は、ある操作によって整数型が decimal または double に昇格され、ダウンストリームの取り込みによって値が整数カラムに書き戻される場合に、デフォルトで値の小数部を切り捨てます。割り当てポリシーの動作の詳細については、ストアの割り当てをご覧ください。
任意の数値型を decimal に変更するとき、合計精度は開始精度以上である必要があります。スケールも増大させる場合、全体の精度は対応する分だけ増加させる必要があります。
byte、short、int のタイプに対する最小ターゲットは decimal(10,0) です。long の最小ターゲットは decimal(20,0) です。
decimal(10,1)のフィールドに小数点以下2桁を追加したい場合は、最小ターゲットはdecimal(12,3)です。
型の拡張を有効にする
既存のテーブルに対して、テーブルプロパティdelta.enableTypeWideningをtrueに設定することにより、型の拡張を有効にすることができます:
ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
テーブルの作成中に型の拡張を有効にすることもできます。
CREATE TABLE T(c1 INT) TBLPROPERTIES('delta.enableTypeWidening' = 'true')
型の拡張を有効にすると、テーブル機能 typeWideningが設定され、リーダーとライターのプロトコルがアップグレードされます。型の拡大が有効になっているテーブルと対話するには、Databricks Runtime 15.4 以降を使用する必要があります。外部クライアントもテーブルと対話する場合は、このテーブル機能をサポートしていることを確認します。Delta Lake 機能の互換性とプロトコルを参照してください。
型の変更を手動で適用する
ALTER COLUMN コマンドを使用して、手動でタイプを変更してください。
ALTER TABLE <table_name> ALTER COLUMN <col_name> TYPE <new_type>
この操作は、基になるデータファイルを書き換えることなく、テーブルスキーマを更新します。詳細については、ALTER TABLEを参照してください。
自動スキーマ進化での型の拡大
スキーマ進化は、型の拡張と連携して、ターゲットテーブルのデータ型を受信データの型と一致するように更新します。
型の拡張を有効にしないと、スキーマ進化は常に、ターゲットテーブルの列型と一致するようにデータをダウンキャストしようとします。ターゲットテーブルのデータ型を自動的に拡大しない場合は、スキーマ進化が有効になっているワークロードを実行する前に、型の拡大を無効にします。
インジェスト中に列のデータ型を広げるためにスキーマ進化を使用するには、次の条件を満たす必要があります:
- 書き込みコマンドは、自動スキーマ進化が有効になっている状態で実行されます。
- ターゲットテーブルでは、型の拡張が有効になっています。
- ソース列のタイプはターゲットカラムのタイプよりも幅が広いです。
- 型拡張は型の変更に対応しています。
これらの条件をすべて満たさない型が一致しない場合、通常のスキーマ強制ルールに準拠します。See スキーマ強制.
次の例は、一般的な書き込み操作におけるスキーマ進化での型の拡大がどのように機能するかを示しています。
- Python
- Scala
- SQL
# Create target table with INT column and source table with BIGINT column
spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")
# Example 1: Automatic type widening in saveAsTable()
spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")
# Example 2: Automatic type widening in MERGE INTO
from delta.tables import DeltaTable
source_df = spark.table("source_table")
target_table = DeltaTable.forName(spark, "target_table")
(target_table.alias("target")
.merge(source_df.alias("source"), "target.id = source.id")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
// Create target table with INT column and source table with BIGINT column
spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")
// Example 1: Automatic type widening in saveAsTable()
spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")
// Example 2: Automatic type widening in MERGE INTO
import io.delta.tables.DeltaTable
val sourceDf = spark.table("source_table")
val targetTable = DeltaTable.forName(spark, "target_table")
targetTable.alias("target")
.merge(sourceDf.alias("source"), "target.id = source.id")
.withSchemaEvolution()
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
-- Create target table with INT column and source table with BIGINT column
CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true');
CREATE TABLE source_table (id BIGINT, data STRING);
-- Example 1: Automatic type widening in INSERT INTO
---- Insert data with BIGINT value column - automatically widens INT to BIGINT
INSERT WITH SCHEMA EVOLUTION INTO target_table SELECT * FROM source_table;
-- Example 2: Automatic type widening in MERGE INTO
MERGE WITH SCHEMA EVOLUTION INTO target_table
USING source_table
ON target_table.id = source_table.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
Auto Loader
プレビュー
Auto Loader の型ワイドニングのサポートは、パブリックプレビューで利用可能です。
Auto Loader は、自動スキーマ進化による型の拡張をサポートしています。型の拡張とスキーマ進化が有効になっているDelta LakeテーブルにAuto Loaderを使用してデータを取り込む場合、列の型は着信するデータに合わせて自動的に拡張されます。
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)
「Auto Loaderによる自動型拡張」を参照してください。また、ターゲットテーブルは型の拡張が有効になっている必要があります。型の拡張を有効にするを参照してください。
型の拡張テーブル機能を無効にする
プロパティを false に設定することで、有効なテーブルでの偶発的な型拡張を防ぐことができます。
ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'false')
この設定により、テーブルへの将来の型変更は防止されますが、型拡張テーブル機能が削除されたり、以前の型変更が元に戻されたりすることはありません。
型拡張テーブル機能を完全に削除する必要がある場合は、次の例に示すように DROP FEATURE コマンドを使用できます。
ALTER TABLE <table-name> DROP FEATURE 'typeWidening' [TRUNCATE HISTORY]
Databricks Runtime 15.4 LTS を使用して型の拡大が有効になっているテーブルでは、代わりに機能 typeWidening-preview を削除する必要があります。
型の拡張を削除すると、Databricks は現在のテーブル スキーマに準拠しないすべてのデータ ファイルを書き換えます。Delta Lake テーブル機能の削除およびテーブルプロトコルのダウングレードを参照してください。
Delta Lake テーブルからのストリーミング
構造化ストリーミングでの型拡張のサポートは、Databricks Runtime 16.4 LTS 以降で利用可能です。
型の拡張が有効なDelta Lakeテーブルからストリーミングする際は、ターゲットテーブルでmergeSchemaオプションを使用してスキーマ進化を有効にすることで、ストリーミングクエリの自動型拡張を構成できます。ターゲットテーブルは、型の拡張が有効になっている必要があります。型拡張を有効にするを参照してください。
- Python
- Scala
(spark.readStream
.table("delta_source_table")
.writeStream
.option("checkpointLocation", "/path/to/checkpointLocation")
.option("mergeSchema", "true")
.toTable("output_table")
)
spark.readStream
.table("delta_source_table")
.writeStream
.option("checkpointLocation", "/path/to/checkpointLocation")
.option("mergeSchema", "true")
.toTable("output_table")
mergeSchema が有効になっており、ターゲットテーブルで型の拡大が有効になっている場合:
- 型変更は、ダウンストリームテーブルに手動介入なしで自動的に適用されます。
- 新しい列が自動的にダウンストリームのテーブルスキーマに追加されます。
mergeSchemaが有効になっていない場合、値はspark.sql.storeAssignmentPolicyの設定に従って処理されます。デフォルトでは、値はターゲットカラム型に一致するようにダウンキャストされます。割り当てポリシーの動作に関する詳細情報については、ストア割り当てを参照してください。
手動の型の変更の確認
Delta Lakeテーブルからストリーミングする際に、型の変更を含む非追加スキーマ変更を追跡するために、スキーマ追跡場所を指定できます。スキーマの追跡場所の指定は、Databricks Runtime 18.0 以下で必須です。 Databricks Runtime 18.1 以降ではオプションです。
- Python
- Scala
checkpoint_path = "/path/to/checkpointLocation"
(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)
val checkpointPath = "/path/to/checkpointLocation"
spark.readStream
.option("schemaTrackingLocation", checkpointPath)
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpointPath)
.toTable("output_table")
スキーマ追跡ロケーションを指定した後、型変更が検出されると、ストリームは追跡対象のスキーマを進化させ、その後停止します。その際、型の変更に対応するために必要な対応を行うことができます。例えば、ダウンストリームテーブルで型の拡張を有効にする、あるいはストリーミングクエリを更新するなどの対応です。
処理を再開するには、Spark 構成 spark.databricks.delta.streaming.allowSourceColumnTypeChange または DataFrame リーダー オプション allowSourceColumnTypeChange を設定します。
- Python
- Scala
- SQL
checkpoint_path = "/path/to/checkpointLocation"
(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.option("allowSourceColumnTypeChange", "<delta_source_table_version>")
# alternatively to allow all future type changes for this stream:
# .option("allowSourceColumnTypeChange", "always")
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)
val checkpointPath = "/path/to/checkpointLocation"
spark.readStream
.option("schemaTrackingLocation", checkpointPath)
.option("allowSourceColumnTypeChange", "<delta_source_table_version>")
// alternatively to allow all future type changes for this stream:
// .option("allowSourceColumnTypeChange", "always")
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpointPath)
.toTable("output_table")
-- To unblock for this particular stream just for this series of schema change(s):
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange.ckpt_<checkpoint_id> = "<delta_source_table_version>"
-- To unblock for this particular stream:
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "<delta_source_table_version>"
-- To unblock for all streams:
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "always"
ストリームが停止すると、エラーメッセージにチェックポイントID <checkpoint_id> と Delta Lake ソーステーブルバージョン <delta_source_table_version> が表示されます。
ストリーミング Delta Lake のオプションの完全なリストについては、「Delta Lake」を参照してください。
Lakeflow Spark宣言型パイプライン
Lakeflow Spark宣言型パイプラインで、パイプラインレベルまたは個別のテーブルに対して、型拡大を有効にできます。型の拡張により、ストリーミングテーブルの完全な更新を必要とせずに、パイプラインの実行中に列のデータ型を自動的に広げることができます。マテリアライズドビューでの型変更は常に完全な再計算を引き起こし、ソーステーブルに型変更が適用されると、そのテーブルに依存するマテリアライズドビューは新しい型を反映するために完全な再計算が必要になります。
パイプライン全体で型の拡張を有効にする
パイプライン内のすべてのテーブルの型拡大を有効にするには、パイプライン構成pipelines.enableTypeWideningを設定します。
- JSON
- YAML
{
"configuration": {
"pipelines.enableTypeWidening": "true"
}
}
configuration:
pipelines.enableTypeWidening: 'true'
特定のテーブルで型の拡張を有効にする
個々のテーブルで型の拡張を有効にするには、テーブル プロパティ delta.enableTypeWidening を設定します。
- Python
- SQL
import dlt
@dlt.table(
table_properties={"delta.enableTypeWidening": "true"}
)
def my_table():
return spark.readStream.table("source_table")
CREATE OR REFRESH STREAMING TABLE my_table
TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
AS SELECT * FROM source_table
下流リーダーとの互換性
Databricks Runtime 15.4 LTS 以降では、型拡張が有効なテーブルを読み取ることができます。パイプラインで型拡張が有効になっているテーブルを、Databricks Runtime 14.3以下を使用しているリーダーから読み取り可能にしたい場合は、以下のいずれかの対応が必要です:
- 型の拡張を無効にするには、
delta.enableTypeWidening/pipelines.enableTypeWideningプロパティを削除するか、false に設定することで、テーブルの完全な更新をトリガーします。 - テーブルでCompatibility Modeを有効にします。
OpenSharing
OpenSharingにおける型拡張のサポートは、Databricks Runtime 16.1 以降で利用可能です。
Databricks-to-Databricks OpenSharing では、型拡張が有効な Delta Lake テーブルの共有がサポートされています。プロバイダーと受信者はDatabricks Runtime 16.1 以降である必要があります。
OpenSharing を使用して型拡張が有効になっている Delta Lake テーブルからチェンジデータフィードを読み取るには、応答形式を delta に設定する必要があります。
spark.read
.format("deltaSharing")
.option("responseFormat", "delta")
.option("readChangeFeed", "true")
.option("startingVersion", "<start version>")
.option("endingVersion", "<end version>")
.load("<table>")
型の変更を伴うチェンジデータフィードの読み取りはサポートされていません。代わりに、操作を2つの別々の読み取りに分割する必要があります。1つはタイプ変更を含むテーブルバージョンで終了し、もう1つはタイプ変更を含むバージョンで開始します。
制限事項:
Apache Iceberg 互換性
Apache Iceberg は、型拡大によってカバーされるすべての型変更をサポートしていません。Iceberg スキーマ進化を参照してください。特に、Databricks は次の種類の変更をサポートしていません。
byte、short、int、longからdecimalまたはdouble- 小数点以下の増加
date-timestampNTZ
Delta Lake テーブルで Iceberg 互換性を備えた UniForm を有効にすると、これらのいずれかのタイプの変更を適用するとエラーになります。Icebergクライアントを使用したDelta Lakeテーブルの読み取りを参照してください。
これらのサポートされていない型の変更のいずれかをDelta Lakeテーブルに適用した場合、2つの選択肢があります:
-
Iceberg メタデータの再生成:型拡張テーブル機能なしで Iceberg メタデータを再生成するには、次のコマンドを使用します。
SQLALTER TABLE <table-name> SET TBLPROPERTIES ('delta.universalFormat.config.icebergCompatVersion' = '<version>')これにより、互換性のない型変更を適用した後でも、互換性を維持できます。
-
型拡張テーブル機能を削除する:型の拡張テーブル機能を無効にするを参照してください。
型依存関数
一部の SQL 関数は、入力データ型に依存する結果を返します。例えば、hash 関数は、引数の型が異なる場合、同じ論理値に対して異なるハッシュ値を返します。hash(1::INT) は hash(1::BIGINT) とは異なる結果を返します。
その他の型依存関数は次のとおりです: xxhash64、bit_get、bit_reverse、typeof。
これらの関数を使用するクエリで安定した結果を得るには、値を目的の型に明示的にキャストしてください。
- Python
- Scala
- SQL
spark.read.table("table_name") \
.selectExpr("hash(CAST(column_name AS BIGINT))")
spark.read.table("main.johan_lasperas.dlt_type_widening_bronze2")
.selectExpr("hash(CAST(a AS BIGINT))")
-- Use explicit casting for stable hash values
SELECT hash(CAST(column_name AS BIGINT)) FROM table_name
その他の制限事項
- 型変更があるDelta Lakeテーブルからストリーミングする際に、SQLを使用してスキーマ追跡の場所を指定することはできません。
- OpenSharing を使用して、型拡張が有効になっているテーブルを Databricks 以外のコンシューマーと共有することはできません。