Usar Ray em Databricks

Com o Ray 2.3.0 e superiores, o senhor pode criar clusters Ray e executar aplicativos Ray em clusters Apache Spark com a Databricks. Para obter informações sobre como começar a usar o machine learning no Ray, incluindo tutorial e exemplos, consulte a documentação do Ray. Para obter mais informações sobre a integração do Ray e do Apache Spark, consulte a documentação do Ray em Spark API .

Requisitos

  • Databricks Runtime 12.2 LTS ML e acima.

  • O modo de acesso aos clusters do Databricks Runtime deve ser o modo "Assigned" (Atribuído) ou o modo "No isolation shared" (Sem isolamento compartilhado).

Instalar Ray

Use o seguinte comando para instalar o Ray. A extensão [default] é exigida pelo componente do painel Ray.

%pip install ray[default]>=2.3.0

Criar um cluster Ray específico do usuário em um cluster Databricks

Para criar clusters Ray, use os clustersray.util.spark.setup_ray_ API.

Em qualquer Databricks Notebook que esteja anexado a um cluster Databricks, o senhor pode executar o seguinte comando:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_worker_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

A API ray.util.spark.setup_ray_cluster cria um cluster Ray no Spark. Internamente, ele cria um plano de fundo Spark Job. Cada tarefa do Spark no Job cria um nó Ray worker e o nó Ray head é criado no driver. O argumento num_worker_nodes representa o número de nós do Ray worker a serem criados. Para especificar o número de núcleos de CPU ou GPU atribuídos a cada nó do Ray worker, defina o argumento num_cpus_worker_node (default value: 1) ou num_gpus_worker_node (default value: 0).

Após a criação de um cluster Ray, o senhor pode executar qualquer código de aplicativo Ray diretamente no site Notebook. Clique em Open Ray clusters Dashboard in a new tab para view o painel do Ray para os clusters.

Dica

Se estiver usando clusters de usuário único do Databricks, o senhor pode definir num_worker_nodes como ray.util.spark.MAX_NUM_WORKER_NODES para usar todos os recursos disponíveis para seus clusters Ray.

setup_ray_cluster(
  # ...
  num_worker_nodes=ray.util.spark.MAX_NUM_WORKER_NODES,
)

Defina o argumento collect_log_to_path para especificar o caminho de destino onde deseja coletar os logs do cluster Ray. log execução da coleta após o encerramento do Ray cluster. A Databricks recomenda que o senhor defina um caminho que comece com /dbfs/ para que os logs sejam preservados mesmo que o cluster do Spark seja encerrado. Caso contrário, seus logs não poderão ser recuperados, pois o armazenamento local no cluster é excluído quando o cluster é desligado.

Observação

"Para que seu aplicativo Ray use automaticamente os clusters Ray que foram criados, chame ray.util.spark.setup_ray_cluster para definir a variável de ambiente RAY_ADDRESS como o endereço dos clusters Ray." O senhor pode especificar um endereço de clusters alternativo usando o argumento address da API ray.init.

execução de um aplicativo Ray

Após a criação dos clusters Ray, você poderá executar qualquer código de aplicativo Ray em um Databricks Notebook.

Importante

A Databricks recomenda que você instale todas as bibliotecas necessárias para seu aplicativo com %pip install <your-library-dependency> para garantir que elas estejam disponíveis para seus clusters e aplicativos Ray adequadamente. Especificar dependências na chamada de função Ray init instala as dependências em um local inacessível aos nós de worker do Spark, o que resulta em incompatibilidades de versão e erros de importação.

Por exemplo, você pode executar um aplicativo Ray simples em um Notebook do Databricks da seguinte maneira:

import ray
import random
import time
from fractions import Fraction

ray.init()

@ray.remote
def pi4_sample(sample_count):
    """pi4_sample runs sample_count experiments, and returns the
    fraction of time it was inside the circle.
    """
    in_count = 0
    for i in range(sample_count):
        x = random.random()
        y = random.random()
        if x*x + y*y <= 1:
            in_count += 1
    return Fraction(in_count, sample_count)

SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')

pi = pi4 * 4
print(float(pi))

Criar um Ray clusters no modo autoscale

Nas versões 2.8.0 e superiores do Ray, o Ray clusters começará em Databricks e oferecerá suporte à integração com Databricks autoscale. Consulte Databricks cluster autoscale.

Com o Ray 2.8.0 e o acima, o senhor pode criar clusters Ray em clusters Databricks que suportam escalonamento para cima ou para baixo de acordo com as cargas de trabalho. Essa integração de autoescala aciona a autoescala dos clusters do Databricks internamente no ambiente do Databricks.

Para ativar o autoscale, execute o seguinte comando:

from ray.util.spark import setup_ray_cluster

setup_ray_cluster(
  num_worker_nodes=8,
  autoscale=True,
  ... # other arguments
)

Se a autoescala estiver ativada, num_worker_nodes indica o número máximo de nós do Ray worker. O default número mínimo de nós do Ray worker é 0. Essa configuração default significa que quando o Ray cluster é parado, ele escala até zero nós do Ray worker. Isso pode não ser ideal para a capacidade de resposta rápida em todos os cenários, mas, quando ativado, pode reduzir bastante os custos.

No modo de autoescala, num_worker_nodes não pode ser definido como ray.util.spark.MAX_NUM_WORKER_NODES.

Os argumentos a seguir configuram a velocidade de upscaling e downscaling:

  • autoscale_upscaling_speed representa o número de nós que podem estar pendentes como um múltiplo do número atual de nós. Quanto maior o valor, mais agressivo será o aumento de escala. Por exemplo, se isso for definido como 1,0, os clusters podem aumentar de tamanho em no máximo 100% a qualquer momento.

  • autoscale_idle_timeout_minutes representa o número de minutos que precisam passar antes que um nó parado worker seja removido pelo autoscaler. Quanto menor o valor, mais agressivo é o downscaling.

Com o Ray 2.9.0 e o acima, o senhor também pode definir autoscale_min_worker_nodes para evitar que os clusters do Ray sejam reduzidos a zero quando os clusters do Ray forem parados.

Conectar-se a clusters Ray remotos usando o cliente Ray

No Ray 2.9.3, crie um cluster do Ray chamando a API setup_ray_cluster. No mesmo Notebook, chame o ray.init() API para se conectar a esse Ray cluster.

Para um Ray cluster que não esteja no modo global, obtenha as cadeias de conexão remota com o seguinte código:

Para obter as cadeias de conexão remota usando o seguinte:

from ray.util.spark import setup_ray_cluster

_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)

Conecte-se ao site cluster remoto usando essas cadeias de conexão remota:

import ray
ray.init(remote_conn_str)

O cliente Ray não é compatível com a API Ray dataset definida no módulo ray.data. Como solução alternativa, o senhor pode envolver o código que chama a API do Ray dataset em uma tarefa remota do Ray, conforme mostrado no código a seguir:

import ray
import pandas as pd
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())

Carregar dados de um Spark DataFrame

Para carregar um Spark DataFrame como um Ray dataset, primeiro, o senhor deve salvar o Spark DataFrame em volumes UC ou Databricks Filesystem (obsoleto) como formato Parquet. Para controlar o acesso a Databricks Filesystem com segurança, Databricks recomenda que o senhor monte o armazenamento de objetos cloud em DBFS. Em seguida, o senhor pode criar uma instância ray.data.Dataset a partir do caminho salvo do Spark DataFrame usando o seguinte método auxiliar:

import ray
import os
from urllib.parse import urlparse


def create_ray_dataset_from_spark_dataframe(spark_dataframe, dbfs_tmp_path):
    spark_dataframe.write.mode('overwrite').parquet(dbfs_tmp_path)
    fuse_path = "/dbfs" + urlparse(dbfs_tmp_path).path
    return ray.data.read_parquet(fuse_path)

# For example, read a Delta Table as a Spark DataFrame
spark_df = spark.read.table("diviner_demo.diviner_pedestrians_data_500")

# Provide a dbfs location to write the table to
data_location_2 = (
    "dbfs:/home/example.user@databricks.com/data/ray_test/test_data_2"
)

# Convert the Spark DataFrame to a Ray dataset
ray_dataset = create_ray_dataset_from_spark_dataframe(
    spark_dataframe=spark_df,
    dbfs_tmp_path=data_location_2
)

Carregar dados de uma tabela do Unity Catalog por meio do warehouse Databricks SQL

No Ray 2.8.0 e acima, o senhor pode chamar a API ray.data.read_databricks_tables para carregar dados de uma tabela do Databricks Unity Catalog.

Primeiro, o senhor precisa definir a variável de ambiente DATABRICKS_TOKEN para o seu depósito Databricks access token. Se o senhor não estiver executando o programa em Databricks Runtime, defina a variável de ambiente DATABRICKS_HOST para o URL Databricks workspace , conforme mostrado a seguir:

export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.azuredatabricks.net

Em seguida, chame ray.data.read_databricks_tables() para ler o armazém Databricks SQL.

import ray

ray_dataset = ray.data.read_databricks_tables(
    warehouse_id='...',  # Databricks SQL warehouse ID
    catalog='catalog_1',  # Unity catalog name
    schema='db_1',  # Schema name
    query="SELECT title, score FROM movie WHERE year >= 1980",
)

Configurar o recurso usado pelo nó principal do Ray

Em default, para a configuração do Ray no Spark, o Databricks restringe o recurso alocado para o nó principal do Ray:

  • 0 núcleos de CPU

  • 0 GPUs

  • 128 MB de memória heap

  • 128 MB de memória de armazenamento de objetos

Isso ocorre porque o nó principal do Ray é normalmente usado para a coordenação global, não para a execução da tarefa Ray. O recurso do nó do driver Spark é compartilhado com vários usuários, portanto, a configuração default salva o recurso no lado do driver Spark.

Com o Ray 2.8.0 e o acima, o senhor pode configurar o recurso usado pelo nó principal do Ray. Use os seguintes argumentos na API setup_ray_cluster:

  • num_cpus_head_nodeNúcleos de CPU usados pelo nó principal do Ray

  • num_gpus_head_nodeConfiguração da GPU usada pelo nó de cabeça de raio

  • object_store_memory_head_nodeDefinição do tamanho da memória do armazenamento de objetos pelo Ray head node

Suporte para clusters heterogêneos

Para uma execução de treinamento mais eficiente e econômica, o senhor pode criar um Ray em clusters Spark e definir configurações diferentes entre o nó principal do Ray e os nós do Ray worker. No entanto, todos os nós do Ray worker devem ter a mesma configuração. Os clusters do Databricks não oferecem suporte total a clusters heterogêneos, mas é possível criar clusters do Databricks com diferentes tipos de driver e de instância do worker definindo uma política de cluster.

Por exemplo:

{
  "node_type_id": {
    "type": "fixed",
    "value": "i3.xlarge"
  },
  "driver_node_type_id": {
    "type": "fixed",
    "value": "g4dn.xlarge"
  },
  "spark_version": {
    "type": "fixed",
    "value": "13.x-snapshot-gpu-ml-scala2.12"
  }
}

Ajuste a configuração clusters do Ray

A configuração recomendada para cada nó worker Ray é:

  • Mínimo de 4 núcleos de CPU por nó worker Ray.

  • Mínimo de 10 GB de memória heap para cada nó worker Ray.

Ao chamar ray.util.spark.setup_ray_cluster, a Databricks recomenda definir num_cpus_worker_node com um valor >= 4.

Consulte Alocação de memória para nós worker Ray para obter detalhes sobre como ajustar a memória heap para cada nó worker Ray.

Alocação de memória para nós de trabalho do Ray

Cada nó worker Ray usa dois tipos de memória: memória heap e memória de armazenamento de objeto. O tamanho da memória alocada para cada tipo é determinado conforme descrito abaixo.

A memória total alocada para cada nó worker do Ray é:

RAY_WORKER_NODE_TOTAL_MEMORY = (SPARK_WORKER_NODE_PHYSICAL_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES é o número máximo de nós worker Ray que podem ser ativados no nó worker Spark. Isso é determinado pelo argumento num_cpus_worker_node ou num_gpus_worker_node.

Se você não configurar o argumento object_store_memory_per_node, o tamanho da memória de heap e o tamanho da memória de armazenamento de objeto alocados para cada nó worker Ray serão:

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY * 0.7
OBJECT_STORE_MEMORY_PER_NODE = RAY_WORKER_NODE_TOTAL_MEMORY * 0.3

Se você definir o argumento object_store_memory_per_node:

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY - argument_object_store_memory_per_node

Além disso, o tamanho da memória de armazenamento de objeto por nó worker Ray é limitado pela memória compartilhada do sistema operacional. O valor máximo é:

OBJECT_STORE_MEMORY_PER_NODE_CAP = (SPARK_WORKER_NODE_OS_SHARED_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

SPARK_WORKER_NODE_OS_SHARED_MEMORY é o tamanho do disco /dev/shm configurado para o nó worker Spark.

Práticas recomendadas

Como definir o número de CPU / GPU para cada nó de trabalho do Ray?

Databricks recomenda definir num_cpus_worker_node como o número de núcleos de CPU por nó Spark worker e definir num_gpus_worker_node como o número de GPUs por nó Spark worker . Nessa configuração, cada nó Spark worker lança um nó Ray worker que utiliza totalmente o recurso do nó Spark worker .

Configuração do cluster de GPU

O Ray cluster execução em cima de um Databricks Spark cluster. Um cenário comum é usar um Spark Job e um Spark UDF para realizar tarefas simples de pré-processamento de dados que não precisam de recurso de GPU e, em seguida, usar o Ray para executar tarefas complicadas machine learning que se beneficiam das GPUs. Nesse caso, Databricks recomenda definir o parâmetro de configuração de nível Spark cluster spark.task.resource.gpu.amount para 0, de modo que todas as transformações Spark DataFrame e execuções Spark UDF não usem o recurso GPU.

Os benefícios dessa configuração são os seguintes:

  • Ele aumenta o paralelismo do Spark Job, porque o tipo de instância da GPU geralmente tem muito mais núcleos de CPU do que dispositivos de GPU.

  • Se os clusters do Spark forem compartilhados com vários usuários, essa configuração impedirá que o Spark Job concorra por recursos de GPU com cargas de trabalho Ray em execução simultânea.

Desativar a integração do mlflow do treinador transformers se estiver usando-o na tarefa Ray

A integração do treinador transformers MLflow é feita por default. Se o senhor usar o Ray ensinar para ensiná-lo, o Ray tarefa falhará porque a credencial do serviço Databricks MLflow não está configurada para o Ray tarefa.

Para evitar esse problema, defina a variável de ambiente DISABLE_MLFLOW_INTEGRATION como "TRUE" na configuração do cluster do databricks. Para obter informações sobre como fazer login em MLflow no seu Ray trainer tarefa, consulte a seção "Using MLflow in Ray tarefa" para obter detalhes.

Erro de decapagem da função remota Address Ray

Para executar o Ray tarefa, o Ray usa o pickle para serializar a função tarefa. Se a decapagem falhar, determine a(s) linha(s) em seu código onde ocorre a falha. Muitas vezes, mover import comando para a função tarefa resolve erros comuns de decapagem. Por exemplo, datasets.load_dataset é uma função amplamente usada que, por acaso, foi corrigida no Databricks Runtime, o que pode tornar uma importação externa impossível de ser corrigida. Para corrigir esse problema, o senhor pode atualizar seu código da seguinte forma:

def ray_task_func():
  from datasets import load_dataset  # import the function inside task function
  ...

Desative o monitor de memória do Ray se a tarefa do Ray for encerrada inesperadamente com o erro OOM

No Ray 2.9.3, o monitor de memória do Ray tem problemas conhecidos que fazem com que o Ray tarefa seja erroneamente eliminado.

Para resolver o problema, desative o monitor de memória Ray definindo a variável de ambiente RAY_memory_monitor_refresh_ms como 0 na configuração do cluster do Databricks.

Configuração de recurso de memória para cargas de trabalho híbridas Spark e Ray

Se o senhor executar cargas de trabalho híbridas Spark e Ray em um Databricks cluster, Databricks recomenda reduzir a memória Spark executor para um valor pequeno, como a configuração spark.executor.memory 4g no Databricks cluster config. Isso se deve ao fato de o executor do Spark ser executado em um processo Java que aciona a coleta de lixo (GC) de forma preguiçosa. A pressão da memória para o cache do Spark dataset é bastante alta, causando uma redução na memória disponível que o Ray pode usar. Para evitar possíveis erros de OOM, a Databricks recomenda que o senhor reduza o 'spark.executor.memory' configurado para um valor menor do que o default.

Configuração de recurso de computação para cargas de trabalho híbridas Spark e Ray

Se o senhor executar cargas de trabalho híbridas Spark e Ray em um Databricks cluster, defina os nós Spark cluster como autoescaláveis, os nós Ray worker como autoescaláveis ou ambos com o autoescalonamento ativado.

Por exemplo, se o senhor tiver um número fixo de worker nós em um Databricks cluster, considere a possibilidade de ativar o Ray-on-Spark autoscale, de modo que, quando não houver nenhuma carga de trabalho do Ray em execução, o Ray cluster diminua. Como resultado, o recurso parado cluster é liberado para que o Spark Job possa usá-lo.

Quando o site Spark Job é concluído e o Ray Job começa, ele aciona o Ray-on-Spark cluster para aumentar a escala e atender às demandas de processamento.

O senhor também pode tornar o cluster do Databricks e o cluster do Ray-on-spark autoescaláveis. Especificamente, o senhor pode configurar os nós Databricks cluster autoescaláveis para um máximo de 10 nós e os nós Ray-on-Spark worker para um máximo de 4 nós (com um nó Ray worker por faísca worker), deixando Spark livre para alocar até 6 nós para Spark tarefa. Isso significa que as cargas de trabalho do Ray podem usar no máximo 4 nós de recurso ao mesmo tempo, enquanto o Spark Job pode alocar no máximo 6 nós de recurso.

Aplicação da função de transformações a lotes de dados

Ao processar dados em lotes, a Databricks recomenda que o senhor use a Ray Data API com a função map_batches. Essa abordagem pode ser mais eficiente e dimensionável, especialmente para grandes conjuntos de dados ou ao realizar cálculos complexos que se beneficiam do processamento em lotes. Qualquer Spark DataFrame pode ser convertido para Ray uso de dados o ray.data.from_spark API e pode ser gravado na tabela UC dos bancos de dados usando o API ray.data.write_databricks_table.

Usando MLflow em Ray tarefa

Para usar o site MLflow no Ray tarefa, configure o seguinte:

  • Databricks MLflow credenciais em Ray tarefa

  • MLflow execução no lado do driver Spark que passa os valores run_id gerados para o Ray tarefa.

O código a seguir é um exemplo:

import mlflow
import ray
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

experiment_name = "/Users/<your-name>@databricks.com/mlflow_test"
mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
  import os
  os.environ.update(mlflow_db_creds)
  mlflow.set_experiment(experiment_name)
  # We need to use the run created in Spark driver side,
  # and set `nested=True` to make it a nested run inside the
  # parent run.
  with mlflow.start_run(run_id=run_id, nested=True):
    mlflow.log_metric(f"task_{x}_metric", x)
  return x

with mlflow.start_run() as run:  # create MLflow run in Spark driver side.
  results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Usando Notebook com escopo Python biblioteca ou cluster Python biblioteca em Ray tarefa

Atualmente, o Ray tem um problema conhecido: o Ray tarefa não pode usar Notebook-scoped Python biblioteca ou cluster Python biblioteca. Para resolver essa limitação, execute o seguinte comando em seu Notebook antes de lançar um Ray-on-Spark cluster:

%pip install ray==<The Ray version you want to use> --force-reinstall

e, em seguida, execute o seguinte comando em seu Notebook para reiniciar o kernel Python:

dbutils.library.restartPython()

Habilitar rastreamentos de pilha e gráficos de chama na página Ray Dashboard Actors

Na página Ray Dashboard Actors, o senhor pode acessar view stack traces e flame gráfico para os atores Ray ativos.

Para view essa informação, instale py-spy antes de iniciar o Ray cluster:

%pip install py-spy

Desligar um clustersRay

Para encerrar um cluster Ray em execução no Databricks, chame o comando ray.utils.spark.shutdown_ray_cluster API.

Observação

Os clusters Ray também são encerrados quando:

  • Você desanexa seu Notebook interativo de seus clusters Databricks.

  • Seu Job do Databricks é concluído.

  • Seus clusters Databricks são reiniciados ou encerrados.

  • Não há atividade durante o tempo de parada especificado.

Notebook de exemplo

O Notebook a seguir demonstra como criar clusters Ray e executar um aplicativo Ray no Databricks.

Notebook de iniciação Ray on Spark

Abra o bloco de anotações em outra guia

Limitações

  • Não há suporte para clusters multiusuário compartilhados do Databricks (modo de isolamento habilitado).

  • Ao usar %pip para instalar pacotes, os clusters Ray serão desligados. Certifique-se de começar Ray depois de terminar de instalar todas as suas bibliotecas com %pip.

  • O uso de integrações que substituem a configuração de ray.util.spark.setup_ray_cluster pode tornar os clusters do Ray instáveis e travar o contexto do Ray. Por exemplo, usar o pacote xgboost_ray e definir RayParams com um ator ou configuração cpus_per_actor em excesso da configuração clusters Ray pode travar silenciosamente os clusters Ray .