Pular para o conteúdo principal

Criar e conectar-se ao Ray clustering em Databricks

Saiba como criar, configurar e executar o Ray compute clustering no Databricks

Requisitos

Para criar um Ray clustering, o senhor deve ter acesso a um recurso Databricks all-purpose compute com as seguintes configurações:

  • Databricks Runtime 12.2 LTS ML e acima.
  • Modos de acesso compartilhado dedicados (anteriormente de usuário único) ou sem isolamento:
nota

Atualmente, o Ray clustering não é compatível com o site serverless compute.

Instale Ray

A partir de Databricks Runtime ML 15.0, o Ray é pré-instalado no clustering Databricks.

Para tempos de execução lançados antes da versão 15.0, use o pip para instalar o Ray em seu cluster:

%pip install ray[default]>=2.3.0

Crie um Ray clustering específico do usuário em um Databricks clustering

Para criar um agrupamento de Ray, use o método ray.util.spark.setup_ray_cluster API.

nota

Quando o senhor cria um Ray clustering em um Notebook, ele só fica disponível para o usuário atual do Notebook. O clustering do Ray é desligado automaticamente depois que o Notebook é desconectado do clustering ou depois de 30 minutos de inatividade (nenhuma tarefa foi enviada ao Ray). Se quiser criar um cluster Ray que seja compartilhado com todos os usuários e não esteja sujeito a um Notebook em execução ativa, use ray.util.spark.setup_global_ray_cluster API em vez disso.

Ray clustering de tamanho fixo

Em qualquer notebook Databricks que esteja anexado a um cluster Databricks, o senhor pode executar o seguinte comando para começar um cluster Ray de tamanho fixo:

Python
import ray
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
max_worker_nodes=1,
collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

Escalonamento automático Ray clustering

Para saber como iniciar um clustering Ray com dimensionamento automático, consulte escala Ray clustering em Databricks.

Iniciando um Ray clustering no modo global

Usando o Ray 2.9.0 e o acima, o senhor pode criar um cluster Ray de modo global em um cluster Databricks. Um agrupamento de Ray no modo global permite que todos os usuários conectados ao recurso Databricks compute também usem o agrupamento de Ray. Esse modo de execução de um cluster Ray não tem a funcionalidade de tempo limite ativo que um recurso compute dedicado tem ao executar uma instância de cluster Ray de usuário único.

Para começar um clustering global do Ray ao qual vários usuários possam se vincular e executar a tarefa Ray, comece criando um Job do Notebook Databricks e vincule-o a um clustering do modo compartilhado Databricks e, em seguida, execute o seguinte comando:

Python
from ray.util.spark import setup_global_ray_cluster

setup_global_ray_cluster(
max_worker_nodes=2,
...
# other arguments are the same as with the `setup_global_ray` API.
)

Essa é uma chamada de bloqueio que permanecerá ativa até que o senhor interrompa a chamada clicando no botão "Interrupt" (Interromper) na célula de comando do Notebook, desconectando o Notebook do agrupamento Databricks ou encerrando o agrupamento Databricks. Caso contrário, o modo global Ray clustering continuará a ser executado e estará disponível para o envio de tarefas por usuários autorizados. Para obter mais informações sobre o modo global de clustering, consulte a documentação do Ray API.

O clustering de modo global tem as seguintes propriedades:

  • Em um clustering Databricks, o senhor só pode criar um clustering Ray de modo global ativo por vez.
  • Em um cluster Databricks, o modo global ativo Ray clustering pode ser usado por todos os usuários em qualquer Notebook Databricks anexado. O senhor pode executar ray.init() para se conectar ao modo global ativo Ray clustering. Como vários usuários podem acessar esse Ray clustering, a contenção de recursos pode ser um problema.
  • Modo global O agrupamento de raios está ativo até que a chamada setup_ray_cluster seja interrompida. Ele não tem um tempo limite de desligamento automático como o clustering Ray de usuário único.

Criar um agrupamento de GPU Ray

Para o clustering de GPU, esses recursos podem ser adicionados ao clustering de Ray da seguinte maneira:

Python
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
min_worker_nodes=2,
max_worker_nodes=4,
num_cpus_per_node=8,
num_gpus_per_node=1,
num_cpus_head_node=8,
num_gpus_head_node=1,
collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

Conectar-se ao Ray clustering remoto usando o cliente Ray

No Ray versão 2.3.0 e acima, o senhor pode criar um agrupamento Ray usando o setup_ray_cluster API, e no mesmo Notebook, pode chamar ray.init() API para se conectar a esse Ray clustering. Para obter as cadeias de conexão remota, use o seguinte:

Python
from ray.util.spark import setup_ray_cluster

_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)

Em seguida, o senhor pode conectar o clustering remoto usando as strings de conexão remota acima:

Python
import ray
ray.init(remote_conn_str)

O cliente Ray não é compatível com o Ray dataset API definido no módulo ray.data. Como solução alternativa, o senhor pode envolver o código que chama o Ray dataset API em uma tarefa Ray remota, conforme mostrado no código a seguir:

Python
import ray
import pandas as pd

# Note: This must be run in the same VPC/network as the Spark cluster
# so it can reach this address
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
ds = ray.data.from_pandas(p1)
return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())
## Connecting the Ray Cluster to the Ray Job CLI

For many developers moving from self-managed Ray solutions to a <Databricks> solution, there is often existing infrastructure tooling built based on the Ray CLI tools. While <Databricks> currently does not support Ray Cluster CLI integration, the Ray Job CLI can be connected through the driver proxy to the Ray cluster running on <Databricks>. For example:

``` shell
ray job submit --headers '{"cookie" : "DATAPLANE_DOMAIN_SESSIONID=<REDACTED>"}' --address 'https://<DATABRICKS WORKSPACE URL>/driver-proxy/o/<etc>' --working-dir='.' -- python run_task.py

Os valores que precisam ser configurados são o URL Databricks workspace , começando com https://, e os valores encontrados após o /driver-proxy/o/ são encontrados no URL do proxy do Ray Dashboard exibido após o início do agrupamento do Ray.

O Ray Job CLI é usado para enviar um trabalho para um cluster Ray a partir de sistemas externos, mas não é necessário para enviar um trabalho em um cluster Ray em Databricks. Recomenda-se que o Job seja implantado usando Databricks Jobs, que seja criado um Ray clustering por aplicativo e que as ferramentas existentes em Databricks, como Databricks ativo Bundles ou fluxo de trabalho Triggers, sejam usadas para acionar o Job.

Definir um local de saída log

O senhor pode definir o argumento collect_log_to_path para especificar o caminho de destino onde deseja coletar o Ray clustering logs. execução da coleta de registros após o encerramento do Ray clustering.

Databricks recomenda definir um caminho que comece com /dbfs/ ou Unity Catalog Caminho de volume para preservar o logs mesmo que o senhor encerre o agrupamento Apache Spark. Caso contrário, seu logs não poderá ser recuperado, pois o armazenamento local no clustering é excluído quando o clustering é desligado.

Depois de criar um agrupamento Ray, o senhor pode executar qualquer código de aplicativo Ray diretamente no Notebook. Clique em Open Ray clustering Dashboard in a new tab para view o painel do Ray para o clustering.

Habilite os rastreamentos de pilha e o gráfico de chama na página Ray Dashboard Actors

Na página Ray Dashboard Actors, o senhor pode acessar view stack traces e flame gráfico para os atores Ray ativos. Para view essas informações, use o seguinte comando para instalar o py-spy antes de o senhor começar o Ray clustering:

Python
%pip install py-spy

Crie e configure as melhores práticas

Esta seção aborda as práticas recomendadas para criar e configurar o Ray clustering.

Cargas de trabalho sem GPU

A execução do Ray clustering em cima de um Databricks Spark clustering. Um cenário típico é usar um Spark Job e Spark UDF para realizar tarefas simples de pré-processamento de dados que não precisam de recurso de GPU. Em seguida, use o Ray para executar tarefas complicadas de aprendizado de máquina que se beneficiam das GPUs. Nesse caso, o site Databricks recomenda definir o parâmetro de configuração de nível de clustering Apache Spark spark.tarefa.recurso.gpu.amount como 0 para que todas as execuções do site Apache Spark DataFrame transformações e Apache Spark UDF não usem o recurso GPU.

Os benefícios dessa configuração são os seguintes:

  • Isso aumenta o paralelismo do trabalho Apache Spark porque o tipo de instância da GPU geralmente tem muito mais núcleos de CPU do que dispositivos de GPU.
  • Se o clustering do Apache Spark for compartilhado com vários usuários, essa configuração impedirá que o Apache Spark Job concorra por recursos de GPU com cargas de trabalho Ray em execução simultânea.

Desativar transformers trainer MLflow integration se estiver usando-o no Ray tarefa

A integração do transformers trainer MLflow é ativada pelo default de dentro da transformers biblioteca. Se o senhor usar o Ray train para fazer o ajuste fino de um modelo transformers, o Ray tarefa falhará devido a um problema de credencial. No entanto, esse problema não se aplica se o senhor usar diretamente o MLflow para treinamento. Para evitar esse problema, o senhor pode definir a variável de ambiente DISABLE_MLFLOW_INTEGRATION como 'TRUE' na configuração do clustering Databricks ao iniciar o clustering Apache Spark.

Erro de decapagem da função remota Address Ray

Para executar a tarefa Ray, o senhor escolhe a função tarefa. Se você descobrir que a decapagem falhou, você deve diagnosticar qual parte do seu código causa a falha. As causas comuns de erros de decapagem são o tratamento de referências externas, fechamentos e referências a objetos com estado. Um dos erros mais fáceis de verificar e corrigir rapidamente pode ser corrigido movendo as instruções de importação para dentro da declaração da função de tarefa.

Por exemplo, datasets.load_dataset é uma função amplamente usada que é corrigida no lado do driver do Databricks Runtime, tornando a referência impossível de ser corrigida. Para resolver isso, o senhor pode simplesmente escrever a função de tarefa da seguinte forma:

Python
def ray_task_func():
from datasets import load_dataset # import the function inside task function
...

Desativar o monitor de memória do Ray se a tarefa do Ray for encerrada inesperadamente com um erro de falta de memória (OOM)

No Ray 2.9.3, o monitor de memória do Ray tem vários problemas conhecidos que podem fazer com que a tarefa do Ray seja interrompida inadvertidamente sem motivo. Para resolver o problema, o senhor pode desativar o monitor de memória Ray definindo a variável de ambiente RAY_memory_monitor_refresh_ms como 0 na configuração de clustering Databricks ao iniciar o clustering Apache Spark.

Aplicação de funções de transformação a lotes de dados

Ao processar dados em lotes, é recomendável usar a Ray Data API com a função map_batches. Essa abordagem pode ser mais eficiente e dimensionável, especialmente para grandes conjuntos de dados ou cálculos complexos que se beneficiam do processamento em lotes. Qualquer Spark DataFrame pode ser convertido em um conjunto de dados Ray usando o ray.data.from_spark API. A saída processada da chamada dessas transformações API pode ser gravada em tabelas Databricks UC usando o API ray.data.write_databricks_table.

Usando MLflow no Ray tuner, Ray train ou Ray tarefa personalizada

A integração do Databricks MLflow e do Ray requer o Ray 2.41 e o acima.

Para usar o site MLflow com o Ray Tune, o Ray Train ou a tarefa Ray personalizada, defina as seguintes variáveis ambientais: DATABRICKS_HOST e DATABRICKS_TOKEN, ou defina as variáveis ambientais DATABRICKS_HOST, DATABRICKS_CLIENT_ID e DATABRICKS_CLIENT_SECRET antes de chamar ray.util.spark.setup_ray_cluster. O código a seguir mostra como definir essas variáveis.

Python
import os
from ray.util.spark import setup_ray_cluster

os.environ["DATABRICKS_HOST"] = "https://....databricks.com"
os.environ["DATABRICKS_TOKEN"] = "<your PAT token"

setup_ray_cluster(num_cpus_worker_node=2, num_gpus_worker_node=0, max_worker_nodes=1, min_worker_nodes=1)

Use o escopo do Notebook Python biblioteca ou o agrupamento Python biblioteca em Ray tarefa

O uso da Python biblioteca com escopo de Notebook ou o agrupamento da Python biblioteca em uma tarefa remota do Ray requer o Ray 2.12 e acima.

As versões 2.11 e seguintes do Ray têm um problema conhecido em que o Ray tarefa não pode usar o escopo do Notebook Python biblioteca ou o clustering Python biblioteca. Para as versões 2.11 e abaixo do Ray, as dependências adicionais devem ser pré-instaladas na sessão ativa usando o comando mágico %pip antes de iniciar o agrupamento do Ray.

Próximas etapas