メインコンテンツまでスキップ

AI Runtimeにデータをロードする

備考

パブリックプレビュー

単一ノードタスク用のAI Runtimeはパブリック プレビュー段階にあります。 マルチ GPU ワークロード用の分散トレーニングAPIベータ版のままです。

このセクションでは、特にMLおよび深層学習(DL)アプリケーション向けに、 AI Runtimeへのデータロードに関する情報について説明します。 Spark Python API を使用したデータの読み込みと変換方法の詳細については、チュートリアルを参照してください。

注記

Unity Catalogが必要です。AI Runtime上のすべてのデータアクセスはUnity Catalogを経由します。 テーブルとボリュームはUnity Catalogに登録され、ユーザーまたはサービスプリンシパルがアクセスできる必要があります。

表形式のデータを読み込む

Spark Connectを使用して、 Deltaテーブルから表形式の機械学習データをロードします。

シングルノードの場合PySparkメソッドtoPandas()を使用してApache Spark DataFrames Pandas DataFramesに変換し、必要に応じてPySparkメソッドto_numpy()を使用してNumPy形式に変換できます。

注記

Spark Connectは、解析と名前解決を実行時に行うため、コードの動作が変わる可能性があります。Spark ConnectとSpark Classicの比較を参照してください。

Spark Connect は、 Spark SQL 、 Spark上のPandas API 、構造化ストリーミング、 MLlib (DataFrame ベース) など、ほとんどのPySpark APIsサポートします。 最新のサポート対象APIsについては、 PySpark APIリファレンスドキュメントを参照してください。

その他の制限については、 「 サーバレス コンピュートの制限 」を参照してください。

ボリュームを使用して大きな Delta テーブルをロードします

toPandas()で変換するには大きすぎる大規模なDeltaテーブルの場合は、データをUnity Catalogボリュームにエクスポートし、 PyTorchまたはHugging Faceを使用して直接ロードします。

Python
# Step 1: Export the Delta table to Parquet files in a UC volume
output_path = "/Volumes/catalog/schema/my_volume/training_data"
spark.table("catalog.schema.my_table").write.mode("overwrite").parquet(output_path)
Python
# Step 2: Load the exported data directly using Hugging Face datasets
from datasets import load_dataset

dataset = load_dataset("parquet", data_files="/Volumes/catalog/schema/my_volume/training_data/*.parquet")

このアプローチは、トレーニング中のSparkのオーバーヘッドを回避し、シングルGPUと分散型トレーニングの両方のワークフローでうまく機能します。

ボリュームから非構造化データをロードする

画像、音声、テキストファイルなどの非構造化データには、 Unity Catalogボリュームを使用してください。 次の例は、ボリュームからファイルを読み込み、PyTorch Datasetで使用する方法を示しています。

Python
# Read files from a UC volume
volume_path = "/Volumes/catalog/schema/my_volume/images/"

from torch.utils.data import Dataset
import os
from PIL import Image

class ImageDataset(Dataset):
def __init__(self, root_dir):
self.file_list = [os.path.join(root_dir, f) for f in os.listdir(root_dir)]

def __len__(self):
return len(self.file_list)

def __getitem__(self, idx):
img = Image.open(self.file_list[idx])
return img

@distributed デコレータ内でデータをロードする

分散トレーニングにサーバレス GPU API使用する場合は、データ読み込みコードを@distributedデコレータ内に移動します。 データセットのサイズがpickleで許可されている最大サイズを超える可能性があるため、以下に示すように、デコレータ内でデータセットを生成することをお勧めします。

Python
from serverless_gpu import distributed

# This may cause a pickle error if the dataset is too large
dataset = get_dataset(file_path)

@distributed(gpus=8, gpu_type='H100')
def run_train():
# Load data inside the decorator to avoid pickle serialization issues
dataset = get_dataset(file_path)
...

データ読み込みパフォーマンス

/Workspace また、 /Volumesディレクトリはリモートの Unity Catalog ストレージにホストされています。ファイルがUnity Catalogに保存されている場合、データの読み込み速度は利用可能なネットワーク帯域幅によって制限されます。 複数のエポックでトレーニングを行う場合は、まずデータをローカルにコピーし、具体的には高速なNVMe SSDストレージ上にホストされている/tmpディレクトリにコピーすることをお勧めします。

データセットが大きい場合は、以下の手法でパフォーマンスを向上させることができます。

  • マルチエポックトレーニングのためにデータをローカルにキャッシュします。 データセットを/tmpにコピーして、エポック間でのアクセスを高速化します。

    Python
    import shutil
    shutil.copytree("/Volumes/catalog/schema/volume/dataset", "/tmp/dataset")
  • データ取得を並列化する。 PyTchのDataLoaderを複数のワーカーで使用することで、データ読み込みとGPU計算を並行して実行できます。num_workers少なくとも2に設定してください。パフォーマンスを向上させるには、 num_workers (並列読み取り回数を増やす)またはprefetch_factor (各ワーカーがプリフェッチするアイテム数を増やす)を増やしてください。

    Python
    from torch.utils.data import DataLoader

    loader = DataLoader(
    dataset,
    batch_size=32,
    num_workers=2,
    prefetch_factor=2,
    pin_memory=True
    )
  • 大規模な表形式データセットには、Spark Connectを使用してください。 Spark ConnectはほとんどのPySpark APIsサポートし、分散読み取りを効率的に処理します。

ストリーミングデータセット

メモリに収まらないほど大規模なデータセットの場合は、ストリーミング方式を使用します。