escala Ray clustering on Databricks
Saiba como ajustar o tamanho do seu clustering Ray para obter o desempenho ideal, incluindo autoescala, configuração do nó principal, clustering heterogêneo e alocação de recursos.
Criar um clustering Ray no modo de autoescala
No Ray 2.8.0 e acima, o Ray clustering começa em Databricks e oferece suporte à integração com o Databricks autoscale. Essa integração de autoescala aciona a autoescala de clustering do Databricks internamente no ambiente Databricks.
Para ativar o autoscale, execute o seguinte comando:
Para a versão do Ray abaixo de 2.10:
from ray.util.spark import setup_ray_cluster
setup_ray_cluster(
num_worker_nodes=8,
autoscale=True,
)
Para Ray versão 2.10 e versões posteriores:
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster
setup_ray_cluster(
min_worker_nodes=2,
max_worker_nodes=4,
num_cpus_per_node=4,
collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)
# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)
O ray.util.spark.setup_ray_cluster
API cria um agrupamento de Ray em Apache Spark. Internamente, ele cria um trabalho em segundo plano no site Apache Spark. Cada tarefa Apache Spark no Job cria um nó Ray worker e o nó Ray head é criado no driver. Os argumentos min_worker_nodes
e max_worker_nodes
representam o intervalo de nós do Ray worker a serem criados e utilizados para as cargas de trabalho do Ray. Se o argumento min_worker_nodes
for deixado indefinido, será iniciado um agrupamento Ray de tamanho fixo com max_worker_nodes
número de trabalhadores disponíveis. Para especificar o número de núcleos de CPU ou GPU atribuídos a cada nó do Ray worker, defina o argumento num_cpus_worker_node
(default value: 1) ou num_gpus_worker_node
(default valor: 0).
Para o Ray versão abaixo de 2.10, se a autoescala estiver ativada, num_worker_nodes
indica o número máximo de nós do Ray worker. O default número mínimo de nós do Ray worker é zero. Essa configuração default significa que, quando o agrupamento de raios é parado, ele escala até zero nós do Ray worker. Isso pode não ser ideal para uma resposta rápida em todos os cenários, mas pode reduzir significativamente os custos quando ativado.
No modo de autoescala, o trabalhador não pode ser definido como ray.util.spark.MAX_NUM_WORKER_NODES
.
Os argumentos a seguir configuram a velocidade de aumento e redução de escala:
autoscale_upscaling_speed
representa o número de nós que podem ficar pendentes como um múltiplo do número atual de nós. Quanto maior o valor, mais agressivo é o aumento de escala. Por exemplo, se isso for definido como 1,0, o clustering poderá aumentar de tamanho em no máximo 100% a qualquer momento.autoscale_idle_timeout_minutes
representa o número de minutos que precisam passar antes que o autoscaler remova um nó parado worker. Quanto menor o valor, mais agressiva é a redução de escala.
Com o Ray 2.9.0 e o acima, o senhor também pode definir autoscale_min_worker_nodes
para evitar que o clustering do Ray reduza a escala para zero worker quando o clustering do Ray for parado, o que faria com que o clustering fosse encerrado.
Configurar o recurso usado pelo nó principal do Ray
Em default, para a configuração Ray on Spark, Databricks restringe o recurso alocado para o nó principal Ray a:
- 0 núcleos de CPU
- 0 GPUs
- Memória de pilha de 128 MB
- 128 MB de memória de armazenamento de objetos
Isso ocorre porque o nó principal do Ray é normalmente usado apenas para coordenação global, não para executar a tarefa Ray. O recurso do nó do driver Apache Spark é compartilhado com vários usuários, portanto, a configuração default salva o recurso no lado do driver Apache Spark. Com o Ray 2.8.0 e o acima, o senhor pode configurar o recurso usado pelo nó principal do Ray. Use os seguintes argumentos na API setup_ray_cluster:
num_cpus_head_node
: configurando núcleos de CPU usados pelo Ray Head Nodenum_gpus_head_node
: configurando a GPU usada pelo Ray Head Nodeobject_store_memory_head_node
: definindo o tamanho da memória de armazenamento de objetos pelo nó principal do Ray
Suporte para clustering heterogêneo
O senhor pode criar um clustering do Ray no site Spark para obter uma execução de treinamento mais eficiente e econômica e definir configurações diferentes entre o nó principal do Ray e os nós do Ray worker. No entanto, todos os nós do Ray worker devem ter a mesma configuração. Databricks não oferecem suporte total ao clustering heterogêneo, mas é possível criar um clustering Databricks com diferentes tipos de driver e de instância worker definindo uma política de cluster. Por exemplo:
{
"node_type_id": {
"type": "fixed",
"value": "i3.xlarge"
},
"driver_node_type_id": {
"type": "fixed",
"value": "g4dn.xlarge"
},
"spark_version": {
"type": "fixed",
"value": "13.x-snapshot-gpu-ml-scala2.12"
}
}
Ajustar a configuração do Ray clustering
A configuração recomendada para cada Ray worker node é a seguinte: Mínimo de 4 núcleos de CPU por Ray worker node. Memória heap mínima de 10 GB para cada nó do Ray worker.
Portanto, ao chamar ray.util.spark.setup_ray_cluster
, a Databricks recomenda definir num_cpus_per_node
como um valor maior ou igual a 4.
Consulte a próxima seção para obter detalhes sobre o ajuste da memória heap para cada nó do Ray worker.
Alocação de memória para Ray worker nodes
Cada nó do Ray worker usa dois tipos de memória: memória heap e memória de armazenamento de objetos.
O tamanho da memória alocada para cada tipo é determinado conforme descrito abaixo.
A memória total alocada para cada nó do Ray worker é:
RAY_WORKER_NODE_TOTAL_MEMORY = (SPARK_WORKER_NODE_PHYSICAL_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)
MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES
é o número máximo de nós Ray worker que podem ser iniciados no nó Apache Spark worker . Isso é determinado pelo argumento num_cpus_per_node
ou num_gpus_per_node
.
Se o senhor não definir o argumento object_store_memory_per_node
, o tamanho da memória heap e o tamanho da memória do armazenamento de objetos alocados para cada nó do Ray worker serão:
RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY * 0.7
OBJECT_STORE_MEMORY_PER_NODE = RAY_WORKER_NODE_TOTAL_MEMORY * 0.3
Se você definir o argumento object_store_memory_per_node
:
RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY - argument_object_store_memory_per_node
Além disso, o tamanho da memória do armazenamento de objetos por nó do Ray worker é limitado pela memória compartilhada do sistema operacional. O valor máximo é:
OBJECT_STORE_MEMORY_PER_NODE_CAP = (SPARK_WORKER_NODE_OS_SHARED_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)
SPARK_WORKER_NODE_OS_SHARED_MEMORY
é o tamanho do disco /dev/shm
configurado para o nó Apache Spark worker .
Prática recomendada de escalabilidade
Defina o número de CPU e GPU para cada nó do Ray worker
É recomendável definir o argumento num_cpus_worker_node
como o número de núcleos de CPU por nó Apache Spark worker . Da mesma forma, definir num_gpus_worker_node
como o número de GPUs por nó Apache Spark worker é o ideal. Com essa configuração, cada nó Apache Spark worker lança um nó Ray worker que utilizará totalmente o recurso de cada nó Apache Spark worker .
Defina a variável de ambiente RAY_memory_monitor_refresh_ms
para 0
na configuração de clustering do Databricks ao iniciar o clustering do Apache Spark.
Configuração de recurso de memória para cargas de trabalho híbridas Apache Spark e Ray
Se o senhor executar cargas de trabalho híbridas Spark e Ray em um cluster Databricks, Databricks recomenda reduzir a memória Spark executor para um valor pequeno. Por exemplo, defina spark.executor.memory 4g
na configuração de clustering do site Databricks.
O Apache Spark executor é um processo Java que aciona o GC de forma preguiçosa, e o cache Apache Spark dataset usa muita memória Apache Spark executor . Isso reduz a memória disponível que o Ray pode usar. Para evitar possíveis erros de falta de memória, reduza a configuração spark.executor.memory
.
Configuração de recurso de computação para cargas de trabalho híbridas Apache Spark e Ray
Se o senhor executar cargas de trabalho híbridas do Spark e do Ray em um cluster Databricks, recomendamos que torne os nós de cluster ou os nós do Ray worker autoescaláveis. Por exemplo:
Se o senhor tiver um número fixo de nós worker disponíveis para começar um clustering Databricks, recomendamos que ative o Ray-on-Spark autoscale. Quando nenhuma carga de trabalho do Ray estiver em execução, o Ray clustering será desativado, permitindo que o recurso seja liberado para ser usado pela tarefa Apache Spark. Quando a tarefa Apache Spark for concluída e o Ray for usado novamente, o clustering Ray-on-Spark será novamente escalonado para atender à demanda.
Além disso, o senhor pode tornar o Databricks e o clustering Ray-on-spark autoescaláveis. Por exemplo, se o senhor configurar os nós autoescaláveis do clustering Databricks para um máximo de 10 nós, configurar os nós Ray-on-Spark worker para um máximo de quatro nós e configurar cada nó Ray worker para utilizar totalmente o recurso de cada Apache Spark worker, as cargas de trabalho Ray poderão usar no máximo quatro nós recurso nessa configuração de clustering. Em comparação, o Apache Spark Job pode alocar no máximo seis nós de recurso.