Usar Ray em Databricks

Visualização

Este recurso está em visualização pública.

Com o Ray 2.3.0 e superiores, o senhor pode criar clusters Ray e executar aplicativos Ray em clusters Apache Spark com a Databricks. Para obter informações sobre como começar a usar o machine learning no Ray, incluindo tutorial e exemplos, consulte a documentação do Ray. Para obter mais informações sobre a integração do Ray e do Apache Spark, consulte a documentação do Ray em Spark API .

Requisitos

  • Databricks Runtime 12,0 MLe acima.

  • O modo de acesso aos clusters do Databricks Runtime deve ser o modo "Assigned" (Atribuído) ou o modo "No isolation shared" (Sem isolamento compartilhado).

Instalar Ray

Use o seguinte comando para instalar o Ray. A extensão [default] é exigida pelo componente do painel Ray.

%pip install ray[default]>=2.3.0

Criar um cluster Ray específico do usuário em um cluster Databricks

Para criar clusters Ray, use os clustersray.util.spark.setup_ray_ API.

Em qualquer Databricks Notebook que esteja anexado a um cluster Databricks, o senhor pode executar o seguinte comando:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_per_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

A API ray.util.spark.setup_ray_cluster cria clusters Ray no Spark. Internamente, ele cria um Spark Job em segundo plano. Cada tarefa do Spark no Job cria um nó worker Ray e o nó do cabeçote Ray é criado no driver. O argumento num_worker_nodes representa o número de nós worker Ray a serem criados. Para especificar o número de núcleos de CPU ou GPU atribuídos a cada nó worker Ray, defina o argumento num_cpus_per_node ou num_gpus_per_node.

Após a criação de um cluster Ray, o senhor pode executar qualquer código de aplicativo Ray diretamente no site Notebook. Clique em Open Ray clusters Dashboard in a new tab para view o painel do Ray para os clusters.

Dica

Se estiver usando clusters de usuário único do Databricks, o senhor pode definir num_worker_nodes como ray.util.spark.MAX_NUM_WORKER_NODES para usar todos os recursos disponíveis para seus clusters Ray.

setup_ray_cluster(
  # ...
  num_worker_nodes=ray.util.spark.MAX_NUM_WORKER_NODES,
)

É possível definir o argumento collect_log_to_path para especificar o caminho de destino onde deseja coletar os logs clusters Ray. Execução da coleta de logs após o encerramento dos clusters Ray. A Databricks recomenda que você defina um caminho começando com /dbfs/ para que os logs sejam preservados mesmo se você encerrar os clusters Spark. Caso contrário, seus logs não serão recuperáveis, pois o armazenamento local nos clusters será excluído quando os clusters forem encerrados.

Observação

"Para que seu aplicativo Ray use automaticamente os clusters Ray que foram criados, chame ray.util.spark.setup_ray_cluster para definir a variável de ambiente RAY_ADDRESS como o endereço dos clusters Ray." O senhor pode especificar um endereço de clusters alternativo usando o argumento address da API ray.init.

execução de um aplicativo Ray

Após a criação dos clusters Ray, você poderá executar qualquer código de aplicativo Ray em um Databricks Notebook.

Importante

A Databricks recomenda que você instale todas as bibliotecas necessárias para seu aplicativo com %pip install <your-library-dependency> para garantir que elas estejam disponíveis para seus clusters e aplicativos Ray adequadamente. Especificar dependências na chamada de função Ray init instala as dependências em um local inacessível aos nós de worker do Spark, o que resulta em incompatibilidades de versão e erros de importação.

Por exemplo, você pode executar um aplicativo Ray simples em um Notebook do Databricks da seguinte maneira:

import ray
import random
import time
from fractions import Fraction

ray.init()

@ray.remote
def pi4_sample(sample_count):
    """pi4_sample runs sample_count experiments, and returns the
    fraction of time it was inside the circle.
    """
    in_count = 0
    for i in range(sample_count):
        x = random.random()
        y = random.random()
        if x*x + y*y <= 1:
            in_count += 1
    return Fraction(in_count, sample_count)

SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')

pi = pi4 * 4
print(float(pi))

Criar um Ray clusters no modo autoscale

No Ray 2.8.0 e superiores, os clusters do Ray que começam no Databricks suportam a integração com o Databricks autoscale. Consulte Escala automática do cluster do Databricks.

Com o Ray 2.8.0 e o acima, o senhor pode criar clusters Ray em clusters Databricks que suportam escalonamento para cima ou para baixo de acordo com as cargas de trabalho. Essa integração de autoescala aciona a autoescala dos clusters do Databricks internamente no ambiente do Databricks.

Para ativar o autoscale, execute o seguinte comando:

from ray.util.spark import setup_ray_cluster

setup_ray_cluster(
  num_worker_nodes=8,
  autoscale=True,
  ... # other arguments
)

Se a autoescala estiver ativada, num_worker_nodes indica o número máximo de nós do Ray worker. O default número mínimo de nós do Ray worker é zero. Essa configuração de default significa que, quando os clusters de raios são parados, ele escala até zero nós do Ray worker. Isso pode não ser ideal para a capacidade de resposta rápida em todos os cenários, mas, quando ativado, pode reduzir bastante os custos.

No modo de autoescala, num_worker_nodes não pode ser definido como ray.util.spark.MAX_NUM_WORKER_NODES.

Os argumentos a seguir configuram a velocidade de upscaling e downscaling:

  • autoscale_upscaling_speed representa o número de nós que podem estar pendentes como um múltiplo do número atual de nós. Quanto maior o valor, mais agressivo será o aumento de escala. Por exemplo, se isso for definido como 1,0, os clusters podem aumentar de tamanho em no máximo 100% a qualquer momento.

  • autoscale_idle_timeout_minutes representa o número de minutos que precisam passar antes que um nó parado worker seja removido pelo autoscaler. Quanto menor o valor, mais agressivo é o downscaling.

Com o Ray 2.9.0 e o acima, o senhor também pode definir autoscale_min_worker_nodes para evitar que os clusters do Ray sejam reduzidos a zero quando os clusters do Ray forem parados.

Conectar-se a clusters Ray remotos usando o cliente Ray

No Ray 2.9.0, o senhor pode criar um Ray clusters usando a API setup_ray_cluster e, no mesmo Notebook, pode chamar a API ray.init() API para se conectar a esse Ray clusters.

Para obter as cadeias de conexão remota usando o seguinte:

from ray.util.spark import setup_ray_cluster

remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)

Em seguida, o senhor pode conectar os clusters remotos usando as cadeias de conexão remota acima:

import ray
ray.init(remote_conn_str)

O cliente Ray não é compatível com a API Ray dataset definida no módulo ray.data. Como solução alternativa, o senhor pode envolver o código que chama a API do Ray dataset em uma tarefa remota do Ray, conforme mostrado no código a seguir:

import ray
import pandas as pd
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())

Carregar dados de um Spark DataFrame

Para carregar um Spark DataFrame como um Ray dataset, primeiro o senhor deve salvar o Spark DataFrame em volumes UC ou no Databricks Filesystem (obsoleto) usando o formato Parquet. Para controlar o acesso ao Databricks Filesystem de forma segura, a Databricks recomenda que o senhor monte o armazenamento de objetos em nuvem no DBFS. Em seguida, o senhor pode criar uma instância ray.data.Dataset a partir do caminho salvo do Spark DataFrame usando o seguinte método auxiliar:

import ray
import os
from urllib.parse import urlparse


def create_ray_dataset_from_spark_dataframe(spark_dataframe, dbfs_tmp_path):
    spark_dataframe.write.mode('overwrite').parquet(dbfs_tmp_path)
    fuse_path = "/dbfs" + urlparse(dbfs_tmp_path).path
    return ray.data.read_parquet(fuse_path)

# For example, read a Delta Table as a Spark DataFrame
spark_df = spark.read.table("diviner_demo.diviner_pedestrians_data_500")

# Provide a dbfs location to write the table to
data_location_2 = (
    "dbfs:/home/example.user@databricks.com/data/ray_test/test_data_2"
)

# Convert the Spark DataFrame to a Ray dataset
ray_dataset = create_ray_dataset_from_spark_dataframe(
    spark_dataframe=spark_df,
    dbfs_tmp_path=data_location_2
)

Carregar dados de uma tabela do Unity Catalog por meio do warehouse Databricks SQL

No Ray 2.8.0 e acima, o senhor pode chamar a API ray.data.read_databricks_tables para carregar dados de uma tabela do Databricks Unity Catalog.

Primeiro, o senhor precisa definir a variável de ambiente DATABRICKS_TOKEN para o seu depósito da Databricks access token. Se não estiver executando seu programa no Databricks Runtime, defina também a variável de ambiente DATABRICKS_HOST como o URL do Databricks workspace, conforme mostrado a seguir:

export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.azuredatabricks.net

Em seguida, chame ray.data.read_databricks_tables() para ler o armazém Databricks SQL.

import ray

ray_dataset = ray.data.read_databricks_tables(
    warehouse_id='...',  # Databricks SQL warehouse ID
    catalog='catalog_1',  # Unity catalog name
    schema='db_1',  # Schema name
    query="SELECT title, score FROM movie WHERE year >= 1980",
)

Configurar o recurso usado pelo nó principal do Ray

Em default, para a configuração do Ray no Spark, o Databricks restringe o recurso alocado para o nó principal do Ray:

  • 0 núcleos de CPU

  • 0 GPUs

  • 128 MB de memória heap

  • 128 MB de memória de armazenamento de objetos

Isso ocorre porque o nó principal do Ray geralmente é usado apenas para a coordenação global, não para a execução da tarefa Ray. O recurso do nó do driver do Spark é compartilhado com vários usuários, portanto a configuração default salva o recurso no lado do driver do Spark.

Com o Ray 2.8.0 e o acima, o senhor pode configurar o recurso usado pelo nó principal do Ray. Use os seguintes argumentos na API setup_ray_cluster:

  • num_cpus_head_nodeNúcleos de CPU usados pelo nó principal do Ray

  • num_gpus_head_nodeConfiguração da GPU usada pelo nó de cabeça de raio

  • object_store_memory_head_nodeDefinição do tamanho da memória do armazenamento de objetos pelo Ray head node

Suporte para clusters heterogêneos

Para uma execução de treinamento mais eficiente e econômica, o senhor pode criar um Ray em clusters Spark e definir configurações diferentes entre o nó principal do Ray e os nós do Ray worker. No entanto, todos os nós do Ray worker devem ter a mesma configuração. Os clusters do Databricks não oferecem suporte total a clusters heterogêneos, mas é possível criar clusters do Databricks com diferentes tipos de driver e de instância do worker definindo uma política de cluster.

Por exemplo:

{
  "node_type_id": {
    "type": "fixed",
    "value": "i3.xlarge"
  },
  "driver_node_type_id": {
    "type": "fixed",
    "value": "g4dn.xlarge"
  },
  "spark_version": {
    "type": "fixed",
    "value": "13.x-snapshot-gpu-ml-scala2.12"
  }
}

Ajuste a configuração clusters do Ray

A configuração recomendada para cada nó worker Ray é:

  • Mínimo de 4 núcleos de CPU por nó worker Ray.

  • Mínimo de 10 GB de memória heap para cada nó worker Ray.

Portanto, ao chamar ray.util.spark.setup_ray_cluster, o Databricks recomenda definir num_cpus_per_node para um valor >=4.

Consulte Alocação de memória para nós worker Ray para obter detalhes sobre como ajustar a memória heap para cada nó worker Ray.

Alocação de memória para nós de trabalho do Ray

Cada nó worker Ray usa dois tipos de memória: memória heap e memória de armazenamento de objeto. O tamanho da memória alocada para cada tipo é determinado conforme descrito abaixo.

A memória total alocada para cada nó worker do Ray é:

RAY_WORKER_NODE_TOTAL_MEMORY = (SPARK_WORKER_NODE_PHYSICAL_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES é o número máximo de nós worker Ray que podem ser ativados no nó worker Spark. Isso é determinado pelo argumento num_cpus_per_node ou num_gpus_per_node.

Se você não configurar o argumento object_store_memory_per_node, o tamanho da memória de heap e o tamanho da memória de armazenamento de objeto alocados para cada nó worker Ray serão:

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY * 0.7
OBJECT_STORE_MEMORY_PER_NODE = RAY_WORKER_NODE_TOTAL_MEMORY * 0.3

Se você definir o argumento object_store_memory_per_node:

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY - argument_object_store_memory_per_node

Além disso, o tamanho da memória de armazenamento de objeto por nó worker Ray é limitado pela memória compartilhada do sistema operacional. O valor máximo é:

OBJECT_STORE_MEMORY_PER_NODE_CAP = (SPARK_WORKER_NODE_OS_SHARED_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

SPARK_WORKER_NODE_OS_SHARED_MEMORY é o tamanho do disco /dev/shm configurado para o nó worker Spark.

Dicas para a configuração de clusters do Spark

O Ray clusters é executado em cima de um Databricks Spark clusters. Um cenário comum é usar o Spark Job e o Spark UDF para realizar tarefas simples de pré-processamento de dados que não precisam de recurso de GPU. Em seguida, use o Ray para executar tarefas machine learning complicadas que se beneficiam das GPUs. Nesse caso, a Databricks recomenda definir o parâmetro de configuração de nível de clusters do Spark spark.task.resource.gpu.amount como 0, de modo que todas as transformações do Spark DataFrame e execuções do Spark UDF não usem recurso de GPU.

Os benefícios dessa configuração são os seguintes:

  • Ele aumenta o paralelismo do Spark Job, porque o tipo de instância da GPU geralmente tem muito mais núcleos de CPU do que dispositivos de GPU.

  • Se os clusters do Spark forem compartilhados com vários usuários, essa configuração impedirá que o Spark Job concorra por recursos de GPU com cargas de trabalho Ray em execução simultânea.

Habilitar rastreamentos de pilha e gráficos de chama na página Ray Dashboard Actors

Na página Ray Dashboard Actors , você pode view rastreamentos de pilha e gráficos de chama para atores Ray ativos. Para view esta informação, use o seguinte comando para instalar o “py-spy” antes de iniciar os clusters Ray:

%pip install py-spy

Desligar um clustersRay

Para desligar clusters Ray em execução no Databricks, você pode chamar os clustersray.utils.spark.shutdown_ray_ API.

Observação

Os clusters Ray também são encerrados quando:

  • Você desanexa seu Notebook interativo de seus clusters Databricks.

  • Seu Job do Databricks é concluído.

  • Seus clusters Databricks são reiniciados ou encerrados.

  • Não há atividade durante o tempo de parada especificado.

Notebook de exemplo

O Notebook a seguir demonstra como criar clusters Ray e executar um aplicativo Ray no Databricks.

Notebook de iniciação Ray on Spark

Abra o bloco de anotações em outra guia

Limitações

  • Não há suporte para clusters multiusuário compartilhados do Databricks (modo de isolamento habilitado).

  • Ao usar %pip para instalar pacotes, os clusters Ray serão desligados. Certifique-se de começar Ray depois de terminar de instalar todas as suas bibliotecas com %pip.

  • O uso de integrações que substituem a configuração de ray.util.spark.setup_ray_cluster pode tornar os clusters do Ray instáveis e travar o contexto do Ray. Por exemplo, usar o pacote xgboost_ray e definir RayParams com um ator ou configuração cpus_per_actor em excesso da configuração clusters Ray pode travar silenciosamente os clusters Ray .