AI Runtimeにデータをロードする
パブリックプレビュー
単一ノードタスク用のAI Runtimeはパブリック プレビュー段階にあります。 マルチ GPU ワークロード用の分散トレーニングAPIベータ版のままです。
このセクションでは、特にMLおよび深層学習(DL)アプリケーション向けに、 AI Runtimeへのデータロードに関する情報について説明します。 Spark Python API を使用したデータの読み込みと変換方法の詳細については、チュートリアルを参照してください。
Unity Catalogが必要です。AI Runtime上のすべてのデータアクセスはUnity Catalogを経由します。 テーブルとボリュームはUnity Catalogに登録され、ユーザーまたはサービスプリンシパルがアクセスできる必要があります。
表形式のデータを読み込む
Spark Connectを使用して、 Deltaテーブルから表形式の機械学習データをロードします。
シングルノードの場合PySparkメソッドtoPandas()を使用してApache Spark DataFrames Pandas DataFramesに変換し、必要に応じてPySparkメソッドto_numpy()を使用してNumPy形式に変換できます。
Spark Connectは、解析と名前解決を実行時に行うため、コードの動作が変わる可能性があります。Spark ConnectとSpark Classicの比較を参照してください。
Spark Connect は、 Spark SQL 、 Spark上のPandas API 、構造化ストリーミング、 MLlib (DataFrame ベース) など、ほとんどのPySpark APIsサポートします。 最新のサポート対象APIsについては、 PySpark APIリファレンスドキュメントを参照してください。
その他の制限については、 「 サーバレス コンピュートの制限 」を参照してください。
ボリュームを使用して大きな Delta テーブルをロードします
toPandas()で変換するには大きすぎる大規模なDeltaテーブルの場合は、データをUnity Catalogボリュームにエクスポートし、 PyTorchまたはHugging Faceを使用して直接ロードします。
# Step 1: Export the Delta table to Parquet files in a UC volume
output_path = "/Volumes/catalog/schema/my_volume/training_data"
spark.table("catalog.schema.my_table").write.mode("overwrite").parquet(output_path)
# Step 2: Load the exported data directly using Hugging Face datasets
from datasets import load_dataset
dataset = load_dataset("parquet", data_files="/Volumes/catalog/schema/my_volume/training_data/*.parquet")
このアプローチは、トレーニング中のSparkのオーバーヘッドを回避し、シングルGPUと分散型トレーニングの両方のワークフローでうまく機能します。
ボリュームから非構造化データをロードする UCVolumeDataset
Unity Catalog ボリュームに保存されている画像、音声、テキストファイルなどの非構造化データの場合、serverless_gpu.data パッケージの UCVolumeDataset を使用します。UCVolumeDataset は、ボリュームから各ファイルを最初のアクセス時に高速なローカルキャッシュにコピーし、キャッシュされたローカルファイルのパスを生成する PyTorch の IterableDataset です。手作業で実装する必要があるパフォーマンスと分散に関する課題を処理します:
- ローカルキャッシュ ファイルは、最初のアクセス時にFUSEマウントからローカルキャッシュディレクトリにコピーされ、その後、キャッシュから提供されるため、マルチエポックトレーニングでボリュームを再度読み込むことはありません。
- 自動パーティショニング
torch.distributedが初期化されると、ファイルはランク全体でパーティション分割され、さらにDataLoaderワーカーに分割されるため、各(rank, worker)ペアは、追加のセットアップなしで重複しないスライスを受け取ります。
UCVolumeDataset serverless_gpu.data.DataLoader と GPU環境5 以上が必要です。
UCVolumeDataset 未加工のローカルファイルパスを返します。それらのファイルをテンソルにデコードするには、パスのストリームを取り込み、解析ロジックを適用する2番目のIterableDatasetでラップします。これはI/Oと構文解析に関する事項を分離します。
from serverless_gpu.data import UCVolumeDataset
from torch.utils.data import IterableDataset
from PIL import Image
import torchvision.transforms.functional as TF
class ImageDataset(IterableDataset):
"""Decodes each cached file path from UCVolumeDataset into a tensor."""
def __init__(self, path_dataset: UCVolumeDataset):
self._path_dataset = path_dataset
def __iter__(self):
for local_path in self._path_dataset:
image = Image.open(local_path).convert("RGB")
yield TF.to_tensor(image)
path_dataset = UCVolumeDataset("/Volumes/catalog/schema/my_volume/images")
dataset = ImageDataset(path_dataset)
ラッパーはすでにキャッシュされているローカルパスを受け取るので、解析ステップがFUSEマウントに触れることはありません。拡張、トークン化、またはフィルタリングのために、追加のラッパーを連結できます。
最適なパフォーマンスを得るには、標準のPyTorch DataLoaderではなく、UCVolumeDatasetをserverless_gpu.data.DataLoaderと組み合わせます。サーバレス GPU I/O 向けに調整されており、GPU がコンピュートを実行する間、ファイルを並行してフェッチおよびキャッシュします。「データ読み込みパフォーマンス」を参照してください。
@distributed デコレータ内でデータをロードする
分散トレーニングにサーバレス GPU API使用する場合は、データ読み込みコードを@distributedデコレータ内に移動します。 データセットのサイズがpickleで許可されている最大サイズを超える可能性があるため、以下に示すように、デコレータ内でデータセットを生成することをお勧めします。
from serverless_gpu import distributed
# This may cause a pickle error if the dataset is too large
dataset = get_dataset(file_path)
@distributed(gpus=8, gpu_type='H100')
def run_train():
# Load data inside the decorator to avoid pickle serialization issues
dataset = get_dataset(file_path)
...
デコレータ内でUCVolumeDatasetを構築すると、反復時にtorch.distributedランク情報を読み込み、ランク間でファイルを自動的にパーティション分割します。そのため、ファイルベースのボリュームデータにDistributedSamplerは必要ありません。
データ読み込みパフォーマンス
/Workspace および、/Volumes ディレクトリはリモートの Unity Catalog ストレージでホストされています。データセットがUnity Catalogに保存されている場合、利用可能なネットワーク帯域幅によってデータ読み込み速度が制限されます。複数のエポックをトレーニングしている場合、推奨されるアプローチは UCVolumeDataset を使用することです。これは、最初のアクセス時に各ファイルをローカルストレージにコピーし、それ以降の読み取りをローカルコピーから提供することで、このキャッシングを行います。ボリューム内のデータセットでは、トレーニングでその一部しか使用しない場合でもツリー全体を事前にコピーする手動のshutil.copytreeよりも、ボリューム内のデータセットが優先されます。
データセットが大規模な場合、以下の手法でスループットを改善できます:
-
serverless_gpu.data.DataLoaderを使用して取得を並列化する。 これは、サーバレス GPU I/O 用に調整された torchDataLoaderのドロップイン サブクラスです。num_workersはデフォルトで 6 に設定され、prefetch_factorはデフォルトで 4 に設定されます (PyTorch の 0 および 2 と比較して)。そのため、GPU がコンピュートを行っている間、ファイルが同時にフェッチおよびキャッシュされます。また、バッチごとのフェッチタイミングをアクティブなMLflow実行にログ記録するため、データ読み込みのボトルネックを特定するのに役立ちます。Pythonfrom serverless_gpu.data import DataLoader
loader = DataLoader(
dataset,
batch_size=32,
pin_memory=True,
# num_workers=6, by default
# prefetch_factor=4, by default
# raise num_workers to increase parallel reads, or prefetch_factor to deepen each worker's queue.
)すべてのランクは同じ
num_workers値を使用する必要があります。なぜなら、UCVolumeDatasetがworld_size × num_workersスロットにわたってグローバルストライドを使用してファイルをパーティション分割するためです。値の不一致により、ファイルが重複したりスキップされたりします。 -
バッチサイズを増やす より大きなバッチは、より多くのサンプルにわたってバッチごとのデータ読み込みオーバーヘッドを償却し、ステップごとのファイル取得操作の数を削減します。GPU メモリが律速段階である場合、勾配蓄積とより大きいバッチサイズを組み合わせることで、実質的なバッチサイズを維持できます。
ストリーミングデータセット
メモリに収まらないほど大規模なデータセットの場合は、ストリーミング方式を使用します。
UCVolumeDatasetserverless_gpu.dataを使用して、ローカルキャッシュと自動分散パーティション分割機能を備えた Unity Catalog ボリュームからファイルをストリーミングします。「UCVolumeDatasetを使用してボリュームから非構造化データをロードする」を参照してください。- カスタムストリーミングロジックのためのPyTorch IterableDataset 。
- Hub またはボリュームでホストされているデータセットのストリーミングに対応した、 Hugging Face データセット。
- Ray Dataは、分散バッチデータ処理のためのデータソースです。