大量の非構造化データを扱う
このページでは、 Unity Catalogボリュームを使用して非構造化データファイルを保存、クエリ、および処理する方法を説明します。 ファイルのアップロード、メタデータのクエリ、AI 機能を使用したファイルの処理、アクセス制御の適用、他の組織とのボリュームの共有の方法を学びます。可能な場合は、カタログ エクスプローラー UI を使用してこのチュートリアルを実行するための手順も含まれています。 カタログ エクスプローラー オプションが表示されない場合は、提供されている Python または SQL コマンドを使用します。
ボリュームの機能と使用例の完全な概要については、 Unity Catalogボリュームとは何ですか?」を参照してください。
要件
- Unity Catalog が有効になっている Databricks ワークスペース。
CREATE CATALOGメタストアに対する権限。「カタログの作成」を参照してください。カタログを作成できない場合は、管理者にアクセス権を依頼するか、CREATE SCHEMA権限がある既存のカタログを使用してください。- Databricks Runtime 14.3 LTS 以上。
- AI 機能の場合:サポートされているリージョン内のワークスペース。
- Delta Sharingの場合: メタストアに対する
CREATE SHAREおよびCREATE RECIPIENT権限。 「データと AI アセットを安全に共有する」を参照してください。
ステップ 1: ボリュームを作成する
ファイルを保存するためのカタログ、スキーマ、ボリュームを作成します。ボリューム管理の詳細な手順については、 Unity Catalogボリュームの作成と管理」を参照してください。
ステップ 1.1: カタログとスキーマを作成する
- SQL
- Python
- Catalog Explorer
-- Create a catalog
CREATE CATALOG IF NOT EXISTS unstructured_data_lab;
USE CATALOG unstructured_data_lab;
-- Create a schema
CREATE SCHEMA IF NOT EXISTS raw;
USE SCHEMA raw;
spark.sql("CREATE CATALOG IF NOT EXISTS unstructured_data_lab")
spark.sql("USE CATALOG unstructured_data_lab")
spark.sql("CREATE SCHEMA IF NOT EXISTS raw")
spark.sql("USE SCHEMA raw")
- クリック
サイドバーの カタログ 。
- [作成] > [カタログの作成] をクリックします。
- カタログ名 として unstructured_data_lab と入力します。
- 作成 をクリックします。
- [カタログの表示] をクリックします。
カタログページ:
- スキーマの作成 をクリックします。
- スキーマ名 として raw と入力します。
- 作成 をクリックします。
ステップ 1.2: 管理対象ボリュームを作成する
- SQL
- Python
- Catalog Explorer
CREATE VOLUME IF NOT EXISTS files_volume
COMMENT 'Volume for storing unstructured data files';
spark.sql("""
CREATE VOLUME IF NOT EXISTS files_volume
COMMENT 'Volume for storing unstructured data files'
""")
スキーマ ページで次の操作を実行します。
- [作成] > [ボリューム] をクリックします。
- ボリューム名 として files_volume と入力します。
- 管理対象ボリューム が選択されていることを確認します。
- 作成 をクリックします。
ステップ 2: ファイルをアップロードする
ボリュームにファイルをアップロードします。包括的なファイル管理の例については、 Unity Catalogボリューム内のファイルの操作」を参照してください。
ステップ 2.1: ファイルをアップロードする
このチュートリアルでは、 databricks-datasetsの例を使用することも、カタログ エクスプローラー UI を使用して独自のファイルをアップロードすることもできます。
Python に精通していなくても、Python コマンドを使用してdatabricks-datasetsからボリュームにファイルをコピーできます。ノートブックでコマンドを実行する手順については、 「ノートブックの管理」を参照してください。
- Python
- Catalog Explorer
# Upload a single image file
dbutils.fs.cp(
"dbfs:/databricks-datasets/flower_photos/roses/10090824183_d02c613f10_m.jpg",
"/Volumes/unstructured_data_lab/raw/files_volume/rose.jpg"
)
# Upload a single PDF file
dbutils.fs.cp(
"dbfs:/databricks-datasets/COVID/CORD-19/2020-03-13/COVID.DATA.LIC.AGMT.pdf",
"/Volumes/unstructured_data_lab/raw/files_volume/covid.pdf"
)
# Upload a directory
local_dir = "dbfs:/databricks-datasets/samples/data/mllib"
volume_path = "/Volumes/unstructured_data_lab/raw/files_volume/sample_files"
for file_info in dbutils.fs.ls(local_dir):
source = file_info.path
dest = f"{volume_path}/{file_info.name}"
dbutils.fs.cp(source, dest, recurse=True)
print(f"Uploaded: {file_info.name}")
Python タブの Python コードは、2 つのファイル (JPG と PDF) と、 .txtファイルと.csvファイルを含むディレクトリをアップロードします。カタログエクスプローラーを使用してファイルをアップロードするには:
- ボリューム ページで、 [このボリュームにアップロード] を クリックします。
- [ファイルのアップロード ] ダイアログの [ファイル] で、 [参照 ] をクリックするか、ファイルをドロップ ゾーンにドラッグ アンド ドロップします。
- [宛先ボリューム] で、前のステップで作成したボリュームが選択されていることを確認します。
ステップ 2.2: アップロードを確認する
- SQL
- Python
- Catalog Explorer
LIST '/Volumes/unstructured_data_lab/raw/files_volume/';
files = dbutils.fs.ls("/Volumes/unstructured_data_lab/raw/files_volume/")
for f in files:
print(f"{f.name}\t{f.size} bytes")
ファイルがアップロードされると、ボリューム ページに表示されます。ファイル名をクリックするとプレビューが表示され、ディレクトリをクリックすると個々のファイルが表示されます。
代替案: %fsマジックコマンドを使用する
%fsマジック コマンドを使用します:
%fs ls /Volumes/unstructured_data_lab/raw/files_volume/
ステップ 3: ファイルのメタデータをクエリする
ファイル情報を照会して、ボリューム内に何が含まれているかを確認します。その他のクエリ パターンについては、 「SQL を使用してボリューム内のファイルを一覧表示およびクエリする」を参照してください。
ステップ 3.1: ファイルのメタデータを表示する
- SQL
- Python
- Catalog Explorer
SELECT
path,
_metadata.file_name,
_metadata.file_size,
_metadata.file_modification_time
FROM read_files(
'/Volumes/unstructured_data_lab/raw/files_volume/',
format => 'binaryFile'
);
df = (
spark.read
.format("binaryFile")
.option("recursiveFileLookup", "true")
.load("/Volumes/unstructured_data_lab/raw/files_volume/")
)
df.select("path", "modificationTime", "length").show(truncate=False)
カタログ エクスプローラーのボリューム ページには、各ファイルの 名前 (拡張子を含む)、 サイズ 、 最終更新 日が表示されます。
ステップ 4: ファイルのクエリと処理
Databricks AI 関数を使用して、ドキュメントからコンテンツを抽出し、画像を分析します。AI関数の機能の完全な概要については、 Databricks AI Functionsを使用してデータにAIを適用する」を参照してください。
AI 機能には、サポートされているリージョン内のワークスペースが必要です。Databricks AI Functionsを使用してデータにAIを適用する」を参照してください。
AI 関数にアクセスできない場合は、代わりに標準の Python ライブラリを使用してください。例については、以下の代替セクションを展開してください。
ステップ 4.1: ドキュメントを解析する
- SQL
- Python
SELECT
path AS file_path,
ai_parse_document(content, map('version', '2.0')) AS parsed_content
FROM read_files(
'/Volumes/unstructured_data_lab/raw/files_volume/',
format => 'binaryFile',
fileNamePattern => '*.pdf'
);
result_df = spark.sql("""
SELECT
path AS file_path,
ai_parse_document(content, map('version', '2.0')) AS parsed_content
FROM read_files(
'/Volumes/unstructured_data_lab/raw/files_volume/',
format => 'binaryFile',
fileNamePattern => '*.pdf'
)
""")
display(result_df)
代替案: AI機能なしでPDFを解析する
お住まいの地域で AI 機能が利用できない場合は、Python ライブラリを使用してください。
%pip install PyPDF2
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from PyPDF2 import PdfReader
import io
@udf(returnType=StringType())
def extract_pdf_text(content):
if content is None:
return None
try:
reader = PdfReader(io.BytesIO(content))
return "\n".join(page.extract_text() or "" for page in reader.pages)
except Exception as e:
return f"Error: {str(e)}"
df = spark.read.format("binaryFile") \
.option("pathGlobFilter", "*.pdf") \
.load("/Volumes/unstructured_data_lab/raw/files_volume/")
result_df = df.withColumn("text_content", extract_pdf_text("content"))
display(result_df.select("path", "text_content"))
ステップ 4.2: 画像を分析する
- SQL
- Python
SELECT
path,
ai_query(
'databricks-llama-4-maverick',
'Describe this image in one sentence:',
files => content
) AS description
FROM read_files(
'/Volumes/unstructured_data_lab/raw/files_volume/',
format => 'binaryFile',
fileNamePattern => '*.{jpg,jpeg,png}'
)
WHERE _metadata.file_size < 5000000;
result_df = spark.sql("""
SELECT
path,
ai_query(
'databricks-llama-4-maverick',
'Describe this image in one sentence:',
files => content
) AS description
FROM read_files(
'/Volumes/unstructured_data_lab/raw/files_volume/',
format => 'binaryFile',
fileNamePattern => '*.{jpg,jpeg,png}'
)
WHERE _metadata.file_size < 5000000
""")
display(result_df)
代替案: AI機能なしで画像メタデータを抽出する
AI 機能を使用せずに画像メタデータを抽出するには:
%pip install pillow
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from PIL import Image
import io
image_schema = StructType([
StructField("width", IntegerType()),
StructField("height", IntegerType()),
StructField("format", StringType())
])
@udf(returnType=image_schema)
def get_image_info(content):
if content is None:
return None
try:
img = Image.open(io.BytesIO(content))
return {"width": img.width, "height": img.height, "format": img.format}
except:
return None
df = spark.read.format("binaryFile") \
.option("pathGlobFilter", "*.{jpg,jpeg,png}") \
.load("/Volumes/unstructured_data_lab/raw/files_volume/")
result_df = df.withColumn("image_info", get_image_info("content"))
display(result_df.select("path", "image_info.*"))
ステップ 4.3: ファイル名でフィルタリングして分析する
この例では、ファイル名に部分文字列「rose」が含まれる画像ファイルをフィルタリングします。
- SQL
- Python
SELECT
path AS file_path,
ai_query(
'databricks-llama-4-maverick',
'Describe this image in one sentence:',
files => content
) AS description
FROM read_files(
'/Volumes/unstructured_data_lab/raw/files_volume/',
format => 'binaryFile',
fileNamePattern => '*.{jpg,jpeg,png}'
)
WHERE _metadata.file_name ILIKE '%rose%';
result_df = spark.sql("""
SELECT
path AS file_path,
ai_query(
'databricks-llama-4-maverick',
'Describe this image in one sentence:',
files => content
) AS description
FROM read_files(
'/Volumes/unstructured_data_lab/raw/files_volume/',
format => 'binaryFile',
fileNamePattern => '*.{jpg,jpeg,png}'
)
WHERE _metadata.file_name ILIKE '%rose%'
""")
display(result_df)
ステップ 4.4: 構造化テーブルを使用してファイルを結合する
この例では、デモンストレーションの目的で行番号を使用してファイルとタクシー旅行をペアリングします。本番運用では、有意義なビジネス キーに参加してください。
- SQL
- Python
-- This example demonstrates joining file metadata with structured data
-- by pairing files with taxi trips using row numbers
WITH files_with_row AS (
SELECT
path,
SPLIT(path, '/')[SIZE(SPLIT(path, '/')) - 1] AS file_name,
length,
ROW_NUMBER() OVER (ORDER BY path) AS file_row
FROM read_files(
'/Volumes/unstructured_data_lab/raw/files_volume/',
format => 'binaryFile'
)
),
trips_with_row AS (
SELECT
tpep_pickup_datetime,
pickup_zip,
dropoff_zip,
fare_amount,
ROW_NUMBER() OVER (ORDER BY tpep_pickup_datetime) AS trip_row
FROM samples.nyctaxi.trips
WHERE pickup_zip IS NOT NULL
LIMIT 5
)
SELECT
f.path,
f.file_name,
f.length,
t.pickup_zip,
t.dropoff_zip,
t.fare_amount,
t.tpep_pickup_datetime
FROM files_with_row f
INNER JOIN trips_with_row t ON f.file_row = t.trip_row;
from pyspark.sql.functions import col, row_number, element_at, split
from pyspark.sql.window import Window
# Read files and add row numbers
files_df = spark.read.format("binaryFile") \
.load("/Volumes/unstructured_data_lab/raw/files_volume/") \
.withColumn("file_name", element_at(split(col("path"), "/"), -1))
files_with_row = files_df.alias("files") \
.withColumn("file_row", row_number().over(Window.orderBy("path")))
# Get trips and add row numbers
trips_df = spark.table("samples.nyctaxi.trips") \
.filter(col("pickup_zip").isNotNull()) \
.limit(5)
trips_with_row = trips_df.alias("trips") \
.withColumn("trip_row", row_number().over(Window.orderBy("tpep_pickup_datetime")))
# Join on row numbers
result_df = files_with_row \
.join(trips_with_row, col("file_row") == col("trip_row"), "inner") \
.select(
"files.path",
"files.file_name",
"files.length",
"trips.pickup_zip",
"trips.dropoff_zip",
"trips.fare_amount",
"trips.tpep_pickup_datetime"
)
display(result_df)
ステップ 5: アクセス制御を適用する
ボリューム内のファイルを読み書きできるユーザーを制御します。Unity Catalog での権限の管理の詳細については、 「Unity Catalog での権限の管理」を参照してください。
ステップ 5.1: アクセスを許可する
- SQL
- Python
- Catalog Explorer
-- Replace <user-or-group-name> with your workspace group or user name
-- Grant read access
GRANT READ VOLUME ON VOLUME unstructured_data_lab.raw.files_volume
TO `<user-or-group-name>`;
-- Grant read and write access
GRANT READ VOLUME, WRITE VOLUME ON VOLUME unstructured_data_lab.raw.files_volume
TO `<user-or-group-name>`;
-- Grant all privileges
GRANT ALL PRIVILEGES ON VOLUME unstructured_data_lab.raw.files_volume
TO `<user-or-group-name>`;
# Replace <user-or-group-name> with your workspace group or user name
spark.sql("""
GRANT READ VOLUME ON VOLUME unstructured_data_lab.raw.files_volume
TO `<user-or-group-name>`
""")
spark.sql("""
GRANT READ VOLUME, WRITE VOLUME ON VOLUME unstructured_data_lab.raw.files_volume
TO `<user-or-group-name>`
""")
spark.sql("""
GRANT ALL PRIVILEGES ON VOLUME unstructured_data_lab.raw.files_volume
TO `<user-or-group-name>`
""")
- ボリューム ページの [ アクセス許可] タブに移動します。
- 付与 をクリックします。
- ユーザーの電子メール アドレスまたはグループの名前を入力します。
- 付与する権限を選択します。
- 確認 をクリックします。
ステップ 5.2: 現在の権限を表示する
- SQL
- Python
- Catalog Explorer
SHOW GRANTS ON VOLUME unstructured_data_lab.raw.files_volume;
display(spark.sql("SHOW GRANTS ON VOLUME unstructured_data_lab.raw.files_volume"))
ボリューム ページの [権限] タブには、ボリュームにアクセスできるユーザーとグループが表示されます。
ステップ 6: 増分取り込みをセットアップする
Auto Loader を使用すると、ボリュームに新しいファイルが到着すると自動的に処理されます。このパターンは、継続的なデータ取り込みワークフローに役立ちます。その他の取り込みパターンについては、 「一般的なデータ読み込みパターン」を参照してください。
ステップ 6.1: ストリーミングテーブルを作成する
- SQL
- Python
CREATE OR REFRESH STREAMING TABLE document_ingestion
SCHEDULE EVERY 1 HOUR
AS SELECT
path,
modificationTime,
length,
content,
_metadata,
current_timestamp() AS ingestion_time
FROM STREAM(read_files(
'/Volumes/unstructured_data_lab/raw/files_volume/incoming/',
format => 'binaryFile'
));
from pyspark.sql.functions import current_timestamp, col
dbutils.fs.mkdirs("/Volumes/unstructured_data_lab/raw/files_volume/incoming/")
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("pathGlobFilter", "*.pdf") \
.load("/Volumes/unstructured_data_lab/raw/files_volume/incoming/")
df_enriched = df \
.withColumn("ingestion_time", current_timestamp()) \
.withColumn("source_file", col("_metadata.file_path"))
query = df_enriched.writeStream \
.option("checkpointLocation",
"/Volumes/unstructured_data_lab/raw/files_volume/_checkpoints/docs") \
.trigger(availableNow=True) \
.toTable("document_ingestion")
query.awaitTermination()
ステップ 7: Delta Sharingでファイルを共有する
Delta Sharing を使用して、他の組織のユーザーとボリュームを安全に共有します。共有する前に受信者を作成する必要があります。受信者は、共有データにアクセスできる外部組織またはユーザーを表します。受信者の設定については、 Delta Sharing ( Databricks-to-Databricks共有) のデータ受信者の作成と管理」を参照してください。
ステップ 7.1: 共有を作成して構成する
- SQL
- Python
-- Create a share
CREATE SHARE IF NOT EXISTS unstructured_data_share
COMMENT 'Document files for partners';
-- Add the volume
ALTER SHARE unstructured_data_share
ADD VOLUME unstructured_data_lab.raw.files_volume;
-- Create a recipient
CREATE RECIPIENT IF NOT EXISTS <partner_org>
USING ID '<recipient-sharing-identifier>';
-- Grant access
GRANT SELECT ON SHARE unstructured_data_share
TO RECIPIENT <partner_org>;
spark.sql("""
CREATE SHARE IF NOT EXISTS unstructured_data_share
COMMENT 'Document files for partners'
""")
spark.sql("""
ALTER SHARE unstructured_data_share
ADD VOLUME unstructured_data_lab.raw.files_volume
""")
spark.sql("""
CREATE RECIPIENT IF NOT EXISTS <partner_org>
USING ID '<recipient-sharing-identifier>'
""")
spark.sql("""
GRANT SELECT ON SHARE unstructured_data_share
TO RECIPIENT <partner_org>
""")
ステップ 7.2: 共有データにアクセスする (受信者として)
- SQL
- Python
-- View available shares
SHOW SHARES IN PROVIDER <provider_name>;
-- Create a catalog from the share
CREATE CATALOG IF NOT EXISTS shared_documents
FROM SHARE <provider_name>.unstructured_data_share;
-- Query shared files
SELECT * EXCEPT (content), _metadata
FROM read_files(
'/Volumes/shared_documents/raw/files_volume/',
format => 'binaryFile'
)
LIMIT 10;
spark.sql("SHOW SHARES IN PROVIDER <provider_name>").show()
spark.sql("""
CREATE CATALOG IF NOT EXISTS shared_documents
FROM SHARE <provider_name>.unstructured_data_share
""")
df = spark.read.format("binaryFile") \
.load("/Volumes/shared_documents/raw/files_volume/")
df.select("path", "modificationTime", "length").show(10)
ステップ 8: ファイルをクリーンアップする
不要になったファイルは削除します。
- Python
- CLI
# Delete a single file
dbutils.fs.rm("/Volumes/unstructured_data_lab/raw/files_volume/covid.pdf")
# Delete a directory recursively
dbutils.fs.rm("/Volumes/unstructured_data_lab/raw/files_volume/sample_files/", recurse=True)
# Delete a single file
databricks fs rm dbfs:/Volumes/unstructured_data_lab/raw/files_volume/covid.pdf
# Delete a directory recursively
databricks fs rm -r dbfs:/Volumes/unstructured_data_lab/raw/files_volume/sample_files/
代替案: 標準のPythonを使用する
import os
os.remove("/Volumes/unstructured_data_lab/raw/files_volume/covid.pdf")
import shutil
shutil.rmtree("/Volumes/unstructured_data_lab/raw/files_volume/sample_files/")