AI Runtimeにデータをロードする
パブリックプレビュー
単一ノードタスク用のAI Runtimeはパブリック プレビュー段階にあります。 マルチ GPU ワークロード用の分散トレーニングAPIベータ版のままです。
このセクションでは、特にMLおよび深層学習(DL)アプリケーション向けに、 AI Runtimeへのデータロードに関する情報について説明します。 Spark Python API を使用したデータの読み込みと変換方法の詳細については、チュートリアルを参照してください。
Unity Catalogが必要です。AI Runtime上のすべてのデータアクセスはUnity Catalogを経由します。 テーブルとボリュームはUnity Catalogに登録され、ユーザーまたはサービスプリンシパルがアクセスできる必要があります。
表形式のデータを読み込む
Spark Connectを使用して、 Deltaテーブルから表形式の機械学習データをロードします。
シングルノードの場合PySparkメソッドtoPandas()を使用してApache Spark DataFrames Pandas DataFramesに変換し、必要に応じてPySparkメソッドto_numpy()を使用してNumPy形式に変換できます。
Spark Connectは、解析と名前解決を実行時に行うため、コードの動作が変わる可能性があります。Spark ConnectとSpark Classicの比較を参照してください。
Spark Connect は、 Spark SQL 、 Spark上のPandas API 、構造化ストリーミング、 MLlib (DataFrame ベース) など、ほとんどのPySpark APIsサポートします。 最新のサポート対象APIsについては、 PySpark APIリファレンスドキュメントを参照してください。
その他の制限については、 「 サーバレス コンピュートの制限 」を参照してください。
ボリュームを使用して大きな Delta テーブルをロードします
toPandas()で変換するには大きすぎる大規模なDeltaテーブルの場合は、データをUnity Catalogボリュームにエクスポートし、 PyTorchまたはHugging Faceを使用して直接ロードします。
# Step 1: Export the Delta table to Parquet files in a UC volume
output_path = "/Volumes/catalog/schema/my_volume/training_data"
spark.table("catalog.schema.my_table").write.mode("overwrite").parquet(output_path)
# Step 2: Load the exported data directly using Hugging Face datasets
from datasets import load_dataset
dataset = load_dataset("parquet", data_files="/Volumes/catalog/schema/my_volume/training_data/*.parquet")
このアプローチは、トレーニング中のSparkのオーバーヘッドを回避し、シングルGPUと分散型トレーニングの両方のワークフローでうまく機能します。
ボリュームから非構造化データをロードする
画像、音声、テキストファイルなどの非構造化データには、 Unity Catalogボリュームを使用してください。 次の例は、ボリュームからファイルを読み込み、PyTorch Datasetで使用する方法を示しています。
# Read files from a UC volume
volume_path = "/Volumes/catalog/schema/my_volume/images/"
from torch.utils.data import Dataset
import os
from PIL import Image
class ImageDataset(Dataset):
def __init__(self, root_dir):
self.file_list = [os.path.join(root_dir, f) for f in os.listdir(root_dir)]
def __len__(self):
return len(self.file_list)
def __getitem__(self, idx):
img = Image.open(self.file_list[idx])
return img
@distributed デコレータ内でデータをロードする
分散トレーニングにサーバレス GPU API使用する場合は、データ読み込みコードを@distributedデコレータ内に移動します。 データセットのサイズがpickleで許可されている最大サイズを超える可能性があるため、以下に示すように、デコレータ内でデータセットを生成することをお勧めします。
from serverless_gpu import distributed
# This may cause a pickle error if the dataset is too large
dataset = get_dataset(file_path)
@distributed(gpus=8, gpu_type='H100')
def run_train():
# Load data inside the decorator to avoid pickle serialization issues
dataset = get_dataset(file_path)
...
データ読み込みパフォーマンス
/Workspace また、 /Volumesディレクトリはリモートの Unity Catalog ストレージにホストされています。ファイルがUnity Catalogに保存されている場合、データの読み込み速度は利用可能なネットワーク帯域幅によって制限されます。 複数のエポックでトレーニングを行う場合は、まずデータをローカルにコピーし、具体的には高速なNVMe SSDストレージ上にホストされている/tmpディレクトリにコピーすることをお勧めします。
データセットが大きい場合は、以下の手法でパフォーマンスを向上させることができます。
-
マルチエポックトレーニングのためにデータをローカルにキャッシュします。 データセットを
/tmpにコピーして、エポック間でのアクセスを高速化します。Pythonimport shutil
shutil.copytree("/Volumes/catalog/schema/volume/dataset", "/tmp/dataset") -
データ取得を並列化する。 PyTchのDataLoaderを複数のワーカーで使用することで、データ読み込みとGPU計算を並行して実行できます。
num_workers少なくとも2に設定してください。パフォーマンスを向上させるには、num_workers(並列読み取り回数を増やす)またはprefetch_factor(各ワーカーがプリフェッチするアイテム数を増やす)を増やしてください。Pythonfrom torch.utils.data import DataLoader
loader = DataLoader(
dataset,
batch_size=32,
num_workers=2,
prefetch_factor=2,
pin_memory=True
) -
大規模な表形式データセットには、Spark Connectを使用してください。 Spark ConnectはほとんどのPySpark APIsサポートし、分散読み取りを効率的に処理します。
ストリーミングデータセット
メモリに収まらないほど大規模なデータセットの場合は、ストリーミング方式を使用します。
- カスタムストリーミングロジックのためのPyTorch IterableDataset 。
- Hub またはボリュームでホストされているデータセットのストリーミングに対応した、 Hugging Face データセット。
- Ray Dataは、分散バッチデータ処理のためのデータソースです。