Carregar uso de dados Transmissão em mosaico

Este artigo descreve como usar a transmissão do Mosaic para converter dados do site Apache Spark em um formato compatível com o site PyTorch.

A transmissão do Mosaic é uma biblioteca de carregamento de dados de código aberto. Ele permite o treinamento distribuído ou de nó único e a avaliação de modelos de aprendizagem profunda a partir de conjuntos de dados que já estão carregados como Apache Spark DataFrames. A transmissão do Mosaic é compatível principalmente com o Mosaic Composer, mas também se integra com os sites nativos PyTorch, PyTorch Lightning e TorchDistributor. A transmissão Mosaic oferece uma série de benefícios em relação aos tradicionais PyTorch DataLoaders, incluindo:

  • Compatibilidade com qualquer tipo de dados, incluindo imagens, texto, vídeo e dados multimodais.

  • Suporte para os principais provedores de armazenamento cloud (AWS, OCI, GCS, Azure, Databricks UC Volume e qualquer armazenamento de objetos compatível com S3, como Cloudflare R2, Coreweave, Backblaze b2, etc.)

  • Maximizar as garantias de correção, o desempenho, a flexibilidade e a facilidade de uso. Para obter mais informações, view a página de recursokey .

Para obter informações gerais sobre a transmissão do Mosaic, acesse view a documentação da transmissão API .

Observação

A transmissão do Mosaic foi pré-instalada em todas as versões do Databricks Runtime 15.2 ML e superiores.

Carregar dados do site Spark DataFrames usando a transmissão Mosaic

A transmissão do Mosaic oferece um fluxo de trabalho simples para a conversão do site Apache Spark para o formato Mosaic Data Shard (MDS), que pode então ser carregado para uso em um ambiente distribuído.

O fluxo de trabalho recomendado é:

  1. Use o Apache Spark para carregar e, opcionalmente, pré-processar dados.

  2. Use streaming.base.converters.dataframe_to_mds para salvar o dataframe no disco para armazenamento temporário e/ou em um volume do Unity Catalog para armazenamento persistente. Esses dados serão armazenados no formato MDS e podem ser otimizados ainda mais com suporte para compactação e hashing. Os casos de uso avançado também podem incluir o pré-processamento de UDFs de uso de dados. view O senhor pode obter mais informações nos sites Spark DataFrame e MDS tutorial.

  3. Use streaming.StreamingDataset para carregar os dados necessários na memória. StreamingDataset é uma versão do IterableDataset do PyTorchque recorre ao embaralhamento elasticamente determinístico, o que permite a retomada rápida no meio da época. view a documentação do StreamingDataset para obter mais informações.

  4. Use streaming.StreamingDataLoader para carregar os dados necessários para treinamento/avaliação/teste. StreamingDataLoader é uma versão do DataLoader do PyTorch que fornece uma interface adicional de ponto de verificação/resumo, para a qual ele rastreia o número de amostras vistas pelo modelo nessa classificação.

Para obter um exemplo de ponta a ponta, consulte o seguinte Notebook:

Simplifique o carregamento de dados de Spark para PyTorch usando a transmissão Mosaic Notebook

Abra o bloco de anotações em outra guia

Solução de problemas: erro de autenticação

Se o senhor observar o seguinte erro ao carregar dados de um volume Unity Catalog usando StreamingDataset, configure a variável de ambiente conforme mostrado abaixo.

ValueError: default auth: cannot configure default credentials, please check https://docs.databricks.com/en/dev-tools/auth.html#databricks-client-unified-authentication to configure credentials for your preferred authentication method.

Observação

Se esse erro ocorrer ao executar o treinamento distribuído usando TorchDistributor, o senhor também deverá definir a variável de ambiente nos nós do site worker.

db_host = "https://your-databricks-host.databricks.com"
db_token = "YOUR API TOKEN" # Create a token with either method from https://docs.databricks.com/en/dev-tools/auth/index.html#databricks-authentication-methods

def your_training_function():
  import os
  os.environ['DATABRICKS_HOST'] = db_host
  os.environ['DATABRICKS_TOKEN'] = db_token

# The above function can be distributed with TorchDistributor:
# from pyspark.ml.torch.distributor import TorchDistributor
# distributor = TorchDistributor(...)
# distributor.run(your_training_function)