Mosaic ストリーミングを使用してデータをロードする
この記事では、Mosaic を使用してApache SparkのデータをPyTorchと互換性のある形式に変換する方法について説明します。
Mosaic ストリーミングは、オープンソースのデータ読み込みライブラリです。 これにより、Apache Spark DataFrames としてすでにロードされているデータセットから、ディープラーニング モデルの単一ノードまたは分散トレーニングと評価が可能になります。 Mosaic ストリーミングは主に Mosaic Composer をサポートしていますが、ネイティブのPyTorch 、 PyTorch Lightning 、 TorchDistributorとも統合されています。 Mosaic ストリーミングは、従来のPyTorch DataLoader に比べて次のような一連の利点を提供します。
画像、テキスト、動画、マルチモーダルデータなど、あらゆるデータタイプとの互換性。
主要なクラウド ストレージ プロバイダー (AWS、OCI、GCS、Azure、Databricks UC Volume、Cloudflare R2、Coreweave、Backblaze b2 などの S3 互換オブジェクト ストアなど) のサポート
正確性、パフォーマンス、柔軟性、使いやすさを最大限に保証します。 詳細については、 主な機能のページをご覧ください。
Mosaic ストリーミングに関する一般的な情報については、 ストリーミング API ドキュメントを参照してください。
注:
Mosaic ストリーミングは、 Databricks Runtime 15.2 ML以降のすべてのバージョンにプリインストールされています。
Mosaic を使用してSpark DataFramesからデータをロードする
Mosaic ストリーミングは、 Apache Sparkを Mosaic Data Shard (MDS) 形式に変換するための簡単なワークフローを提供し、その後、分散環境で使用するためにロードすることができます。
推奨されるワークフローは次のとおりです。
Apache Spark を使用してデータを読み込み、必要に応じて前処理します。
streaming.base.converters.dataframe_to_mds
を使用して、データフレームを一時ストレージ用のディスクに保存したり、永続ストレージ用のUnity Catalogボリュームに保存したりします。 このデータはMDS形式で保存され、圧縮とハッシュのサポートによりさらに最適化できます。 高度なユースケースには、UDFを使用したデータの前処理を含めることもできます。 詳細については、 Spark DataFrame to MDS」を参照してください。streaming.StreamingDataset
を使用して、必要なデータをメモリにロードします。StreamingDataset
、弾力的に決定論的なシャッフルを特徴とする PyTorch の IterableDataset のバージョンであり、エポックの途中での高速再開を可能にします。 詳細については、 StreamingDataset のドキュメントをご覧ください。トレーニング/評価/テストに必要なデータを読み込むには、
streaming.StreamingDataLoader
使用します。StreamingDataLoader
は、追加のチェックポイント/再開インターフェースを提供する PyTorch の DataLoader のバージョンであり、このランクでモデルが確認したサンプルの数を追跡します。
エンドツーエンドの例については、次のノートブックを参照してください。
トラブルシューティング: 認証エラー
StreamingDataset
を使用して Unity Catalog ボリュームからデータを読み込むときに次のエラーが表示される場合は、次に示すように環境変数を設定します。
ValueError: default auth: cannot configure default credentials, please check https://docs.databricks.com/en/dev-tools/auth.html#databricks-client-unified-authentication to configure credentials for your preferred authentication method.
注:
TorchDistributor
を使用して分散トレーニングを実行しているときにこのエラーが表示される場合は、ワーカーノードで環境変数も設定する必要があります。
db_host = "https://your-databricks-host.databricks.com"
db_token = "YOUR API TOKEN" # Create a token with either method from https://docs.databricks.com/en/dev-tools/auth/index.html#databricks-authentication-methods
def your_training_function():
import os
os.environ['DATABRICKS_HOST'] = db_host
os.environ['DATABRICKS_TOKEN'] = db_token
# The above function can be distributed with TorchDistributor:
# from pyspark.ml.torch.distributor import TorchDistributor
# distributor = TorchDistributor(...)
# distributor.run(your_training_function)