メインコンテンツまでスキップ

大量の非構造化データを扱う

このページでは、 Unity Catalogボリュームを使用して非構造化データファイルを保存、クエリ、および処理する方法を説明します。 ファイルのアップロード、メタデータのクエリ、AI 機能を使用したファイルの処理、アクセス制御の適用、他の組織とのボリュームの共有の方法を学びます。可能な場合は、カタログ エクスプローラー UI を使用してこのチュートリアルを実行するための手順も含まれています。 カタログ エクスプローラー オプションが表示されない場合は、提供されている Python または SQL コマンドを使用します。

ボリュームの機能と使用例の完全な概要については、 Unity Catalogボリュームとは何ですか?」を参照してください。

要件

  • Unity Catalog が有効になっている Databricks ワークスペース。
  • CREATE CATALOG メタストアに対する権限。「カタログの作成」を参照してください。カタログを作成できない場合は、管理者にアクセス権を依頼するか、 CREATE SCHEMA権限がある既存のカタログを使用してください。
  • Databricks Runtime 14.3 LTS 以上。
  • AI 機能の場合:サポートされているリージョン内のワークスペース。
  • Delta Sharingの場合: メタストアに対するCREATE SHAREおよびCREATE RECIPIENT権限。 「データと AI アセットを安全に共有する」を参照してください。

ステップ 1: ボリュームを作成する

ファイルを保存するためのカタログ、スキーマ、ボリュームを作成します。ボリューム管理の詳細な手順については、 Unity Catalogボリュームの作成と管理」を参照してください。

ステップ 1.1: カタログとスキーマを作成する

SQL
-- Create a catalog
CREATE CATALOG IF NOT EXISTS unstructured_data_lab;
USE CATALOG unstructured_data_lab;

-- Create a schema
CREATE SCHEMA IF NOT EXISTS raw;
USE SCHEMA raw;

ステップ 1.2: 管理対象ボリュームを作成する

SQL
CREATE VOLUME IF NOT EXISTS files_volume
COMMENT 'Volume for storing unstructured data files';

ステップ 2: ファイルをアップロードする

ボリュームにファイルをアップロードします。包括的なファイル管理の例については、 Unity Catalogボリューム内のファイルの操作」を参照してください。

ステップ 2.1: ファイルをアップロードする

このチュートリアルでは、 databricks-datasetsの例を使用することも、カタログ エクスプローラー UI を使用して独自のファイルをアップロードすることもできます。

注記

Python に精通していなくても、Python コマンドを使用してdatabricks-datasetsからボリュームにファイルをコピーできます。ノートブックでコマンドを実行する手順については、 「ノートブックの管理」を参照してください。

Python
# Upload a single image file
dbutils.fs.cp(
"dbfs:/databricks-datasets/flower_photos/roses/10090824183_d02c613f10_m.jpg",
"/Volumes/unstructured_data_lab/raw/files_volume/rose.jpg"
)

# Upload a single PDF file
dbutils.fs.cp(
"dbfs:/databricks-datasets/COVID/CORD-19/2020-03-13/COVID.DATA.LIC.AGMT.pdf",
"/Volumes/unstructured_data_lab/raw/files_volume/covid.pdf"
)

# Upload a directory
local_dir = "dbfs:/databricks-datasets/samples/data/mllib"
volume_path = "/Volumes/unstructured_data_lab/raw/files_volume/sample_files"

for file_info in dbutils.fs.ls(local_dir):
source = file_info.path
dest = f"{volume_path}/{file_info.name}"
dbutils.fs.cp(source, dest, recurse=True)
print(f"Uploaded: {file_info.name}")

ステップ 2.2: アップロードを確認する

SQL
LIST '/Volumes/unstructured_data_lab/raw/files_volume/';

代替案: %fsマジックコマンドを使用する

%fsマジック コマンドを使用します:

Text
%fs ls /Volumes/unstructured_data_lab/raw/files_volume/

ステップ 3: ファイルのメタデータをクエリする

ファイル情報を照会して、ボリューム内に何が含まれているかを確認します。その他のクエリ パターンについては、 「SQL を使用してボリューム内のファイルを一覧表示およびクエリする」を参照してください。

ステップ 3.1: ファイルのメタデータを表示する

SQL
SELECT
path,
_metadata.file_name,
_metadata.file_size,
_metadata.file_modification_time
FROM read_files(
'/Volumes/unstructured_data_lab/raw/files_volume/',
format => 'binaryFile'
);

ステップ 4: ファイルのクエリと処理

Databricks AI 関数を使用して、ドキュメントからコンテンツを抽出し、画像を分析します。AI関数の機能の完全な概要については、 Databricks AI Functionsを使用してデータにAIを適用する」を参照してください。

注記

AI 機能には、サポートされているリージョン内のワークスペースが必要です。Databricks AI Functionsを使用してデータにAIを適用する」を参照してください。

AI 関数にアクセスできない場合は、代わりに標準の Python ライブラリを使用してください。例については、以下の代替セクションを展開してください。

ステップ 4.1: ドキュメントを解析する

SQL
SELECT
path AS file_path,
ai_parse_document(content, map('version', '2.0')) AS parsed_content
FROM read_files(
'/Volumes/unstructured_data_lab/raw/files_volume/',
format => 'binaryFile',
fileNamePattern => '*.pdf'
);

代替案: AI機能なしでPDFを解析する

お住まいの地域で AI 機能が利用できない場合は、Python ライブラリを使用してください。

Python
%pip install PyPDF2

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from PyPDF2 import PdfReader
import io

@udf(returnType=StringType())
def extract_pdf_text(content):
if content is None:
return None
try:
reader = PdfReader(io.BytesIO(content))
return "\n".join(page.extract_text() or "" for page in reader.pages)
except Exception as e:
return f"Error: {str(e)}"

df = spark.read.format("binaryFile") \
.option("pathGlobFilter", "*.pdf") \
.load("/Volumes/unstructured_data_lab/raw/files_volume/")

result_df = df.withColumn("text_content", extract_pdf_text("content"))
display(result_df.select("path", "text_content"))

ステップ 4.2: 画像を分析する

SQL
SELECT
path,
ai_query(
'databricks-llama-4-maverick',
'Describe this image in one sentence:',
files => content
) AS description
FROM read_files(
'/Volumes/unstructured_data_lab/raw/files_volume/',
format => 'binaryFile',
fileNamePattern => '*.{jpg,jpeg,png}'
)
WHERE _metadata.file_size < 5000000;

代替案: AI機能なしで画像メタデータを抽出する

AI 機能を使用せずに画像メタデータを抽出するには:

Python
%pip install pillow

from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from PIL import Image
import io

image_schema = StructType([
StructField("width", IntegerType()),
StructField("height", IntegerType()),
StructField("format", StringType())
])

@udf(returnType=image_schema)
def get_image_info(content):
if content is None:
return None
try:
img = Image.open(io.BytesIO(content))
return {"width": img.width, "height": img.height, "format": img.format}
except:
return None

df = spark.read.format("binaryFile") \
.option("pathGlobFilter", "*.{jpg,jpeg,png}") \
.load("/Volumes/unstructured_data_lab/raw/files_volume/")

result_df = df.withColumn("image_info", get_image_info("content"))
display(result_df.select("path", "image_info.*"))

ステップ 4.3: ファイル名でフィルタリングして分析する

この例では、ファイル名に部分文字列「rose」が含まれる画像ファイルをフィルタリングします。

SQL
SELECT
path AS file_path,
ai_query(
'databricks-llama-4-maverick',
'Describe this image in one sentence:',
files => content
) AS description
FROM read_files(
'/Volumes/unstructured_data_lab/raw/files_volume/',
format => 'binaryFile',
fileNamePattern => '*.{jpg,jpeg,png}'
)
WHERE _metadata.file_name ILIKE '%rose%';

ステップ 4.4: 構造化テーブルを使用してファイルを結合する

この例では、デモンストレーションの目的で行番号を使用してファイルとタクシー旅行をペアリングします。本番運用では、有意義なビジネス キーに参加してください。

SQL
-- This example demonstrates joining file metadata with structured data
-- by pairing files with taxi trips using row numbers
WITH files_with_row AS (
SELECT
path,
SPLIT(path, '/')[SIZE(SPLIT(path, '/')) - 1] AS file_name,
length,
ROW_NUMBER() OVER (ORDER BY path) AS file_row
FROM read_files(
'/Volumes/unstructured_data_lab/raw/files_volume/',
format => 'binaryFile'
)
),
trips_with_row AS (
SELECT
tpep_pickup_datetime,
pickup_zip,
dropoff_zip,
fare_amount,
ROW_NUMBER() OVER (ORDER BY tpep_pickup_datetime) AS trip_row
FROM samples.nyctaxi.trips
WHERE pickup_zip IS NOT NULL
LIMIT 5
)
SELECT
f.path,
f.file_name,
f.length,
t.pickup_zip,
t.dropoff_zip,
t.fare_amount,
t.tpep_pickup_datetime
FROM files_with_row f
INNER JOIN trips_with_row t ON f.file_row = t.trip_row;

ステップ 5: アクセス制御を適用する

ボリューム内のファイルを読み書きできるユーザーを制御します。Unity Catalog での権限の管理の詳細については、 「Unity Catalog での権限の管理」を参照してください。

ステップ 5.1: アクセスを許可する

SQL
-- Replace <user-or-group-name> with your workspace group or user name

-- Grant read access
GRANT READ VOLUME ON VOLUME unstructured_data_lab.raw.files_volume
TO `<user-or-group-name>`;

-- Grant read and write access
GRANT READ VOLUME, WRITE VOLUME ON VOLUME unstructured_data_lab.raw.files_volume
TO `<user-or-group-name>`;

-- Grant all privileges
GRANT ALL PRIVILEGES ON VOLUME unstructured_data_lab.raw.files_volume
TO `<user-or-group-name>`;

ステップ 5.2: 現在の権限を表示する

SQL
SHOW GRANTS ON VOLUME unstructured_data_lab.raw.files_volume;

ステップ 6: 増分取り込みをセットアップする

Auto Loader を使用すると、ボリュームに新しいファイルが到着すると自動的に処理されます。このパターンは、継続的なデータ取り込みワークフローに役立ちます。その他の取り込みパターンについては、 「一般的なデータ読み込みパターン」を参照してください。

ステップ 6.1: ストリーミングテーブルを作成する

SQL
CREATE OR REFRESH STREAMING TABLE document_ingestion
SCHEDULE EVERY 1 HOUR
AS SELECT
path,
modificationTime,
length,
content,
_metadata,
current_timestamp() AS ingestion_time
FROM STREAM(read_files(
'/Volumes/unstructured_data_lab/raw/files_volume/incoming/',
format => 'binaryFile'
));

ステップ 7: Delta Sharingでファイルを共有する

Delta Sharing を使用して、他の組織のユーザーとボリュームを安全に共有します。共有する前に受信者を作成する必要があります。受信者は、共有データにアクセスできる外部組織またはユーザーを表します。受信者の設定については、 Delta Sharing ( Databricks-to-Databricks共有) のデータ受信者の作成と管理」を参照してください。

ステップ 7.1: 共有を作成して構成する

SQL
-- Create a share
CREATE SHARE IF NOT EXISTS unstructured_data_share
COMMENT 'Document files for partners';

-- Add the volume
ALTER SHARE unstructured_data_share
ADD VOLUME unstructured_data_lab.raw.files_volume;

-- Create a recipient
CREATE RECIPIENT IF NOT EXISTS <partner_org>
USING ID '<recipient-sharing-identifier>';

-- Grant access
GRANT SELECT ON SHARE unstructured_data_share
TO RECIPIENT <partner_org>;

ステップ 7.2: 共有データにアクセスする (受信者として)

SQL
-- View available shares
SHOW SHARES IN PROVIDER <provider_name>;

-- Create a catalog from the share
CREATE CATALOG IF NOT EXISTS shared_documents
FROM SHARE <provider_name>.unstructured_data_share;

-- Query shared files
SELECT * EXCEPT (content), _metadata
FROM read_files(
'/Volumes/shared_documents/raw/files_volume/',
format => 'binaryFile'
)
LIMIT 10;

ステップ 8: ファイルをクリーンアップする

不要になったファイルは削除します。

Python
# Delete a single file
dbutils.fs.rm("/Volumes/unstructured_data_lab/raw/files_volume/covid.pdf")

# Delete a directory recursively
dbutils.fs.rm("/Volumes/unstructured_data_lab/raw/files_volume/sample_files/", recurse=True)

代替案: 標準のPythonを使用する

Python
import os
os.remove("/Volumes/unstructured_data_lab/raw/files_volume/covid.pdf")

import shutil
shutil.rmtree("/Volumes/unstructured_data_lab/raw/files_volume/sample_files/")

次のステップ

ボリュームについて学び続ける

関連する機能を調べる

SQL関数参照