Auto Loaderによる自動タイプ幅拡張
プレビュー
この機能は、Databricks Runtime 16.4以降でパブリックプレビュー版として提供されています。
Auto Loaderは、クラウドストレージに新しいデータファイルが到着するたびに、それを段階的に効率的に処理します。また、複雑なスキーマ変更を自動的に処理することで、パイプラインのメンテナンス作業を軽減します。例えば、Auto Loaderを設定してロードされたデータのスキーマを自動的に検出するようにすれば、データスキーマを明示的に宣言することなくテーブルを初期化できます。また、新しい列が追加されるにつれてテーブルスキーマを進化させることもできるため、時間の経過とともにスキーマの変更を手動で追跡して適用する必要がなくなります。Auto Loader 、復元されたデータ列内の予期しないデータ(例えば、データ型の違いによるもの)も復元できるため、データ損失を防ぐのに役立ちます。
ただし、復元されたデータ列では、データ型の変更を手動で処理する必要があります。
これらのデータ型の変更の一部を自動的に処理するには、 Auto Loaderの型拡張機能を使用します。 Delta Lakeは、データの書き換えやユーザーの介入を必要とせずに、さまざまなデータ型の拡張変更をサポートするようになりました。Delta Lake型湖沼の拡大を参照のこと。 スキーマ進化の新しいモードaddNewColumnsWithTypeWidening 、互換性のあるデータ型の変更に基づいてスキーマを自動的に進化させます。
intをlongに、 floatをdoubleに拡張するなど、プリミティブ型を拡張できます。Auto Loaderでは、スキーマ進化をサポートするすべてのファイル形式で型拡張が利用可能です。これには、テキスト形式(JSON、CSV、XMLなど)とバイナリ形式(Avro、Parquetなど)が含まれます。既存のスキーマ進化モード( addNewColumns 、 rescue 、 failOnNewColumns 、 noneなど)のスキーマ進化動作に変更はありません。
サポートされている型変更
以下の型変更がサポートされています。
ソースタイプ | サポートされている拡張型 |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
数値型をdecimalに拡張する場合、Auto Loader は開始精度以上の精度でdecimalに拡張します。スケールを大きくすると、全体の精度もそれに応じて向上します。
Integer型の初期精度は以下のとおりです。
Type | 開始精度 |
|---|---|
|
|
|
|
|
|
|
|
例えば、列の現在の型がintで、その列の型がdecimal(5, 2)ファイルが読み込まれた場合、Auto Loader はその列の型をdecimal(12, 2)に拡張します。
前提条件
Auto Loaderでタイプ幅調整機能を使用するには、以下の要件を満たす必要があります。
- Databricks Runtime 16.4以降を使用してください。
- 書き込み先が Delta Lake テーブルの場合は、以下のいずれかの方法を使用して Delta Lake テーブルの型拡張を有効にします。
-
既存のテーブルを使用する場合:
SQLALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true') -
型拡張を有効にして新しいテーブルを作成する場合:
SQLCREATE TABLE T(c1 INT) TBLPROPERTIES('delta.enableTypeWidening' = 'true')
-
Delta Lakeテーブルにおける型の拡大に関する詳細については、 「型の拡大」を参照してください。
スキーマ進化による型拡張を有効にする
Auto Loaderで型拡張を使用するには、スキーマ進化時にaddNewColumnsWithTypeWideningを指定します。Auto Loaderは、データの処理中に新しい列の追加やデータ型の変更を検出します。
- Python
- Scala
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", <schemaPath>)
.option("cloudFiles.schemaEvolutionMode", "addNewColumnsWithTypeWidening")
.load(<inputPath>)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", <checkpointPath>)
.trigger(availableNow=True)
.toTable("table_name")
)
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", <schemaPath>)
.option("cloudFiles.schemaEvolutionMode", "addNewColumnsWithTypeWidening")
.load(<inputPath>)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", <checkpointPath>)
.trigger(Trigger.AvailableNow())
.toTable("table_name")
Auto Loader が新しい列または型拡張でサポートされている型の変更を検出すると、ストリームはUnknownFieldExceptionで停止します。ストリームがこのエラーを発生させる前に、Auto Loader は最新のマイクロバッチデータに対してスキーマ推論を実行し、既存の列の幅を広げたり、新しい列をスキーマの末尾にマージしたりすることで、スキーマの場所を最新のスキーマで更新します。
データ型変更時のスキーマ進化動作
次の内容のCSVを取り込むと、Auto LoaderはスキーマをSTRUCT<id INT, name STRING, _rescued_data STRING>と推測します。
id, name
1, John
2, Mary
対象となるテーブルは以下のようになります。
| ID | 名前 | _rescued_data | | -- | ---- | --------------- | | 1 | ジョン | NULL | | 2 | メアリー | NULL |
次に、 id列の値がINTタイプよりも幅が広い別のCSVファイルを取り込みます。
id, name, age
2147483648, Bob, 25
次の表は、Auto Loaderにおけるさまざまなスキーマ進化モードの動作と出力について説明しています。
モード | サポートされている拡張可能なデータ型の変更に関する動作 |
|---|---|
| データ型は変化せず、データ型の変更によってストリームが失敗することもありません。型が一致しない値を持つ列は |
| スキーマは進化せず、スキーマの変更によってストリームが失敗することもありません。型が一致しない値を持つ列は |
| データ型は変化せず、データ型の変更によってストリームが失敗することもありません。型が一致しない値を持つ列は |
| スキーマは進化せず、新しい列は無視され、 |
| ストリームが失敗しました。スキーマに新しい列が追加され、サポートされるデータ型の変更範囲が拡大されます。サポートされていないデータ型の変更(例: |
結果例
以下の表は、2番目のCSVファイルを取り込んだ後の、各スキーマ進化モードにおける推論されたスキーマと値を示しています。
モード | 推論されたスキーマと値 | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| スキーマ : 値 :
| ||||||||||||||||
| スキーマ : 値 :
| ||||||||||||||||
| スキーマ : 値 :
| ||||||||||||||||
| スキーマ : 値 :
| ||||||||||||||||
| スキーマ : 値 :
|
制限事項
addNewColumnsWithTypeWideningを使用する場合、オプションprefersDecimalfalseに設定することはできません。addNewColumnsWithTypeWideningが指定されている場合、prefersDecimalのデフォルト値はtrueです。datetimestampNTZへの拡張はParquetファイルでのみサポートされています。