Auto Loaderでのスキーマ推論と進化の構成
読み込まれたデータのスキーマを自動的に検出するようにAuto Loaderを構成すると、データスキーマを明示的に宣言せずにテーブルを初期化し、新しい列の導入に応じてテーブルスキーマを進化させることができます。これにより、時間の経過とともにスキーマの変更を手動で追跡して適用する必要がなくなります。
Auto Loader 、 JSON BLOB 列内の予期しないデータ (たとえば、異なるデータ型のデータ) を「救出」することもできます。救出したデータは、後で半構造化データ アクセスAPIs使用して表示できます。
Auto Loaderスキーマの推論と進化のために次の形式をサポートしています。
ファイル形式 | サポートされているバージョン |
|---|---|
| すべてのバージョン |
| すべてのバージョン |
| Databricks Runtime 14.3 LTS 以降 |
| Databricks Runtime 10.4 LTS 以降 |
| Databricks Runtime 11.3 LTS 以降 |
| サポートされていません |
| 該当なし(固定スキーマ) |
| 該当なし(固定スキーマ) |
スキーマ推論と進化の構文
cloudFiles.schemaLocationオプションのターゲット ディレクトリを指定すると、スキーマの推論と進化が有効になります。checkpointLocationに指定したのと同じディレクトリを使用することもできます。LakeFlow Spark宣言型パイプラインを使用する場合、 Databricksスキーマの場所とその他のチェックポイント情報を自動的に管理します。
ターゲットテーブルに読み込まれているソースデータの場所が複数ある場合、各Auto Loader取り込みワークロードに個別のストリーミングチェックポイントが必要です。
以下の例ではcloudFiles.formatの代わりにparquetを使用しています。他のファイルソースには、csv、avro、またはjsonを使用します。読み取りおよび書き込みに関するその他の設定はすべて各形式のデフォルトの動作と同じままです。
- Python
- Scala
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path-to-target>")
)
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path-to-target>")
スキーマ推論 Auto Loader はどのように機能しますか?
最初にデータを読み取るときにスキーマを推測するために、Auto Loaderは、検出した最初の50 GBまたは1000ファイル(最初に制限を超えた方)をサンプリングします。Auto Loaderは、入力データに対するスキーマの変更を経時的に追跡するために、設定されたcloudFiles.schemaLocationのディレクトリ_schemasにスキーマ情報を保存します。
使用されるサンプルのサイズを変更するには、SQL 構成を設定します。
spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes
(バイト数を表す文字列。例:10gb)
そして
spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles
(整数)
Auto Loaderスキーマ推論は、タイプの不一致によるスキーマ進化の問題を回避しようとします。 データ型をエンコードしない形式 (JSON、CSV、XML) の場合、Auto Loader はすべての列を文字列として推測します (JSON ファイル内のネストされたフィールドを含む)。型指定されたスキーマを持つ形式 (Parquet および Avro) の場合、Auto Loader はファイルのサブセットをサンプリングし、個々のファイルのスキーマをマージします。次の表はこの動作をまとめたものです。
ファイル形式 | デフォルトの推論データ型 |
|---|---|
| 文字列 |
| 文字列 |
| 文字列 |
| Avroスキーマでエンコードされた型 |
| Parquetスキーマでエンコードされた型 |
Apache Spark データフレームReader は、スキーマ推論に異なる動作を使用し、サンプル データに基づいて JSON、CSV、および XML ソースの列のデータ型を選択します。 この動作を Auto Loaderで有効にするには、オプション cloudFiles.inferColumnTypes を true に設定します。
CSVデータのスキーマを推論する場合、Auto Loaderファイルにヘッダーが含まれていると想定します。CSV ファイルにヘッダーが含まれていない場合は、 .option("header", "false")のオプションを指定します。 さらに、 Auto Loader はサンプル内のすべてのファイルのスキーマをマージして、グローバル スキーマを作成します。 その後、Auto Loaderヘッダーに従って各ファイルを読み取り、CSVを正しく解析できます。
2 つの Parquet ファイルで列のデータ型が異なる場合、Auto Loader は最も幅の広い型を選択します。schemaHintsを使用してこの選択をオーバーライドできます。スキーマ ヒントを指定すると、Auto Loader は列を指定された型にキャストするのではなく、Parquet リーダーに列を指定された型として読み取るように指示します。不一致がある場合、Auto Loader は、復元されたデータ列にデータを配置して列を復元します。
Auto Loaderスキーマ進化はどのように機能しますか?
Auto Loaderは、データを処理するときに新しい列の追加を検出します。Auto Loaderが新しい列を検出すると、ストリームはUnknownFieldExceptionで停止します。ストリームがこのエラーをスローする前に、Auto Loaderはデータの最新のマイクロバッチに対してスキーマ推論を実行し、新しい列をスキーマの末尾にマージすることでスキーマの場所を最新のスキーマで更新します。既存の列のデータ型は変更されません。
DatabricksAuto LoaderではLakeflowジョブ を使用してストリームを構成して、このようなスキーマの変更後に自動的に再起動するようにすることをお勧めします。
Auto Loader 、スキーマ進化の次のモードをサポートしています。これは、 cloudFiles.schemaEvolutionModeオプションで設定します。
Mode | 新しい列の読み取り時の動作 |
|---|---|
| ストリームが失敗します。新しい列がスキーマに追加されます。既存の列ではデータ型が進化しません。 |
| Auto Loaderスキーマを進化させることはなく、スキーマの変更によってストリームが失敗することもありません。 Auto Loader救出されたデータ列にすべての新しい列を記録します。 |
| ストリームが失敗しました。提供されたスキーマを更新するか、問題のあるデータ ファイルを削除しない限り、ストリームは再起動されません。 |
| スキーマは進化せず、新しい列は無視され、 |
addNewColumns スキーマが指定されていない場合は mode がデフォルトですが、スキーマが指定されている場合は mode がデフォルト none 。 addNewColumns ストリームのスキーマが指定されている場合は許可されませんが、スキーマ をスキーマヒントとして提供する場合は機能します。
Auto Loader ではパーティションはどのように機能しますか?
データが Hive スタイルのパーティション分割でレイアウトされている場合、Auto Loader はデータの基盤となるディレクトリ構造からパーティション列を推測しようとします。たとえば、ファイル パスbase_path/event=click/date=2021-04-01/f0.jsonでは、パーティション列としてdateとeventが推論されます。基礎となるディレクトリ構造に競合する Hive パーティションが含まれている場合、または Hive スタイルのパーティションが含まれていない場合、Auto Loader はパーティション列を無視します。
バイナリファイル(binaryFile)およびtextファイル形式には固定のデータスキーマがありますが、パーティション列推論がサポートされています。Databricksでは、これらのファイル形式にcloudFiles.schemaLocationを設定することをお勧めしています。これにより、潜在的なエラーや情報損失が回避され、Auto Loaderが起動するたびにパーティション列が推論されることがなくなります。
Auto Loaderスキーマ進化のパーティション列を考慮しません。 初期のディレクトリ構造がbase_path/event=click/date=2021-04-01/f0.jsonであり、新しいファイルをbase_path/event=click/date=2021-04-01/hour=01/f1.jsonとして受信し始めた場合、Auto Loader は時間の列を無視します。新しいパーティション列の情報を取得するには、 cloudFiles.partitionColumnsをevent,date,hourに設定します。
cloudFiles.partitionColumnsオプションは、列名をコンマで区切ったリストを受け取ります。Auto Loader 、ディレクトリ構造内にkey=valueペアとして存在する列のみを解析します。
レスキューされたデータ列とは何ですか?
Auto Loader がスキーマを推測すると、Auto Loader は復元されたデータ列を_rescued_dataとしてスキーマに自動的に追加します。rescuedDataColumnオプションを設定することで、列の名前を変更したり、スキーマを提供するときに列を含めたりすることができます。
救出されたデータ列により、Auto Loader はスキーマに一致しない列を削除するのではなく救出するようになります。救出されたデータ列には、次の理由で解析されなかったデータが含まれます。
- 列がスキーマにない
- 型が一致しない
- 大文字小文字が一致しない
復元されたデータ列には、復元された列とレコードのソース ファイル パスを含む JSON BLOB が含まれます。
JSON および CSV パーサーは、レコードを解析するときに 3 つのモードをサポートします: PERMISSIVE 、 DROPMALFORMED 、およびFAILFAST 。rescuedDataColumnと一緒に使用すると、データ型の不一致によって、Auto Loader がDROPMALFORMEDモードでレコードを削除したり、 FAILFASTモードでエラーをスローしたりすることがなくなります。不完全または不正な形式の JSON や CSV など、破損したレコードのみが失敗したりエラーをスローしたりします。JSON または CSV を解析するときにbadRecordsPath使用する場合、Auto Loader はrescuedDataColumnの使用時にデータ型の不一致を不良レコードとして扱いません。Auto Loader 、不完全または不正な形式のJSONまたはCSVレコードのみをbadRecordsPathに保存します。
大文字と小文字を区別する動作を変更する
大文字と小文字の区別が有効になっていない限り、 Auto Loaderスキーマ推論の目的で列abc 、 Abc 、およびABCを同じ列と見なします。 Auto Loader は、サンプリングされたデータに基づいてケースを任意に選択します。スキーマヒントを使用して、どのケースを使用するかを強制できます。Auto Loader選択を行ってスキーマを推測した後、選択されなかった大文字と小文字のバリエーションがスキーマと一致しているかどうかは考慮されません。
救出されたデータ列が有効になっている場合、 Auto Loaderスキーマとは異なる大文字と小文字で名前が付けられたフィールドを_rescued_data列にロードします。 この動作を変更するには、 readerCaseSensitiveオプションを false に設定します。この場合、Auto Loader は大文字と小文字を区別せずにデータを読み取ります。
スキーマの推論をスキーマのヒントで無効にする
スキーマヒントを使用すると、推論されたスキーマに対して、自分が知っていて期待しているスキーマ情報を適用できます。列が特定のデータ型であることが分かっている場合や、より一般的なデータ型(例えば、integerの代わりにdoubleなど)を選択したい場合は、SQLスキーマ指定構文を使用して、列のデータ型に関する任意の数のヒントを以下のような文字列として提供することができます:
.option("cloudFiles.schemaHints", "tags map<string,string>, version int")
サポートされているデータ型の一覧については、 「言語マッピング」を参照してください。
ストリームの先頭に列が存在しない場合は、スキーマヒントを使用して、その列を推論されたスキーマに追加することもできます。
次の例は、推論されたスキーマとスキーマヒントを適用した結果を示しています。
推論されたスキーマ:
|-- date: string
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string
|-- purchase_options: struct
| |-- delivery_address: string
以下のスキーマヒントを指定することで
.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")
以下が得られます
|-- date: string -> date
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp
配列とマップのスキーマ ヒントのサポートは、 Databricks Runtime 9.1 LTS 以降で使用できます。
次の例は、複雑なデータ型を持つ推論されたスキーマと、スキーマ ヒントを適用した結果を示しています。
推論されたスキーマ:
|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int
以下のスキーマヒントを指定することで
.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")
以下が得られます
|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string -> int
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string -> int
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int -> string
Auto Loaderスキーマを指定 しない 場合にのみスキーマヒントを使用します。 cloudFiles.inferColumnTypesが有効か無効かに関係なく、スキーマヒントを使用できます。