メインコンテンツまでスキップ

一般的なデータ読み込みパターン

Auto Loader は、多くの一般的なデータ取り込みタスクを簡素化します。 このクイック リファレンスでは、いくつかの一般的なパターンの例を提供します。

クラウドオブジェクトストレージからバリアントとしてデータを取り込む

Auto Loader 、サポートされているファイルからすべてのデータをロードできます ソース ターゲットテーブルの 1 つの VARIANT 列として。 VARIANTスキーマと型の変更に柔軟に対応し、大文字と小文字の区別とデータソースに存在するNULL値を維持するため、このパターンはほとんどのインジェスト シナリオに対して堅牢です。詳細は、 クラウド・オブジェクト・ストレージからバリアントとしてデータを取り込むを参照してください

glob パターンを使用したディレクトリまたはファイルのフィルタリング

glob パターンは、パスに指定されている場合、ディレクトリとファイルをフィルタリングするために使用できます。

パターン

説明

?

任意の 1 文字と一致します

*

0 個以上の文字に一致します

[abc]

文字セット {a,b,c} の 1 文字に一致します。

[a-z]

文字範囲 {a...z}.

[^a]

文字セットまたは範囲 {a} にない 1 つの文字に一致します。 ^文字は、左括弧のすぐ右側に配置する必要があることに注意してください。

{ab,cd}

文字列セット {ab, cd} の文字列と一致します。

{ab,c{de, fh}}

文字列セット {ab, cde, cfh} の文字列に一致します。

プレフィックス パターンを指定するには、次のように path を使用します。

Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", <format>) \
.schema(schema) \
.load("<base-path>/*/files")

接尾辞パターンを明示的に指定するには、オプションpathGlobFilterを使用する必要があります。pathはプレフィックスフィルタのみを提供します。例えば、異なる拡張子を持つファイルを含むディレクトリから、 pngファイルだけを解析したい場合は、次のようにします。

Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("pathGlobfilter", "*.png") \
.load(<base-path>)
注記

Auto Loader のデフォルト グロビング動作は、他の Spark ファイル ソースのデフォルト動作とは異なります。読み取りに [.option("cloudFiles.useStrictGlobber", "true")] を追加して、ファイル ソースに対するデフォルトの Spark 動作に一致するグロビングを使用します。 グロビングの詳細については、次の表を参照してください。

パターン

ファイルパス

デフォルトのglobber

厳格なglobber

/a/b

/a/b/c/file.txt

あり

あり

/a/b

/a/b_dir/c/file.txt

いいえ

いいえ

/a/b

/a/b.txt

いいえ

いいえ

/a/b/

/a/b.txt

いいえ

いいえ

/a/*/c/

/a/b/c/file.txt

あり

あり

/a/*/c/

/a/b/c/d/file.txt

あり

あり

/a/*/c/

/a/b/x/y/c/file.txt

あり

いいえ

/a/*/c

/a/b/c_file.txt

あり

いいえ

/a/*/c/

/a/b/c_file.txt

あり

いいえ

/a/*/c/

/a/*/cookie/file.txt

あり

いいえ

/a/b*

/a/b.txt

あり

あり

/a/b*

/a/b/file.txt

あり

あり

/a/ {0.txt ,1.txt}

/a/0.txt

あり

あり

/a/*/{0.txt,1.txt}

/a/0.txt

いいえ

いいえ

/a/b/[cde-h]/i/

/a/b/c/i/file.txt

あり

あり

簡単なETLを有効にする

データを失うことなくデータを Delta Lake に取り込む簡単な方法は、次のパターンを使用し、 Auto Loaderでスキーマ推論を有効にすることです。 Databricks では、ソース データのスキーマが変更されたときにストリームを自動的に再起動するために、Databricks ジョブで次のコードを実行することをお勧めします。 デフォルトでは、スキーマは文字列型として推論され、解析エラー(すべてが文字列として残っている場合は何も発生しないはずです)は _rescued_dataに送られ、新しい列はストリームを失敗させ、スキーマを進化させます。

Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
.load("<path-to-source-data>") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")

適切に構造化されたデータのデータ損失を防止

スキーマはわかっているが、予期しないデータを取得したい場合は、Databricks はrescuedDataColumnの使用を推奨しています。

Python
spark.readStream.format("cloudFiles") \
.schema(expected_schema) \
.option("cloudFiles.format", "json") \
# will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")

スキーマに一致しない新しいフィールドが導入された場合にストリームの処理を停止する場合は、以下を追加できます。

Python
.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

柔軟な半構造化データパイプラインを実現

ベンダーからデータを受け取る際に、ベンダーが提供する情報に新しい列が追加される場合、それがいつ行われたのか正確に把握できない場合や、データパイプラインを更新するだけのリソースがない場合があります。スキーマ進化を活用してストリームを再開し、Auto Loaderに推論されたスキーマを自動的に更新させることができるようになりました。ベンダーが提供する一部の「スキーマレス」フィールドについては、 schemaHints利用することもできます。

Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT") \
.load("/api/requests") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")

ネストされた JSON データの変換

Auto Loader は最上位の JSON 列を文字列として推論するため、さらに変換が必要なネストされた JSON オブジェクトが残る可能性があります。半構造化データ・アクセス・APIを使用して、複雑なJSON・コンテンツをさらに変換できます。

Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.load("<source-data-with-nested-json>") \
.selectExpr(
"*",
"tags:page.name", # extracts {"tags":{"page":{"name":...
"tags:page.id::int", # extracts {"tags":{"page":{"id":... and casts to int
"tags:eventType" # extracts {"tags":{"eventType":...}}
)

ネストされたJSONデータを推論する

ネストされたデータがある場合は、 cloudFiles.inferColumnTypes オプションを使用して、データおよびその他の列タイプのネストされた構造を推測できます。

Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.option("cloudFiles.inferColumnTypes", "true") \
.load("<source-data-with-nested-json>")

ヘッダーなしでCSVファイルをロード

以下の例は、Auto Loader を使用してヘッダーなしの CSV ファイルを読み込む方法を示しています。指定されたスキーマに一致しないデータを取得するには、 rescuedDataColumnを使用してください。

Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("rescuedDataColumn", "_rescued_data") \ # ensure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)

ヘッダー付きのCSVファイルにスキーマを強制する

以下の例は、ヘッダーを含むCSVファイルにスキーマを適用する方法を示しています。指定されたスキーマに一致しないデータを取得するには、 rescuedDataColumnを使用してください。

Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("header", "true") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)

ML のために画像またはバイナリ データを Delta Lake に取り込む

データが Delta Lake に格納されたら、データに対して分散推論を実行できます。 Pandas UDFを使用した分散推論の実行を参照してください。

Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")

Lakeflow Spark宣言型パイプラインのAuto Loader構文

LakeFlow Spark宣言型パイプラインは、 Auto Loader用にわずかに変更されたPython構文を提供し、 Auto Loader用のSQLサポートを追加します。 次の例では、 Auto Loaderを使用して、 Wanderbricks のサンプル旅行予約データセットを使用してJSONファイルからデータセットを作成します。

Python
@dp.table
def booking_updates():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("multiLine", "true")
.load("/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates")
)

@dp.table
def reviews():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("multiLine", "true")
.load("/Volumes/my_catalog/my_schema/my_volume/wanderbricks/reviews")
)

サポートされている形式オプションを使用できます Auto Loader。 read_filesのオプションは、キーと値のペアです。サポートされている形式とオプションの詳細については、「 オプション」を参照してください。

SQL
CREATE OR REFRESH STREAMING TABLE my_table
AS SELECT *
FROM STREAM read_files(
"/Volumes/my_volume/path/to/files/*",
option-key => option-value,
...
)

以下の例は、列型の推論を有効にした状態で複数行のJSONファイルを読み込む例です。

SQL
CREATE OR REFRESH STREAMING TABLE booking_updates
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates",
format => "json",
multiLine => true,
inferColumnTypes => true
)

schemaを使用して、形式を手動で指定できます。スキーマ推論をサポートしていない形式のschemaを指定する必要があります。

Python
@dp.table
def booking_updates_raw():
return (
spark.readStream.format("cloudFiles")
.schema("booking_id LONG, booking_update_id LONG, user_id LONG, property_id LONG, status STRING, guests_count INT, total_amount DOUBLE, check_in DATE, check_out DATE, created_at TIMESTAMP, updated_at TIMESTAMP")
.option("cloudFiles.format", "json")
.option("multiLine", "true")
.load("/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates")
)
注記

Lakeflow Spark宣言型パイプラインは、 Auto Loader使用してファイルを読み取るときに、スキーマとチェックポイント ディレクトリを自動的に構成および管理します。 ただし、これらのディレクトリのいずれかを手動で構成した場合、完全更新を実行しても、構成されたディレクトリの内容は影響を受けません。Databricks では、処理中に予期しない副作用を回避するために、自動的に構成されたディレクトリを使用することをお勧めします。

次のステップ