Mosaic ストリーミングを使用してデータをロードする

この記事では、Mosaic を使用してApache SparkのデータをPyTorchと互換性のある形式に変換する方法について説明します。

Mosaic ストリーミングは、オープンソースのデータ読み込みライブラリです。 これにより、Apache Spark DataFrames としてすでにロードされているデータセットから、ディープラーニング モデルの単一ノードまたは分散トレーニングと評価が可能になります。 Mosaic ストリーミングは主に Mosaic Composer をサポートしていますが、ネイティブのPyTorch 、 PyTorch Lightning 、 TorchDistributorとも統合されています。 Mosaic ストリーミングは、従来のPyTorch DataLoader に比べて次のような一連の利点を提供します。

  • 画像、テキスト、動画、マルチモーダルデータなど、あらゆるデータタイプとの互換性。

  • 主要なクラウド ストレージ プロバイダー (AWS、OCI、GCS、Azure、Databricks UC Volume、Cloudflare R2、Coreweave、Backblaze b2 などの S3 互換オブジェクト ストアなど) のサポート

  • 正確性、パフォーマンス、柔軟性、使いやすさを最大限に保証します。 詳細については、 主な機能のページをご覧ください。

Mosaic ストリーミングに関する一般的な情報については、 ストリーミング API ドキュメントを参照してください。

注:

Mosaic ストリーミングは、 Databricks Runtime 15.2 ML以降のすべてのバージョンにプリインストールされています。

Mosaic を使用してSpark DataFramesからデータをロードする

Mosaic ストリーミングは、 Apache Sparkを Mosaic Data Shard (MDS) 形式に変換するための簡単なワークフローを提供し、その後、分散環境で使用するためにロードすることができます。

推奨されるワークフローは次のとおりです。

  1. Apache Spark を使用してデータを読み込み、必要に応じて前処理します。

  2. streaming.base.converters.dataframe_to_mds を使用して、データフレームを一時ストレージ用のディスクに保存したり、永続ストレージ用のUnity Catalogボリュームに保存したりします。 このデータはMDS形式で保存され、圧縮とハッシュのサポートによりさらに最適化できます。 高度なユースケースには、UDFを使用したデータの前処理を含めることもできます。 詳細については、 Spark DataFrame to MDS」を参照してください。

  3. streaming.StreamingDataset を使用して、必要なデータをメモリにロードします。StreamingDataset 、弾力的に決定論的なシャッフルを特徴とする PyTorch の IterableDataset のバージョンであり、エポックの途中での高速再開を可能にします。 詳細については、 StreamingDataset のドキュメントをご覧ください。

  4. トレーニング/評価/テストに必要なデータを読み込むには、 streaming.StreamingDataLoader使用します。 StreamingDataLoaderは、追加のチェックポイント/再開インターフェースを提供する PyTorch の DataLoader のバージョンであり、このランクでモデルが確認したサンプルの数を追跡します。

エンドツーエンドの例については、次のノートブックを参照してください。

Mosaic を使用してSparkからPyTorchへのデータ読み込みを簡素化する

ノートブックを新しいタブで開く

トラブルシューティング: 認証エラー

StreamingDatasetを使用して Unity Catalog ボリュームからデータを読み込むときに次のエラーが表示される場合は、次に示すように環境変数を設定します。

ValueError: default auth: cannot configure default credentials, please check https://docs.databricks.com/en/dev-tools/auth.html#databricks-client-unified-authentication to configure credentials for your preferred authentication method.

注:

TorchDistributorを使用して分散トレーニングを実行しているときにこのエラーが表示される場合は、ワーカーノードで環境変数も設定する必要があります。

db_host = "https://your-databricks-host.databricks.com"
db_token = "YOUR API TOKEN" # Create a token with either method from https://docs.databricks.com/en/dev-tools/auth/index.html#databricks-authentication-methods

def your_training_function():
  import os
  os.environ['DATABRICKS_HOST'] = db_host
  os.environ['DATABRICKS_TOKEN'] = db_token

# The above function can be distributed with TorchDistributor:
# from pyspark.ml.torch.distributor import TorchDistributor
# distributor = TorchDistributor(...)
# distributor.run(your_training_function)