Usar Ray em Databricks
Com o Ray 2.3.0 e superiores, o senhor pode criar clusters Ray e executar aplicativos Ray em clusters Apache Spark com a Databricks. Para obter informações sobre como começar a usar o machine learning no Ray, incluindo tutorial e exemplos, consulte a documentação do Ray. Para obter mais informações sobre a integração do Ray e do Apache Spark, consulte a documentação do Ray em Spark API .
Requisitos
Databricks Runtime 12.2 LTS ML e acima.
O modo de acesso aos clusters do Databricks Runtime deve ser o modo "Assigned" (Atribuído) ou o modo "No isolation shared" (Sem isolamento compartilhado).
Instalar Ray
Use o seguinte comando para instalar o Ray. A extensão [default]
é exigida pelo componente do painel Ray.
%pip install ray[default]>=2.3.0
Criar um cluster Ray específico do usuário em um cluster Databricks
Para criar clusters Ray, use os clustersray.util.spark.setup_ray_ API.
Em qualquer Databricks Notebook que esteja anexado a um cluster Databricks, o senhor pode executar o seguinte comando:
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster
setup_ray_cluster(
num_worker_nodes=2,
num_cpus_worker_node=4,
collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)
A API ray.util.spark.setup_ray_cluster
cria um cluster Ray no Spark. Internamente, ele cria um plano de fundo Spark Job. Cada tarefa do Spark no Job cria um nó Ray worker e o nó Ray head é criado no driver. O argumento num_worker_nodes
representa o número de nós do Ray worker a serem criados. Para especificar o número de núcleos de CPU ou GPU atribuídos a cada nó do Ray worker, defina o argumento num_cpus_worker_node
(default value: 1) ou num_gpus_worker_node
(default value: 0).
Após a criação de um cluster Ray, o senhor pode executar qualquer código de aplicativo Ray diretamente no site Notebook. Clique em Open Ray clusters Dashboard in a new tab para view o painel do Ray para os clusters.
Dica
Se estiver usando clusters de usuário único do Databricks, o senhor pode definir num_worker_nodes
como ray.util.spark.MAX_NUM_WORKER_NODES
para usar todos os recursos disponíveis para seus clusters Ray.
setup_ray_cluster(
# ...
num_worker_nodes=ray.util.spark.MAX_NUM_WORKER_NODES,
)
Defina o argumento collect_log_to_path
para especificar o caminho de destino onde deseja coletar os logs do cluster Ray. log execução da coleta após o encerramento do Ray cluster. A Databricks recomenda que o senhor defina um caminho que comece com /dbfs/
para que os logs sejam preservados mesmo que o cluster do Spark seja encerrado. Caso contrário, seus logs não poderão ser recuperados, pois o armazenamento local no cluster é excluído quando o cluster é desligado.
Observação
"Para que seu aplicativo Ray use automaticamente os clusters Ray que foram criados, chame ray.util.spark.setup_ray_cluster
para definir a variável de ambiente RAY_ADDRESS
como o endereço dos clusters Ray." O senhor pode especificar um endereço de clusters alternativo usando o argumento address
da API ray.init.
execução de um aplicativo Ray
Após a criação dos clusters Ray, você poderá executar qualquer código de aplicativo Ray em um Databricks Notebook.
Importante
A Databricks recomenda que você instale todas as bibliotecas necessárias para seu aplicativo com %pip install <your-library-dependency>
para garantir que elas estejam disponíveis para seus clusters e aplicativos Ray adequadamente. Especificar dependências na chamada de função Ray init instala as dependências em um local inacessível aos nós de worker do Spark, o que resulta em incompatibilidades de versão e erros de importação.
Por exemplo, você pode executar um aplicativo Ray simples em um Notebook do Databricks da seguinte maneira:
import ray
import random
import time
from fractions import Fraction
ray.init()
@ray.remote
def pi4_sample(sample_count):
"""pi4_sample runs sample_count experiments, and returns the
fraction of time it was inside the circle.
"""
in_count = 0
for i in range(sample_count):
x = random.random()
y = random.random()
if x*x + y*y <= 1:
in_count += 1
return Fraction(in_count, sample_count)
SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')
pi = pi4 * 4
print(float(pi))
Criar um Ray clusters no modo autoscale
Nas versões 2.8.0 e superiores do Ray, o Ray clusters começará em Databricks e oferecerá suporte à integração com Databricks autoscale. Consulte Databricks cluster autoscale.
Com o Ray 2.8.0 e o acima, o senhor pode criar clusters Ray em clusters Databricks que suportam escalonamento para cima ou para baixo de acordo com as cargas de trabalho. Essa integração de autoescala aciona a autoescala dos clusters do Databricks internamente no ambiente do Databricks.
Para ativar o autoscale, execute o seguinte comando:
from ray.util.spark import setup_ray_cluster
setup_ray_cluster(
num_worker_nodes=8,
autoscale=True,
... # other arguments
)
Se a autoescala estiver ativada, num_worker_nodes
indica o número máximo de nós do Ray worker. O default número mínimo de nós do Ray worker é 0. Essa configuração default significa que quando o Ray cluster é parado, ele escala até zero nós do Ray worker. Isso pode não ser ideal para a capacidade de resposta rápida em todos os cenários, mas, quando ativado, pode reduzir bastante os custos.
No modo de autoescala, num_worker_nodes
não pode ser definido como ray.util.spark.MAX_NUM_WORKER_NODES
.
Os argumentos a seguir configuram a velocidade de upscaling e downscaling:
autoscale_upscaling_speed
representa o número de nós que podem estar pendentes como um múltiplo do número atual de nós. Quanto maior o valor, mais agressivo será o aumento de escala. Por exemplo, se isso for definido como 1,0, os clusters podem aumentar de tamanho em no máximo 100% a qualquer momento.autoscale_idle_timeout_minutes
representa o número de minutos que precisam passar antes que um nó parado worker seja removido pelo autoscaler. Quanto menor o valor, mais agressivo é o downscaling.
Com o Ray 2.9.0 e o acima, o senhor também pode definir autoscale_min_worker_nodes
para evitar que os clusters do Ray sejam reduzidos a zero quando os clusters do Ray forem parados.
Conectar-se a clusters Ray remotos usando o cliente Ray
No Ray 2.9.3, crie um cluster do Ray chamando a API setup_ray_cluster
. No mesmo Notebook, chame o ray.init()
API para se conectar a esse Ray cluster.
Para um Ray cluster que não esteja no modo global, obtenha as cadeias de conexão remota com o seguinte código:
Para obter as cadeias de conexão remota usando o seguinte:
from ray.util.spark import setup_ray_cluster
_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)
Conecte-se ao site cluster remoto usando essas cadeias de conexão remota:
import ray
ray.init(remote_conn_str)
O cliente Ray não é compatível com a API Ray dataset definida no módulo ray.data
. Como solução alternativa, o senhor pode envolver o código que chama a API do Ray dataset em uma tarefa remota do Ray, conforme mostrado no código a seguir:
import ray
import pandas as pd
ray.init("ray://<ray_head_node_ip>:10001")
@ray.remote
def ray_data_task():
p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
ds = ray.data.from_pandas(p1)
return ds.repartition(4).to_pandas()
ray.get(ray_data_task.remote())
Carregar dados de um Spark DataFrame
Para carregar um Spark DataFrame como um Ray dataset, primeiro, o senhor deve salvar o Spark DataFrame em volumes UC ou Databricks Filesystem (obsoleto) como formato Parquet. Para controlar o acesso a Databricks Filesystem com segurança, Databricks recomenda que o senhor monte o armazenamento de objetos cloud em DBFS. Em seguida, o senhor pode criar uma instância ray.data.Dataset
a partir do caminho salvo do Spark DataFrame usando o seguinte método auxiliar:
import ray
import os
from urllib.parse import urlparse
def create_ray_dataset_from_spark_dataframe(spark_dataframe, dbfs_tmp_path):
spark_dataframe.write.mode('overwrite').parquet(dbfs_tmp_path)
fuse_path = "/dbfs" + urlparse(dbfs_tmp_path).path
return ray.data.read_parquet(fuse_path)
# For example, read a Delta Table as a Spark DataFrame
spark_df = spark.read.table("diviner_demo.diviner_pedestrians_data_500")
# Provide a dbfs location to write the table to
data_location_2 = (
"dbfs:/home/example.user@databricks.com/data/ray_test/test_data_2"
)
# Convert the Spark DataFrame to a Ray dataset
ray_dataset = create_ray_dataset_from_spark_dataframe(
spark_dataframe=spark_df,
dbfs_tmp_path=data_location_2
)
Carregar dados de uma tabela do Unity Catalog por meio do warehouse Databricks SQL
No Ray 2.8.0 e acima, o senhor pode chamar a API ray.data.read_databricks_tables
para carregar dados de uma tabela do Databricks Unity Catalog.
Primeiro, o senhor precisa definir a variável de ambiente DATABRICKS_TOKEN
para o seu depósito Databricks access token. Se o senhor não estiver executando o programa em Databricks Runtime, defina a variável de ambiente DATABRICKS_HOST
para o URL Databricks workspace , conforme mostrado a seguir:
export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.azuredatabricks.net
Em seguida, chame ray.data.read_databricks_tables()
para ler o armazém Databricks SQL.
import ray
ray_dataset = ray.data.read_databricks_tables(
warehouse_id='...', # Databricks SQL warehouse ID
catalog='catalog_1', # Unity catalog name
schema='db_1', # Schema name
query="SELECT title, score FROM movie WHERE year >= 1980",
)
Configurar o recurso usado pelo nó principal do Ray
Em default, para a configuração do Ray no Spark, o Databricks restringe o recurso alocado para o nó principal do Ray:
0 núcleos de CPU
0 GPUs
128 MB de memória heap
128 MB de memória de armazenamento de objetos
Isso ocorre porque o nó principal do Ray é normalmente usado para a coordenação global, não para a execução da tarefa Ray. O recurso do nó do driver Spark é compartilhado com vários usuários, portanto, a configuração default salva o recurso no lado do driver Spark.
Com o Ray 2.8.0 e o acima, o senhor pode configurar o recurso usado pelo nó principal do Ray. Use os seguintes argumentos na API setup_ray_cluster
:
num_cpus_head_node
Núcleos de CPU usados pelo nó principal do Raynum_gpus_head_node
Configuração da GPU usada pelo nó de cabeça de raioobject_store_memory_head_node
Definição do tamanho da memória do armazenamento de objetos pelo Ray head node
Suporte para clusters heterogêneos
Para uma execução de treinamento mais eficiente e econômica, o senhor pode criar um Ray em clusters Spark e definir configurações diferentes entre o nó principal do Ray e os nós do Ray worker. No entanto, todos os nós do Ray worker devem ter a mesma configuração. Os clusters do Databricks não oferecem suporte total a clusters heterogêneos, mas é possível criar clusters do Databricks com diferentes tipos de driver e de instância do worker definindo uma política de cluster.
Por exemplo:
{
"node_type_id": {
"type": "fixed",
"value": "i3.xlarge"
},
"driver_node_type_id": {
"type": "fixed",
"value": "g4dn.xlarge"
},
"spark_version": {
"type": "fixed",
"value": "13.x-snapshot-gpu-ml-scala2.12"
}
}
Ajuste a configuração clusters do Ray
A configuração recomendada para cada nó worker Ray é:
Mínimo de 4 núcleos de CPU por nó worker Ray.
Mínimo de 10 GB de memória heap para cada nó worker Ray.
Ao chamar ray.util.spark.setup_ray_cluster
, a Databricks recomenda definir num_cpus_worker_node
com um valor >= 4
.
Consulte Alocação de memória para nós worker Ray para obter detalhes sobre como ajustar a memória heap para cada nó worker Ray.
Alocação de memória para nós de trabalho do Ray
Cada nó worker Ray usa dois tipos de memória: memória heap e memória de armazenamento de objeto. O tamanho da memória alocada para cada tipo é determinado conforme descrito abaixo.
A memória total alocada para cada nó worker do Ray é:
RAY_WORKER_NODE_TOTAL_MEMORY = (SPARK_WORKER_NODE_PHYSICAL_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)
MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES
é o número máximo de nós worker Ray que podem ser ativados no nó worker Spark. Isso é determinado pelo argumento num_cpus_worker_node
ou num_gpus_worker_node
.
Se você não configurar o argumento object_store_memory_per_node
, o tamanho da memória de heap e o tamanho da memória de armazenamento de objeto alocados para cada nó worker Ray serão:
RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY * 0.7
OBJECT_STORE_MEMORY_PER_NODE = RAY_WORKER_NODE_TOTAL_MEMORY * 0.3
Se você definir o argumento object_store_memory_per_node
:
RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY - argument_object_store_memory_per_node
Além disso, o tamanho da memória de armazenamento de objeto por nó worker Ray é limitado pela memória compartilhada do sistema operacional. O valor máximo é:
OBJECT_STORE_MEMORY_PER_NODE_CAP = (SPARK_WORKER_NODE_OS_SHARED_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)
SPARK_WORKER_NODE_OS_SHARED_MEMORY
é o tamanho do disco /dev/shm
configurado para o nó worker Spark.
Práticas recomendadas
Como definir o número de CPU / GPU para cada nó de trabalho do Ray?
Databricks recomenda definir num_cpus_worker_node
como o número de núcleos de CPU por nó Spark worker e definir num_gpus_worker_node
como o número de GPUs por nó Spark worker . Nessa configuração, cada nó Spark worker lança um nó Ray worker que utiliza totalmente o recurso do nó Spark worker .
Configuração do cluster de GPU
O Ray cluster execução em cima de um Databricks Spark cluster. Um cenário comum é usar um Spark Job e um Spark UDF para realizar tarefas simples de pré-processamento de dados que não precisam de recurso de GPU e, em seguida, usar o Ray para executar tarefas complicadas machine learning que se beneficiam das GPUs. Nesse caso, Databricks recomenda definir o parâmetro de configuração de nível Spark cluster spark.task.resource.gpu.amount
para 0
, de modo que todas as transformações Spark DataFrame e execuções Spark UDF não usem o recurso GPU.
Os benefícios dessa configuração são os seguintes:
Ele aumenta o paralelismo do Spark Job, porque o tipo de instância da GPU geralmente tem muito mais núcleos de CPU do que dispositivos de GPU.
Se os clusters do Spark forem compartilhados com vários usuários, essa configuração impedirá que o Spark Job concorra por recursos de GPU com cargas de trabalho Ray em execução simultânea.
Desativar a integração do mlflow do treinador transformers
se estiver usando-o na tarefa Ray
A integração do treinador transformers
MLflow é feita por default. Se o senhor usar o Ray ensinar para ensiná-lo, o Ray tarefa falhará porque a credencial do serviço Databricks MLflow não está configurada para o Ray tarefa.
Para evitar esse problema, defina a variável de ambiente DISABLE_MLFLOW_INTEGRATION
como "TRUE" na configuração do cluster do databricks. Para obter informações sobre como fazer login em MLflow no seu Ray trainer tarefa, consulte a seção "Using MLflow in Ray tarefa" para obter detalhes.
Erro de decapagem da função remota Address Ray
Para executar o Ray tarefa, o Ray usa o pickle para serializar a função tarefa. Se a decapagem falhar, determine a(s) linha(s) em seu código onde ocorre a falha. Muitas vezes, mover import
comando para a função tarefa resolve erros comuns de decapagem. Por exemplo, datasets.load_dataset
é uma função amplamente usada que, por acaso, foi corrigida no Databricks Runtime, o que pode tornar uma importação externa impossível de ser corrigida. Para corrigir esse problema, o senhor pode atualizar seu código da seguinte forma:
def ray_task_func():
from datasets import load_dataset # import the function inside task function
...
Desative o monitor de memória do Ray se a tarefa do Ray for encerrada inesperadamente com o erro OOM
No Ray 2.9.3, o monitor de memória do Ray tem problemas conhecidos que fazem com que o Ray tarefa seja erroneamente eliminado.
Para resolver o problema, desative o monitor de memória Ray definindo a variável de ambiente RAY_memory_monitor_refresh_ms
como 0
na configuração do cluster do Databricks.
Configuração de recurso de memória para cargas de trabalho híbridas Spark e Ray
Se o senhor executar cargas de trabalho híbridas Spark e Ray em um Databricks cluster, Databricks recomenda reduzir a memória Spark executor para um valor pequeno, como a configuração spark.executor.memory 4g
no Databricks cluster config. Isso se deve ao fato de o executor do Spark ser executado em um processo Java que aciona a coleta de lixo (GC) de forma preguiçosa. A pressão da memória para o cache do Spark dataset é bastante alta, causando uma redução na memória disponível que o Ray pode usar. Para evitar possíveis erros de OOM, a Databricks recomenda que o senhor reduza o 'spark.executor.memory' configurado para um valor menor do que o default.
Configuração de recurso de computação para cargas de trabalho híbridas Spark e Ray
Se o senhor executar cargas de trabalho híbridas Spark e Ray em um Databricks cluster, defina os nós Spark cluster como autoescaláveis, os nós Ray worker como autoescaláveis ou ambos com o autoescalonamento ativado.
Por exemplo, se o senhor tiver um número fixo de worker nós em um Databricks cluster, considere a possibilidade de ativar o Ray-on-Spark autoscale, de modo que, quando não houver nenhuma carga de trabalho do Ray em execução, o Ray cluster diminua. Como resultado, o recurso parado cluster é liberado para que o Spark Job possa usá-lo.
Quando o site Spark Job é concluído e o Ray Job começa, ele aciona o Ray-on-Spark cluster para aumentar a escala e atender às demandas de processamento.
O senhor também pode tornar o cluster do Databricks e o cluster do Ray-on-spark autoescaláveis. Especificamente, o senhor pode configurar os nós Databricks cluster autoescaláveis para um máximo de 10 nós e os nós Ray-on-Spark worker para um máximo de 4 nós (com um nó Ray worker por faísca worker), deixando Spark livre para alocar até 6 nós para Spark tarefa. Isso significa que as cargas de trabalho do Ray podem usar no máximo 4 nós de recurso ao mesmo tempo, enquanto o Spark Job pode alocar no máximo 6 nós de recurso.
Aplicação da função de transformações a lotes de dados
Ao processar dados em lotes, a Databricks recomenda que o senhor use a Ray Data API com a função map_batches
. Essa abordagem pode ser mais eficiente e dimensionável, especialmente para grandes conjuntos de dados ou ao realizar cálculos complexos que se beneficiam do processamento em lotes. Qualquer Spark DataFrame pode ser convertido para Ray uso de dados o ray.data.from_spark
API e pode ser gravado na tabela UC dos bancos de dados usando o API ray.data.write_databricks_table
.
Usando MLflow em Ray tarefa
Para usar o site MLflow no Ray tarefa, configure o seguinte:
Databricks MLflow credenciais em Ray tarefa
MLflow execução no lado do driver Spark que passa os valores
run_id
gerados para o Ray tarefa.
O código a seguir é um exemplo:
import mlflow
import ray
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")
experiment_name = "/Users/<your-name>@databricks.com/mlflow_test"
mlflow.set_experiment(experiment_name)
@ray.remote
def ray_task(x, run_id):
import os
os.environ.update(mlflow_db_creds)
mlflow.set_experiment(experiment_name)
# We need to use the run created in Spark driver side,
# and set `nested=True` to make it a nested run inside the
# parent run.
with mlflow.start_run(run_id=run_id, nested=True):
mlflow.log_metric(f"task_{x}_metric", x)
return x
with mlflow.start_run() as run: # create MLflow run in Spark driver side.
results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])
Usando Notebook com escopo Python biblioteca ou cluster Python biblioteca em Ray tarefa
Atualmente, o Ray tem um problema conhecido: o Ray tarefa não pode usar Notebook-scoped Python biblioteca ou cluster Python biblioteca. Para resolver essa limitação, execute o seguinte comando em seu Notebook antes de lançar um Ray-on-Spark cluster:
%pip install ray==<The Ray version you want to use> --force-reinstall
e, em seguida, execute o seguinte comando em seu Notebook para reiniciar o kernel Python:
dbutils.library.restartPython()
Habilitar rastreamentos de pilha e gráficos de chama na página Ray Dashboard Actors
Na página Ray Dashboard Actors, o senhor pode acessar view stack traces e flame gráfico para os atores Ray ativos.
Para view essa informação, instale py-spy
antes de iniciar o Ray cluster:
%pip install py-spy
Desligar um clustersRay
Para encerrar um cluster Ray em execução no Databricks, chame o comando ray.utils.spark.shutdown_ray_cluster API.
Observação
Os clusters Ray também são encerrados quando:
Você desanexa seu Notebook interativo de seus clusters Databricks.
Seu Job do Databricks é concluído.
Seus clusters Databricks são reiniciados ou encerrados.
Não há atividade durante o tempo de parada especificado.
Notebook de exemplo
O Notebook a seguir demonstra como criar clusters Ray e executar um aplicativo Ray no Databricks.
Limitações
Não há suporte para clusters multiusuário compartilhados do Databricks (modo de isolamento habilitado).
Ao usar %pip para instalar pacotes, os clusters Ray serão desligados. Certifique-se de começar Ray depois de terminar de instalar todas as suas bibliotecas com %pip.
O uso de integrações que substituem a configuração de
ray.util.spark.setup_ray_cluster
pode tornar os clusters do Ray instáveis e travar o contexto do Ray. Por exemplo, usar o pacotexgboost_ray
e definirRayParams
com um ator ou configuraçãocpus_per_actor
em excesso da configuração clusters Ray pode travar silenciosamente os clusters Ray .