Criar e conectar-se a clusters Ray no Databricks

Saiba como criar, configurar e executar o Ray compute clusters no Databricks

Requisitos

Para criar um Ray cluster, o senhor deve ter acesso a um recurso Databricks de uso geral compute com as seguintes configurações:

  • Databricks Runtime 12.2 LTS ML e acima.

  • O modo de acesso deve ser Usuário único ou Sem isolamento compartilhado.

Observação

Ray clusters não é suportado atualmente em serverless compute.

Instalar o Ray

A partir do Databricks Runtime ML 15.0, o Ray é pré-instalado nos clusters do Databricks.

Para tempos de execução lançados antes da versão 15.0, use o pip para instalar o Ray em seu cluster:

%pip install ray[default]>=2.3.0

Criar um cluster Ray específico do usuário em um cluster Databricks

Para criar um cluster Ray, use o método ray.util.spark.setup_ray_cluster API.

Observação

Quando o senhor cria um Ray cluster em um Notebook, ele só fica disponível para o usuário atual do Notebook. O Ray cluster é desligado automaticamente depois que o Notebook é desconectado do cluster ou após 30 minutos de inatividade (nenhuma tarefa foi enviada ao Ray). Se quiser criar um Ray cluster que seja compartilhado com todos os usuários e não esteja sujeito a um Notebook em execução ativa, use o ray.util.spark.setup_global_ray_cluster API em vez disso.

Ray cluster de tamanho fixo

Em qualquer Databricks Notebook que esteja anexado a um Databricks cluster, o senhor pode executar o seguinte comando para começar um Ray cluster de tamanho fixo:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_per_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

Escalonamento automático do Ray cluster

Para saber como começar um Ray cluster de escala automática, consulte escala Ray clusters em Databricks.

Iniciando um Ray cluster no modo global

Usando o Ray 2.9.0 e o acima, o senhor pode criar um Ray de modo global cluster em um Databricks cluster. Um cluster Ray de modo global permite que todos os usuários conectados ao cluster Databricks também usem o cluster Ray. Esse modo de execução de um cluster Ray não tem a funcionalidade de tempo limite ativo que um cluster de usuário único tem ao executar uma instância de cluster Ray de usuário único.

Para começar um ray global cluster ao qual vários usuários possam se vincular e executar o Ray tarefa, comece criando um Databricks Notebook Job e vincule-o a um modo compartilhado Databricks cluster e, em seguida, execute o seguinte comando:

from ray.util.spark import setup_global_ray_cluster

setup_global_ray_cluster(
  max_worker_nodes=2,
  ...
  # other arguments are the same as with the `setup_global_ray` API.
)

Essa é uma chamada de bloqueio que permanecerá ativa até que o senhor interrompa a chamada clicando no botão "Interrupt" (Interromper) na célula de comando Notebook, desconectando o Notebook do Databricks cluster ou encerrando o Databricks cluster. Caso contrário, o modo global Ray cluster continuará a ser executado e estará disponível para envio de tarefas por usuários autorizados. Para obter mais informações sobre o modo global clusters, consulte a documentação do Ray API .

Os clusters do modo global têm as seguintes propriedades:

  • Em um cluster Databricks, o senhor só pode criar um cluster Ray de modo global ativo por vez.

  • Em um Databricks cluster, o modo global ativo Ray cluster pode ser usado por todos os usuários em qualquer Databricks Notebook anexado. O senhor pode executar ray.init() para se conectar ao modo global ativo Ray cluster. Como vários usuários podem acessar esse Ray cluster, a contenção de recursos pode ser um problema.

  • Modo global O cluster de raios está ativo até que a chamada setup_ray_cluster seja interrompida. Ele não tem um tempo limite de desligamento automático como os clusters Ray de usuário único.

Criar um cluster de GPU Ray

Para a GPU clusters, esses recursos podem ser adicionados ao Ray cluster da seguinte forma:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  min_worker_nodes=2,
  max_worker_nodes=4,
  num_cpus_per_node=8,
  num_gpus_per_node=1,
  num_cpus_head_node=8,
  num_gpus_head_node=1,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

Conectar-se ao Ray cluster remoto usando o cliente Ray

No Ray versão 2.3.0 e acima, o senhor pode criar um Ray cluster usando o setup_ray_cluster API, e no mesmo Notebook, o senhor pode chamar ray.init() API para se conectar a esse Ray cluster. Para obter as cadeias de conexão remota, use o seguinte:

from ray.util.spark import setup_ray_cluster

_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)

Em seguida, o senhor pode conectar o site remoto cluster usando as cadeias de conexão remota acima:

import ray
ray.init(remote_conn_str)

O cliente Ray não é compatível com o Ray dataset API definido no módulo ray.data. Como solução alternativa, o senhor pode envolver o código que chama o Ray dataset API em uma tarefa Ray remota, conforme mostrado no código a seguir:

import ray
import pandas as pd

# Note: This must be run in the same VPC/network as the Spark cluster
# so it can reach this address
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())
## Connecting the Ray Cluster to the Ray Job CLI

For many developers moving from self-managed Ray solutions to a <Databricks> solution, there is often existing infrastructure tooling built based on the Ray CLI tools. While <Databricks> currently does not support Ray Cluster CLI integration, the Ray Job CLI can be connected through the driver proxy to the Ray cluster running on <Databricks>. For example:

``` shell
ray job submit  --headers '{"cookie" : "DATAPLANE_DOMAIN_SESSIONID=<REDACTED>"}' --address 'https://<DATABRICKS WORKSPACE URL>/driver-proxy/o/<etc>' --working-dir='.' -- python run_task.py

Os valores que precisam ser configurados são o URL Databricks workspace , começando com https://, e os valores encontrados após o /driver-proxy/o/ são encontrados no URL do proxy do Ray Dashboard exibido após o Ray cluster começar.

O Ray Job CLI é usado para enviar um trabalho para um Ray cluster de sistemas externos, mas não é necessário para enviar um trabalho no Ray clusters em Databricks. Recomenda-se que o trabalho seja implantado usando Databricks Jobs, que seja criado um Ray cluster por aplicativo e que as ferramentas existentes em Databricks, como Databricks ativo Bundles ou fluxo de trabalho Triggers, sejam usadas para acionar o trabalho.

Definir um local de saída de registro

O senhor pode definir o argumento collect_log_to_path para especificar o caminho de destino onde deseja coletar os logs do cluster Ray. log execução da coleta após o encerramento do Ray cluster.

Databricks Recomenda-se definir um caminho que comece com /dbfs/ ou Unity Catalog Caminho de volume para preservar o logs mesmo que o senhor encerre o Apache Spark cluster. Caso contrário, seus logs não poderão ser recuperados, pois o armazenamento local no cluster é excluído quando o cluster é desligado.

Depois de criar um Ray cluster, o senhor pode executar qualquer código de aplicativo Ray diretamente no seu Notebook. Clique em Open Ray cluster Dashboard in a new tab para view o painel do Ray para o cluster.

Habilite os rastreamentos de pilha e o gráfico de chama na página Ray Dashboard Actors

Na página Ray Dashboard Actors, o senhor pode acessar view stack traces e flame gráfico para os atores Ray ativos. Para view essa informação, use o seguinte comando para instalar o py-spy antes de o senhor começar o Ray cluster:

%pip install py-spy

Criar e configurar práticas recomendadas

Esta seção aborda as práticas recomendadas para criar e configurar clusters Ray.

Cargas de trabalho sem GPU

O Ray cluster execução em cima de um Databricks Spark cluster. Um cenário típico é usar um Spark Job e um Spark UDF para realizar tarefas simples de pré-processamento de dados que não precisam de recurso de GPU. Em seguida, use o Ray para executar tarefas machine learning complicadas que se beneficiam das GPUs. Nesse caso, o site Databricks recomenda definir o parâmetro de configuração de nível Apache Spark cluster spark.tarefa.recurso.gpu.amount como 0 para que todas as transformações e execuções Apache Spark DataFrame Apache Spark UDF não usem o recurso GPU.

Os benefícios dessa configuração são os seguintes:

  • Isso aumenta o paralelismo do Apache Spark Job porque o tipo de instância da GPU geralmente tem muito mais núcleos de CPU do que dispositivos de GPU.

  • Se o Apache Spark cluster for compartilhado com vários usuários, essa configuração impedirá que o Apache Spark Job concorra por recursos de GPU com cargas de trabalho Ray em execução simultânea.

Desabilitar transformers o treinador MLflow integração se estiver usando-o na tarefa Ray

A integração do transformers trainer MLflow é ativada pelo default de dentro da transformers biblioteca. Se o senhor usar o Ray ensinar para fazer o ajuste fino de um modelo transformers, o Ray tarefa falhará devido a um problema de credencial. No entanto, esse problema não se aplica se o senhor usar diretamente o MLflow para treinamento. Para evitar esse problema, o senhor pode definir a variável de ambiente DISABLE_MLFLOW_INTEGRATION como 'TRUE' na configuração do cluster do Databricks ao iniciar o cluster do Apache Spark.

Erro de decapagem da função remota Address Ray

Para executar a tarefa Ray, o senhor escolhe a função tarefa. Se constatar que a decapagem falhou, o senhor deve diagnosticar qual parte do seu código está causando a falha. As causas comuns de erros de decapagem são a manipulação de referências externas, fechamentos e referências a objetos com estado. Um dos erros mais fáceis de verificar e corrigir rapidamente pode ser corrigido movendo as instruções de importação para dentro da declaração da função de tarefa.

Por exemplo, datasets.load_dataset é uma função amplamente usada que é corrigida no lado do driver do Databricks Runtime, tornando a referência impalpável. Para resolver isso, o senhor pode simplesmente escrever a função de tarefa da seguinte forma:

def ray_task_func():
  from datasets import load_dataset  # import the function inside task function
  ...

Desativar o monitor de memória do Ray se a tarefa do Ray for encerrada inesperadamente com um erro de falta de memória (OOM)

No Ray 2.9.3, o monitor de memória do Ray tem vários problemas conhecidos que podem fazer com que a tarefa do Ray seja interrompida inadvertidamente sem motivo. Para resolver o problema, o senhor pode desativar o monitor de memória Ray definindo a variável de ambiente RAY_memory_monitor_refresh_ms como 0 na configuração do cluster Databricks ao iniciar o cluster do Apache Spark.

Ler dados Spark do Ray

Um caso de uso comum é ler dados de um DataFrame do Spark no Ray para processamento posterior. Em Databricks Runtime 15.0 para ML e acima, uma função está disponível para simplificar o carregamento dos dados contidos em um Spark DataFrame diretamente no Ray.

Para usar esse recurso de forma eficaz, certifique-se de que a configuração spark.databricks.pyspark.dataFrameChunk.enabled do Spark cluster esteja definida como true antes de executar ray.init() para começar seu Ray cluster.

import ray.data

source_table = "my_database.my_table"

spark_dataframe = spark.read.table(source_table)
ray_dataset = ray.data.from_spark(spark_dataframe)

Ray buscará o conteúdo do Spark DataFrame diretamente, sem a necessidade de uma gravação temporária dos dados, tornando-os diretamente disponíveis para processamento.

Ler dados de raio do Spark

Da mesma forma que a leitura de dados do Spark a partir do Ray, a capacidade de ler os resultados de uma tarefa do Ray de volta como um Spark DataFrame é compatível com o uso do Unity Catalog. Para usar esse recurso, o senhor deve estar executando o Databricks Runtime 15.0 para ML e o acima de dentro de um Unity Catalog habilitado workspace.

Para usar esse recurso, certifique-se de que a variável de ambiente "_RAY_UC_VOLUMES_FUST_TEMP_DIR" esteja definida como um caminho válido e acessível do Unity Catalog Volume, como "/Volumes/MyCatalog/MySchema/MyVolume/MyRayData"

import ray.data

source_table = "my_database.my_table"

spark_dataframe = spark.read.table(source_table)
ray_dataset = ray.data.from_spark(spark_dataframe)

# Write to the specified (via environment variable) UC Volume from Ray
ray_dataset.write_databricks_table()

Nas versões do Databricks Runtime anteriores à 15.0 para ML, o senhor pode gravar diretamente em um local de armazenamento de objetos usando o Ray Parquet writer, ray_dataset.write_parquet() do módulo ray.data. Spark pode ler esses dados em Parquet com leitores nativos.

Aplicação de funções de transformação a lotes de dados

Ao processar dados em lotes, é recomendável usar a Ray Data API com a função map_batches. Essa abordagem pode ser mais eficiente e dimensionável, especialmente para grandes conjuntos de dados ou cálculos complexos que se beneficiam do processamento em lotes. Qualquer Spark DataFrame pode ser convertido em um Ray dataset usando o ray.data.from_spark API. A saída processada da chamada dessas transformações API pode ser gravada em tabelas Databricks UC usando o API ray.data.write_databricks_table.

Usando MLflow em Ray tarefa

Para usar o MLflow no Ray tarefa, o senhor precisará :

  • Defina Databricks MLflow credenciais no Ray tarefa.

  • Criar MLflow execução no driver Apache Spark e passar o run_id criado para a tarefa Ray.

O exemplo de código a seguir demonstra como fazer isso:

import mlflow
import ray
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

experiment_name = "/Users/<your-name> <Databricks>.com/mlflow_test"
mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
  import os
  os.environ.update(mlflow_db_creds)
  mlflow.set_experiment(experiment_name)
  # We need to use the run created in <AS> driver side,
  # and set `nested=True` to make it a nested run inside the
  # parent run.
  with mlflow.start_run(run_id=run_id, nested=True):
    mlflow.log_metric(f"task_{x}_metric", x)
  return x

with mlflow.start_run() as run:  # create MLflow run in <AS> driver side.
  results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Use Notebook-scoped Python biblioteca ou cluster Python biblioteca em Ray tarefa

Atualmente, Ray tem um problema conhecido em que Ray tarefa não pode usar Notebook scoped Python biblioteca ou cluster Python biblioteca. Para utilizar dependências adicionais em seu Ray Job, o senhor deve instalar manualmente o biblioteca usando o comando mágico %pip antes de iniciar um Ray-on-Spark cluster que usará essas dependências na tarefa. Por exemplo, para atualizar a versão do Ray que será usada para começar o Ray cluster, o senhor pode executar o seguinte comando em seu Notebook:

%pip install ray==<The Ray version you want to use> --force-reinstall

Em seguida, execute o seguinte comando em seu Notebook para reiniciar o kernel Python:

dbutils.library.restartPython()