Mosaic ストリーミングを使用したデータの読み込み
この記事では、 Mosaic ストリーミング を使用して、 Apache Spark から PyTorchと互換性のある形式にデータを変換する方法について説明します。
Mosaic ストリーミングは、オープンソース データ ロード ライブラリです。 これにより、単一ノードまたは分散トレーニングが可能になり、既に Apache Spark DataFramesとしてロードされているデータセットからのディープラーニングモデルの評価が可能になります。 Mosaic ストリーミングは主に Mosaic Composer をサポートしていますが、ネイティブ PyTorch、 PyTorch Lightning、および TorchDistributorとも統合されています。 Mosaic ストリーミングは、従来の PyTorch DataLoader に比べて、次のような一連の利点を提供します。
- 画像、テキスト、動画、マルチモーダルデータなど、あらゆるデータタイプとの互換性。
- 主要なクラウドストレージプロバイダー(AWS、OCI、GCS、Azure、Databricks UC Volume、およびCloudflare R2、Coreweave、Backblaze b2などのS3互換オブジェクトストア)のサポート
- 正確性を最大限に高めることで、パフォーマンス、柔軟性、使いやすさを保証します。 詳細については、 主な機能 のページをご覧ください。
Mosaic ストリーミングに関する一般的な情報については、 ストリーミング API のドキュメントを参照してください。
Mosaic ストリーミングは、 Databricks Runtime 15.2 ML 以降のすべてのバージョンにプリインストールされています。
Mosaic ストリーミングを使用して Spark DataFrames からデータをロードする
Mosaic ストリーミングは、 Apache Spark から Mosaic Data Shard (MDS) 形式に変換し、分散環境で使用するためにロードできる簡単なワークフローを提供します。
推奨されるワークフローは次のとおりです。
- Apache Spark を使用して、データを読み込み、必要に応じて前処理します。
streaming.base.converters.dataframe_to_mds
を使用して、データフレームを一時的なストレージの場合はディスクに保存し、永続ストレージの場合は Unity Catalog ボリュームに保存します。このデータは MDS 形式で保存され、圧縮とハッシュのサポートによりさらに最適化できます。 高度なユースケースには、UDFを使用したデータの前処理も含まれます。 詳細については、 Spark DataFrame to MDS チュートリアル をご覧ください。streaming.StreamingDataset
を使用して、必要なデータをメモリに読み込みます。StreamingDataset
は、PyTorch の IterableDataset のバージョンで、弾力性のある決定論的シャッフルを特徴としており、高速なミッドエポック再開を可能にします。 詳細については、 StreamingDataset のドキュメント を参照してください。streaming.StreamingDataLoader
を使用して、トレーニング/評価/テストに必要なデータを読み込みます。StreamingDataLoader
は、PyTorchのDataLoaderのバージョンで、このランクでモデルが見たサンプルの数を追跡する追加のチェックポイント/再開インターフェースを提供します。
エンドツーエンドの例については、次のノートブックを参照してください。
Mosaic ストリーミング ノートブックを使用して、 Spark から PyTorch へのデータ読み込みを簡素化
トラブルシューティング: 認証エラー
StreamingDataset
を使用して Unity Catalog ボリュームからデータを読み込むときに次のエラーが表示される場合は、次に示すように環境変数を設定します。
ValueError: default auth: cannot configure default credentials, please check https://docs.databricks.com/en/dev-tools/auth.html#databricks-client-unified-authentication to configure credentials for your preferred authentication method.
TorchDistributor
を使用して分散トレーニングを実行しているときにこのエラーが表示される場合は、ワーカーノードで環境変数も設定する必要があります。
db_host = "https://your-databricks-host.databricks.com"
db_token = "YOUR API TOKEN" # Create a token with either method from https://docs.databricks.com/en/dev-tools/auth/index.html#databricks-authentication-methods
def your_training_function():
import os
os.environ['DATABRICKS_HOST'] = db_host
os.environ['DATABRICKS_TOKEN'] = db_token
# The above function can be distributed with TorchDistributor:
# from pyspark.ml.torch.distributor import TorchDistributor
# distributor = TorchDistributor(...)
# distributor.run(your_training_function)