一般的なデータ読み込みパターン
Auto Loader は、多くの一般的なデータ取り込みタスクを簡素化します。 このクイック リファレンスでは、いくつかの一般的なパターンの例を提供します。
glob パターンを使用したディレクトリまたはファイルのフィルタリング
glob パターンは、パスに指定されている場合、ディレクトリとファイルをフィルタリングするために使用できます。
パターン | 説明 |
---|---|
| 任意の 1 文字と一致します |
| 0 個以上の文字に一致します |
| 文字セット {a,b,c} の 1 文字に一致します。 |
| 文字範囲 {a...z}. |
| 文字セットまたは範囲 {a} にない 1 つの文字に一致します。 |
| 文字列セット {ab, cd} の文字列と一致します。 |
| 文字列セット {ab, cde, cfh} の文字列に一致します。 |
プレフィックス パターンを指定するには、次のように path
を使用します。
- Python
- Scala
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", <format>) \
.schema(schema) \
.load("<base-path>/*/files")
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", <format>)
.schema(schema)
.load("<base-path>/*/files")
サフィックス パターンを明示的に指定するには、オプション pathGlobFilter
を使用する必要があります。 path
はプレフィックス フィルターのみを提供します。
たとえば、異なるサフィックスを持つファイルを含むディレクトリ内の png
ファイルのみを解析する場合は、次のようにします。
- Python
- Scala
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("pathGlobfilter", "*.png") \
.load(<base-path>)
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("pathGlobfilter", "*.png")
.load(<base-path>)
Auto Loader のデフォルト グロビング動作は、他の Spark ファイル ソースのデフォルト動作とは異なります。読み取りに [.option("cloudFiles.useStrictGlobber", "true")
] を追加して、ファイル ソースに対するデフォルトの Spark 動作に一致するグロビングを使用します。 グロビングの詳細については、次の表を参照してください。
パターン | ファイルパス | デフォルトのglobber | 厳格なglobber |
---|---|---|---|
/a/b | /a/b/c/file.txt | あり | あり |
/a/b | /a/b_dir/c/file.txt | いいえ | いいえ |
/a/b | /a/b.txt | いいえ | いいえ |
/a/b/ | /a/b.txt | いいえ | いいえ |
/a/*/c/ | /a/b/c/file.txt | あり | あり |
/a/*/c/ | /a/b/c/d/file.txt | あり | あり |
/a/*/c/ | /a/b/x/y/c/file.txt | あり | いいえ |
/a/*/c | /a/b/c_file.txt | あり | いいえ |
/a/*/c/ | /a/b/c_file.txt | あり | いいえ |
/a/*/c/ | /a/*/cookie/file.txt | あり | いいえ |
/a/b* | /a/b.txt | あり | あり |
/a/b* | /a/b/file.txt | あり | あり |
/a/ {0.txt ,1.txt} | /a/0.txt | あり | あり |
/a/*/{0.txt,1.txt} | /a/0.txt | いいえ | いいえ |
/a/b/[cde-h]/i/ | /a/b/c/i/file.txt | あり | あり |
簡単なETLを有効にする
データを失うことなくデータを Delta Lake に取り込む簡単な方法は、次のパターンを使用し、 Auto Loaderでスキーマ推論を有効にすることです。 Databricks では、ソース データのスキーマが変更されたときにストリームを自動的に再起動するために、Databricks ジョブで次のコードを実行することをお勧めします。 デフォルトでは、スキーマは文字列型として推論され、解析エラー(すべてが文字列として残っている場合は何も発生しないはずです)は _rescued_data
に送られ、新しい列はストリームを失敗させ、スキーマを進化させます。
- Python
- Scala
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
.load("<path-to-source-data>") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
適切に構造化されたデータのデータ損失を防止
スキーマはわかっているが、予期しないデータを受け取るたびに知りたい場合、Databricks では rescuedDataColumn
を使用することをお勧めします。
- Python
- Scala
spark.readStream.format("cloudFiles") \
.schema(expected_schema) \
.option("cloudFiles.format", "json") \
# will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
spark.readStream.format("cloudFiles")
.schema(expected_schema)
.option("cloudFiles.format", "json")
// will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
スキーマに一致しない新しいフィールドが導入された場合にストリームの処理を停止する場合は、以下を追加できます。
.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")
柔軟な半構造化データパイプラインを実現
ベンダーからデータを受け取っている場合、ベンダーが提供する情報に新しい列が導入されている場合、そのベンダーがいつデータパイプラインを更新するかを正確に把握していないか、データパイプラインを更新する帯域幅がない可能性があります。 スキーマ進化を活用してストリームを再起動し、推論されたスキーマを自動的に更新 Auto Loader できるようになりました。 また、ベンダーが提供する可能性のある一部の「スキーマレス」フィールドの schemaHints
を活用することもできます。
- Python
- Scala
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT") \
.load("/api/requests") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT")
.load("/api/requests")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
ネストされた JSON データの変換
Auto Loader は最上位の JSON 列を文字列として推論するため、さらに変換が必要なネストされた JSON オブジェクトが残る可能性があります。半構造化データ・アクセス・APIsを使用して、複雑なJSON・コンテンツをさらに変換できます。
- Python
- Scala
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.load("<source-data-with-nested-json>") \
.selectExpr(
"*",
"tags:page.name", # extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" # extracts {"tags":{"eventType":...}}
)
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<source-data-with-nested-json>")
.selectExpr(
"*",
"tags:page.name", // extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", // extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" // extracts {"tags":{"eventType":...}}
)
ネストされたJSONデータを推論する
ネストされたデータがある場合は、 cloudFiles.inferColumnTypes
オプションを使用して、データおよびその他の列タイプのネストされた構造を推測できます。
- Python
- Scala
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.option("cloudFiles.inferColumnTypes", "true") \
.load("<source-data-with-nested-json>")
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.option("cloudFiles.inferColumnTypes", "true")
.load("<source-data-with-nested-json>")
ヘッダーなしでCSVファイルをロード
- Python
- Scala
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
ヘッダー付きのCSVファイルにスキーマを強制する
- Python
- Scala
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("header", "true") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
ML のために画像またはバイナリ データを Delta Lake に取り込む
データが Delta Lake に格納されたら、データに対して分散推論を実行できます。 Pandas UDFを使用した分散推論の実行を参照してください。
- Python
- Scala
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
DLT の Auto Loader 構文
DLT は、Python Auto LoaderSQLのサポート のAuto Loader 構文をわずかに変更しました。
次の例では、 Auto Loader を使用して CSV ファイルと JSON ファイルからデータセットを作成します。
- Python
- SQL
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/databricks-datasets/retail-org/customers/",
format => "csv"
)
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders/",
format => "json")
サポートされている形式オプションを使用できます Auto Loader。 read_files
のオプションは、キーと値のペアです。サポートされている形式とオプションの詳細については、「 オプション」を参照してください。
例えば:
CREATE OR REFRESH STREAMING TABLE my_table
AS SELECT *
FROM STREAM read_files(
"/Volumes/my_volume/path/to/files/*",
option-key => option-value,
...
)
以下の例では、ヘッダー付きのタブ区切りCSVファイルからデータを読み込んでいます。
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/databricks-datasets/retail-org/customers/",
format => "csv",
delimiter => "\t",
header => "true"
)
schema
を使用して、形式を手動で指定できます。スキーマ推論をサポートしていない形式のschema
を指定する必要があります。
- Python
- SQL
@dlt.table
def wiki_raw():
return (
spark.readStream.format("cloudFiles")
.schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
.option("cloudFiles.format", "parquet")
.load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
)
CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
format => "parquet",
schema => "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING"
)
DLT は、 Auto Loader を使用してファイルを読み取るときに、スキーマとチェックポイント ディレクトリを自動的に構成および管理します。 ただし、これらのディレクトリのいずれかを手動で構成した場合、完全更新を実行しても、構成済みディレクトリの内容には影響しません。 Databricks では、処理中の予期しない副作用を回避するために、自動的に構成されたディレクトリを使用することをお勧めします。