メインコンテンツまでスキップ

AI Runtimeにデータをロードする

備考

パブリックプレビュー

単一ノードタスク用のAI Runtimeはパブリック プレビュー段階にあります。 マルチ GPU ワークロード用の分散トレーニングAPIベータ版のままです。

このセクションでは、特にMLおよび深層学習(DL)アプリケーション向けに、 AI Runtimeへのデータロードに関する情報について説明します。 Spark Python API を使用したデータの読み込みと変換方法の詳細については、チュートリアルを参照してください。

注記

Unity Catalogが必要です。AI Runtime上のすべてのデータアクセスはUnity Catalogを経由します。 テーブルとボリュームはUnity Catalogに登録され、ユーザーまたはサービスプリンシパルがアクセスできる必要があります。

表形式のデータを読み込む

Spark Connectを使用して、 Deltaテーブルから表形式の機械学習データをロードします。

シングルノードの場合PySparkメソッドtoPandas()を使用してApache Spark DataFrames Pandas DataFramesに変換し、必要に応じてPySparkメソッドto_numpy()を使用してNumPy形式に変換できます。

注記

Spark Connectは、解析と名前解決を実行時に行うため、コードの動作が変わる可能性があります。Spark ConnectとSpark Classicの比較を参照してください。

Spark Connect は、 Spark SQL 、 Spark上のPandas API 、構造化ストリーミング、 MLlib (DataFrame ベース) など、ほとんどのPySpark APIsサポートします。 最新のサポート対象APIsについては、 PySpark APIリファレンスドキュメントを参照してください。

その他の制限については、 「 サーバレス コンピュートの制限 」を参照してください。

ボリュームを使用して大きな Delta テーブルをロードします

toPandas()で変換するには大きすぎる大規模なDeltaテーブルの場合は、データをUnity Catalogボリュームにエクスポートし、 PyTorchまたはHugging Faceを使用して直接ロードします。

Python
# Step 1: Export the Delta table to Parquet files in a UC volume
output_path = "/Volumes/catalog/schema/my_volume/training_data"
spark.table("catalog.schema.my_table").write.mode("overwrite").parquet(output_path)
Python
# Step 2: Load the exported data directly using Hugging Face datasets
from datasets import load_dataset

dataset = load_dataset("parquet", data_files="/Volumes/catalog/schema/my_volume/training_data/*.parquet")

このアプローチは、トレーニング中のSparkのオーバーヘッドを回避し、シングルGPUと分散型トレーニングの両方のワークフローでうまく機能します。

ボリュームから非構造化データをロードする UCVolumeDataset

Unity Catalog ボリュームに保存されている画像、音声、テキストファイルなどの非構造化データの場合、serverless_gpu.data パッケージの UCVolumeDataset を使用します。UCVolumeDataset は、ボリュームから各ファイルを最初のアクセス時に高速なローカルキャッシュにコピーし、キャッシュされたローカルファイルのパスを生成する PyTorch の IterableDataset です。手作業で実装する必要があるパフォーマンスと分散に関する課題を処理します:

  • ローカルキャッシュ ファイルは、最初のアクセス時にFUSEマウントからローカルキャッシュディレクトリにコピーされ、その後、キャッシュから提供されるため、マルチエポックトレーニングでボリュームを再度読み込むことはありません。
  • 自動パーティショニング torch.distributed が初期化されると、ファイルはランク全体でパーティション分割され、さらに DataLoader ワーカーに分割されるため、各 (rank, worker) ペアは、追加のセットアップなしで重複しないスライスを受け取ります。
注記

UCVolumeDataset serverless_gpu.data.DataLoaderGPU環境5 以上が必要です。

UCVolumeDataset 未加工のローカルファイルパスを返します。それらのファイルをテンソルにデコードするには、パスのストリームを取り込み、解析ロジックを適用する2番目のIterableDatasetでラップします。これはI/Oと構文解析に関する事項を分離します。

Python
from serverless_gpu.data import UCVolumeDataset
from torch.utils.data import IterableDataset
from PIL import Image
import torchvision.transforms.functional as TF

class ImageDataset(IterableDataset):
"""Decodes each cached file path from UCVolumeDataset into a tensor."""

def __init__(self, path_dataset: UCVolumeDataset):
self._path_dataset = path_dataset

def __iter__(self):
for local_path in self._path_dataset:
image = Image.open(local_path).convert("RGB")
yield TF.to_tensor(image)

path_dataset = UCVolumeDataset("/Volumes/catalog/schema/my_volume/images")
dataset = ImageDataset(path_dataset)

ラッパーはすでにキャッシュされているローカルパスを受け取るので、解析ステップがFUSEマウントに触れることはありません。拡張、トークン化、またはフィルタリングのために、追加のラッパーを連結できます。

最適なパフォーマンスを得るには、標準のPyTorch DataLoaderではなく、UCVolumeDatasetserverless_gpu.data.DataLoaderと組み合わせます。サーバレス GPU I/O 向けに調整されており、GPU がコンピュートを実行する間、ファイルを並行してフェッチおよびキャッシュします。「データ読み込みパフォーマンス」を参照してください。

@distributed デコレータ内でデータをロードする

分散トレーニングにサーバレス GPU API使用する場合は、データ読み込みコードを@distributedデコレータ内に移動します。 データセットのサイズがpickleで許可されている最大サイズを超える可能性があるため、以下に示すように、デコレータ内でデータセットを生成することをお勧めします。

Python
from serverless_gpu import distributed

# This may cause a pickle error if the dataset is too large
dataset = get_dataset(file_path)

@distributed(gpus=8, gpu_type='H100')
def run_train():
# Load data inside the decorator to avoid pickle serialization issues
dataset = get_dataset(file_path)
...

デコレータ内でUCVolumeDatasetを構築すると、反復時にtorch.distributedランク情報を読み込み、ランク間でファイルを自動的にパーティション分割します。そのため、ファイルベースのボリュームデータにDistributedSamplerは必要ありません。

データ読み込みパフォーマンス

/Workspace および、/Volumes ディレクトリはリモートの Unity Catalog ストレージでホストされています。データセットがUnity Catalogに保存されている場合、利用可能なネットワーク帯域幅によってデータ読み込み速度が制限されます。複数のエポックをトレーニングしている場合、推奨されるアプローチは UCVolumeDataset を使用することです。これは、最初のアクセス時に各ファイルをローカルストレージにコピーし、それ以降の読み取りをローカルコピーから提供することで、このキャッシングを行います。ボリューム内のデータセットでは、トレーニングでその一部しか使用しない場合でもツリー全体を事前にコピーする手動のshutil.copytreeよりも、ボリューム内のデータセットが優先されます。

データセットが大規模な場合、以下の手法でスループットを改善できます:

  • serverless_gpu.data.DataLoader を使用して取得を並列化する。 これは、サーバレス GPU I/O 用に調整された torch DataLoader のドロップイン サブクラスです。num_workers はデフォルトで 6 に設定され、prefetch_factor はデフォルトで 4 に設定されます (PyTorch の 0 および 2 と比較して)。そのため、GPU がコンピュートを行っている間、ファイルが同時にフェッチおよびキャッシュされます。また、バッチごとのフェッチタイミングをアクティブなMLflow実行にログ記録するため、データ読み込みのボトルネックを特定するのに役立ちます。

    Python
    from serverless_gpu.data import DataLoader

    loader = DataLoader(
    dataset,
    batch_size=32,
    pin_memory=True,
    # num_workers=6, by default
    # prefetch_factor=4, by default
    # raise num_workers to increase parallel reads, or prefetch_factor to deepen each worker's queue.
    )

    すべてのランクは同じnum_workers値を使用する必要があります。なぜなら、UCVolumeDatasetworld_size × num_workersスロットにわたってグローバルストライドを使用してファイルをパーティション分割するためです。値の不一致により、ファイルが重複したりスキップされたりします。

  • バッチサイズを増やす より大きなバッチは、より多くのサンプルにわたってバッチごとのデータ読み込みオーバーヘッドを償却し、ステップごとのファイル取得操作の数を削減します。GPU メモリが律速段階である場合、勾配蓄積とより大きいバッチサイズを組み合わせることで、実質的なバッチサイズを維持できます。

ストリーミングデータセット

メモリに収まらないほど大規模なデータセットの場合は、ストリーミング方式を使用します。