メインコンテンツまでスキップ

Auto Loaderによる自動タイプ幅拡張

備考

プレビュー

この機能は、Databricks Runtime 16.4以降でパブリックプレビュー版として提供されています。

Auto Loaderは、クラウドストレージに新しいデータファイルが到着するたびに、それを段階的に効率的に処理します。また、複雑なスキーマ変更を自動的に処理することで、パイプラインのメンテナンス作業を軽減します。例えば、Auto Loaderを設定してロードされたデータのスキーマを自動的に検出するようにすれば、データスキーマを明示的に宣言することなくテーブルを初期化できます。また、新しい列が追加されるにつれてテーブルスキーマを進化させることもできるため、時間の経過とともにスキーマの変更を手動で追跡して適用する必要がなくなります。Auto Loader 、復元されたデータ列内の予期しないデータ(例えば、データ型の違いによるもの)も復元できるため、データ損失を防ぐのに役立ちます。

ただし、復元されたデータ列では、データ型の変更を手動で処理する必要があります。

これらのデータ型の変更の一部を自動的に処理するには、 Auto Loaderの型拡張機能を使用します。 Delta Lakeは、データの書き換えやユーザーの介入を必要とせずに、さまざまなデータ型の拡張変更をサポートするようになりました。Delta Lake型湖沼の拡大を参照のこと。 スキーマ進化の新しいモードaddNewColumnsWithTypeWidening 、互換性のあるデータ型の変更に基づいてスキーマを自動的に進化させます。

intlongに、 floatdoubleに拡張するなど、プリミティブ型を拡張できます。Auto Loaderでは、スキーマ進化をサポートするすべてのファイル形式で型拡張が利用可能です。これには、テキスト形式(JSON、CSV、XMLなど)とバイナリ形式(Avro、Parquetなど)が含まれます。既存のスキーマ進化モード( addNewColumnsrescuefailOnNewColumnsnoneなど)のスキーマ進化動作に変更はありません。

サポートされている型変更

以下の型変更がサポートされています。

ソースタイプ

サポートされている拡張型

byte

shortintlongdecimaldouble

short

intlongdecimaldouble

int

longdecimaldouble

long

decimal

float

double

decimal

decimal の、より高い精度とスケール

date

timestampNTZ (Parquetファイルのみ対応)

数値型をdecimalに拡張する場合、Auto Loader は開始精度以上の精度でdecimalに拡張します。スケールを大きくすると、全体の精度もそれに応じて向上します。

Integer型の初期精度は以下のとおりです。

Type

開始精度

byte

10

short

10

int

10

long

20

例えば、列の現在の型がintで、その列の型がdecimal(5, 2)ファイルが読み込まれた場合、Auto Loader はその列の型をdecimal(12, 2)に拡張します。

前提条件

Auto Loaderでタイプ幅調整機能を使用するには、以下の要件を満たす必要があります。

  • Databricks Runtime 16.4以降を使用してください。
  • 書き込み先が Delta Lake テーブルの場合は、以下のいずれかの方法を使用して Delta Lake テーブルの型拡張を有効にします。
    • 既存のテーブルを使用する場合:

      SQL
      ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
    • 型拡張を有効にして新しいテーブルを作成する場合:

      SQL
      CREATE TABLE T(c1 INT) TBLPROPERTIES('delta.enableTypeWidening' = 'true')

Delta Lakeテーブルにおける型の拡大に関する詳細については、 「型の拡大」を参照してください。

スキーマ進化による型拡張を有効にする

Auto Loaderで型拡張を使用するには、スキーマ進化時にaddNewColumnsWithTypeWideningを指定します。Auto Loaderは、データの処理中に新しい列の追加やデータ型の変更を検出します。

Python
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", <schemaPath>)
.option("cloudFiles.schemaEvolutionMode", "addNewColumnsWithTypeWidening")
.load(<inputPath>)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", <checkpointPath>)
.trigger(availableNow=True)
.toTable("table_name")
)

Auto Loader が新しい列または型拡張でサポートされている型の変更を検出すると、ストリームはUnknownFieldExceptionで停止します。ストリームがこのエラーを発生させる前に、Auto Loader は最新のマイクロバッチデータに対してスキーマ推論を実行し、既存の列の幅を広げたり、新しい列をスキーマの末尾にマージしたりすることで、スキーマの場所を最新のスキーマで更新します。

データ型変更時のスキーマ進化動作

次の内容のCSVを取り込むと、Auto LoaderはスキーマをSTRUCT<id INT, name STRING, _rescued_data STRING>と推測します。

CSV
id, name
1, John
2, Mary

対象となるテーブルは以下のようになります。

| ID | 名前 | _rescued_data | | -- | ---- | --------------- | | 1 | ジョン | NULL | | 2 | メアリー | NULL |

次に、 id列の値がINTタイプよりも幅が広い別のCSVファイルを取り込みます。

CSV
id, name, age
2147483648, Bob, 25

次の表は、Auto Loaderにおけるさまざまなスキーマ進化モードの動作と出力について説明しています。

モード

サポートされている拡張可能なデータ型の変更に関する動作

addNewColumns (デフォルト)

データ型は変化せず、データ型の変更によってストリームが失敗することもありません。型が一致しない値を持つ列はNULLに設定され、不一致の値は復元されたデータ列に追加されます。新しい列ではストリームが失敗します。

rescue

スキーマは進化せず、スキーマの変更によってストリームが失敗することもありません。型が一致しない値を持つ列はNULLに設定され、不一致の値は復元されたデータ列に追加されます。

failOnNewColumns

データ型は変化せず、データ型の変更によってストリームが失敗することもありません。型が一致しない値を持つ列はNULLに設定され、不一致の値は復元されたデータ列に追加されます。スキーマを進化させずに新しい列を追加すると、ストリームが失敗します。

none

スキーマは進化せず、新しい列は無視され、rescuedDataColumnオプションが設定されていない限りデータはレスキューされません。スキーマの変更によってストリームが失敗することはありません。

addNewColumnsWithTypeWidening

ストリームが失敗しました。スキーマに新しい列が追加され、サポートされるデータ型の変更範囲が拡大されます。サポートされていないデータ型の変更(例: intからstringへの変更)は、復元されたデータ列に追加されます。

結果例

以下の表は、2番目のCSVファイルを取り込んだ後の、各スキーマ進化モードにおける推論されたスキーマと値を示しています。

モード

推論されたスキーマと値

addNewColumns

スキーマ : id: INTname: STRINGage: INT_rescued_data: STRING

id

name

_rescued_data

1

ジョン

NULL

NULL

2

メアリー

NULL

NULL

NULL

Bob

25

{"id": 2147483648}

rescue

スキーマ : id: INTname: STRING_rescued_data: STRING

id

name

_rescued_data

1

ジョン

NULL

2

メアリー

NULL

NULL

Bob

{"age": 25, "id": 2147483648}

failOnNewColumns

スキーマ : id: INTname: STRING_rescued_data: STRING

id

name

_rescued_data

1

ジョン

NULL

2

メアリー

NULL

NULL

Bob

{"id": 2147483648}

none

スキーマ : id: INTname: STRING

id

name

1

ジョン

2

メアリー

NULL

Bob

addNewColumnsWithTypeWidening

スキーマ : id: BIGINTname: STRINGage: INT_rescued_data: STRING

id

name

_rescued_data

1

ジョン

NULL

NULL

2

メアリー

NULL

NULL

2147483648

Bob

25

NULL

制限事項

  • addNewColumnsWithTypeWideningを使用する場合、オプションprefersDecimal falseに設定することはできません。addNewColumnsWithTypeWideningが指定されている場合、 prefersDecimalのデフォルト値はtrueです。
  • date timestampNTZへの拡張はParquetファイルでのみサポートされています。