Inferência LLM distribuída com dados Ray e vLLM em GPU sem servidor
Este notebook demonstra como executar inferência de modelos de linguagem de grande porte (LLM) em escala usando Ray Data e vLLM em uma GPU sem servidor Databricks . Ele utiliza a APIde GPU distribuída sem servidor para provisionar e gerenciar automaticamente GPUs A10 em vários nós para inferência distribuída.
O que você aprenderá:
- Configure o Ray e o vLLM para inferência LLM distribuída usando
map_batches - Utilize o Ray Data para agrupar e processar solicitações de forma eficiente em várias GPUs.
- Salvar resultados de inferência em Parquet nos volumes Unity Catalog
- Converter tabelas Parquet em tabelas Delta para governança e consultas eficientes.
- Monitorar cluster de recursos Ray e a alocação de GPU
Caso de uso: inferência em lotes com milhares de solicitações, utilizando GPUs de forma eficiente, armazenamento persistente e integração Delta Lake .
Conecte-se à computeGPU serverless
- Clique no dropdown "Conectar" na parte superior.
- Selecione GPU sem servidor .
- Abra o painel lateral Ambiente , localizado no lado direito do Notebook.
- Configure o acelerador para A10 para esta demonstração.
- Clique em Aplicar e Confirmar para aplicar este ambiente ao seu Notebook.
Nota: A função distribuída iniciará GPUs A10 remotas para inferência em vários nós. Execução do próprio Notebook em um único A10 para orquestração.
Instalar dependências
Instale todos os pacotes necessários para inferência distribuída de Ray e vLLM:
- Flash Attention : Atenção otimizada para inferência mais rápida (compatível com CUDA 12, PyTorch 2.6 e A10)
- vLLM : mecanismo de inferência LLM de alta taxa de transferência
- Ray Data : Processamento de dados distribuído com suporte a LLM (API
ray.data.llm) - Transformers : Hugging Face carregamento de modelos - utilidades
%pip install --force-reinstall --no-cache-dir --no-deps "https://github.com/Dao-AILab/flash-attention/releases/download/v2.7.4.post1/flash_attn-2.7.4.post1+cu12torch2.6cxx11abiFALSE-cp312-cp312-linux_x86_64.whl"
%pip install "transformers<4.54.0"
%pip install "vllm==0.8.5.post1"
%pip install "ray[data]>=2.47.1" # Required for ray.data.llm API
%pip install "opentelemetry-exporter-prometheus"
%pip install "optree>=0.13.0"
%pip install hf_transfer
%pip install "numpy==1.26.4"
%restart_python
Verificar versões do pacote
Confirme se todos os pacotes necessários estão instalados em versões compatíveis.
from packaging.version import Version
import torch
import flash_attn
import vllm
import ray
import transformers
print(f"PyTorch: {torch.__version__}")
print(f"Flash Attention: {flash_attn.__version__}")
print(f"vLLM: {vllm.__version__}")
print(f"Ray: {ray.__version__}")
print(f"Transformers: {transformers.__version__}")
assert Version(ray.__version__) >= Version("2.47.1"), "Ray version must be at least 2.47.1"
print("\n✓ All version checks passed!")
Configuração
Utilize widgets para configurar parâmetros de inferência e autenticação opcional do Hugging Face.
Nota de segurança: Armazene seus tokens Hugging Face em Segredos Databricks para uso em produção. Consulte a documentação do Databricks Secrets.
# Widget configuration
dbutils.widgets.text("hf_secret_scope", "")
dbutils.widgets.text("hf_secret_key", "")
dbutils.widgets.text("model_name", "Qwen/Qwen3-4B-Instruct-2507")
dbutils.widgets.text("num_gpus", "5")
dbutils.widgets.text("num_prompts", "1000")
# Unity Catalog configuration for output storage
dbutils.widgets.text("uc_catalog", "main")
dbutils.widgets.text("uc_schema", "default")
dbutils.widgets.text("uc_volume", "ray_data")
dbutils.widgets.text("uc_table", "llm_inference_results")
# Retrieve widget values
HF_SECRET_SCOPE = dbutils.widgets.get("hf_secret_scope")
HF_SECRET_KEY = dbutils.widgets.get("hf_secret_key")
MODEL_NAME = dbutils.widgets.get("model_name")
NUM_GPUS = int(dbutils.widgets.get("num_gpus"))
NUM_PROMPTS = int(dbutils.widgets.get("num_prompts"))
# Unity Catalog paths
UC_CATALOG = dbutils.widgets.get("uc_catalog")
UC_SCHEMA = dbutils.widgets.get("uc_schema")
UC_VOLUME = dbutils.widgets.get("uc_volume")
UC_TABLE = dbutils.widgets.get("uc_table")
# Construct paths
UC_VOLUME_PATH = f"/Volumes/{UC_CATALOG}/{UC_SCHEMA}/{UC_VOLUME}"
UC_TABLE_NAME = f"{UC_CATALOG}.{UC_SCHEMA}.{UC_TABLE}"
PARQUET_OUTPUT_PATH = f"{UC_VOLUME_PATH}/inference_output"
print(f"Model: {MODEL_NAME}")
print(f"Number of GPUs: {NUM_GPUS}")
print(f"Number of prompts: {NUM_PROMPTS}")
print(f"\nUnity Catalog Configuration:")
print(f" Volume Path: {UC_VOLUME_PATH}")
print(f" Table Name: {UC_TABLE_NAME}")
print(f" Parquet Output: {PARQUET_OUTPUT_PATH}")
Autentique com Hugging Face (opcional)
Se estiver usando modelos com acesso restrito (como o Llama), autentique-se com o Hugging Face.
Opção 1: Usar Segredos do Databricks (recomendado para produção)
hf_token = dbutils.secrets.get(scope=HF_SECRET_SCOPE, key=HF_SECRET_KEY)
Opção 2: Login interativo (para desenvolvimento)
from huggingface_hub import login
# Uncomment ONE of the following options:
# Option 1: Use Databricks Secrets (recommended)
# if HF_SECRET_SCOPE and HF_SECRET_KEY:
# hf_token = dbutils.secrets.get(scope=HF_SECRET_SCOPE, key=HF_SECRET_KEY)
# login(token=hf_token)
# print("✓ Logged in using Databricks Secrets")
# Option 2: Interactive login
login()
print("✓ Hugging Face authentication complete")
Ray cluster recurso monitoramento
A função utilidades serve para inspecionar os recursos cluster Ray e verificar a alocação de GPUs entre os nós.
import json
import ray
def print_ray_resources():
"""Print Ray cluster resources and GPU allocation per node."""
try:
cluster_resources = ray.cluster_resources()
print("Ray Cluster Resources:")
print(json.dumps(cluster_resources, indent=2))
nodes = ray.nodes()
print(f"\nDetected {len(nodes)} Ray node(s):")
for node in nodes:
node_id = node.get("NodeID", "N/A")[:8] # Truncate for readability
ip_address = node.get("NodeManagerAddress", "N/A")
resources = node.get("Resources", {})
num_gpus = int(resources.get("GPU", 0))
print(f" • Node {node_id}... | IP: {ip_address} | GPUs: {num_gpus}")
# Show specific GPU IDs if available
gpu_ids = [k for k in resources.keys() if k.startswith("GPU_ID_")]
if gpu_ids:
print(f" GPU IDs: {', '.join(gpu_ids)}")
except Exception as e:
print(f"Error querying Ray cluster: {e}")
# Display current resources
# print_ray_resources()
Defina a tarefa de inferência distribuída.
A classe LLMPredictor encapsula o vLLM para inferência de lotes eficiente. O Ray Data distribui a carga de trabalho entre vários trabalhadores de GPU usando map_batches.
Visão geral da arquitetura
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Ray Data │───▶│ LLMPredictor │───▶│ Parquet │
│ (Prompts) │ │ (vLLM Engine) │ │ (UC Volume) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
Distributed GPU Workers (A10) Delta Table
across nodes with vLLM instances (UC Table)
from serverless_gpu.ray import ray_launch
import os
# Set Ray temp directory
os.environ['RAY_TEMP_DIR'] = '/tmp/ray'
@ray_launch(gpus=NUM_GPUS, gpu_type='a10', remote=True)
def run_distributed_inference():
"""Run distributed LLM inference using Ray Data and vLLM with map_batches."""
from typing import Dict, List
from datetime import datetime
import numpy as np
import ray
from vllm import LLM, SamplingParams
# Sample prompts for inference
base_prompts = [
"Hello, my name is",
"The president of the United States is",
"The future of AI is",
]
# Scale up prompts for distributed processing
prompts = base_prompts * (NUM_PROMPTS // len(base_prompts))
ds = ray.data.from_items(prompts)
print(f"✓ Created Ray dataset with {ds.count()} prompts")
# Sampling parameters for text generation
sampling_params = SamplingParams(
temperature=0.8,
top_p=0.95,
max_tokens=100
)
class LLMPredictor:
"""vLLM-based predictor for batch inference."""
def __init__(self):
self.llm = LLM(
model=MODEL_NAME,
tensor_parallel_size=1,
dtype="bfloat16",
trust_remote_code=True,
gpu_memory_utilization=0.90,
max_model_len=8192,
enable_prefix_caching=True,
enable_chunked_prefill=True,
max_num_batched_tokens=8192,
)
self.model_name = MODEL_NAME
print(f"✓ vLLM engine initialized with model: {MODEL_NAME}")
def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
"""Process a batch of prompts."""
outputs = self.llm.generate(batch["item"], sampling_params)
prompt_list: List[str] = []
generated_text_list: List[str] = []
model_list: List[str] = []
timestamp_list: List[str] = []
for output in outputs:
prompt_list.append(output.prompt)
generated_text_list.append(
' '.join([o.text for o in output.outputs])
)
model_list.append(self.model_name)
timestamp_list.append(datetime.now().isoformat())
return {
"prompt": prompt_list,
"generated_text": generated_text_list,
"model": model_list,
"timestamp": timestamp_list,
}
# Configure number of parallel vLLM instances
num_instances = NUM_GPUS
# Apply the predictor across the dataset using map_batches
ds = ds.map_batches(
LLMPredictor,
concurrency=num_instances,
batch_size=32,
num_gpus=1,
num_cpus=12
)
# =========================================================================
# Write results to Parquet (stored in Unity Catalog Volume)
# =========================================================================
print(f"\n📦 Writing results to Parquet: {PARQUET_OUTPUT_PATH}")
ds.write_parquet(PARQUET_OUTPUT_PATH, mode="overwrite")
print(f"✓ Parquet files written successfully")
# Collect sample outputs for display
sample_outputs = ray.data.read_parquet(PARQUET_OUTPUT_PATH).take(limit=10)
print("\n" + "="*60)
print("SAMPLE INFERENCE RESULTS")
print("="*60 + "\n")
for i, output in enumerate(sample_outputs):
prompt = output.get("prompt", "N/A")
generated_text = output.get("generated_text", "")
display_text = generated_text[:100] if generated_text else "N/A"
print(f"[{i+1}] Prompt: {prompt!r}")
print(f" Generated: {display_text!r}...\n")
return PARQUET_OUTPUT_PATH
inferência distribuída de execução
Inicie a tarefa de inferência distribuída em várias GPUs A10. Isto irá:
- provisionamento trabalhador GPU A10 remoto
- Inicialize os mecanismos vLLM em cada worker.
- Distribua lembretes entre os trabalhadores usando dados Ray.
- Coletar e retornar os resultados gerados.
Observação: A startup pode levar alguns minutos, pois os nós da GPU estão sendo provisionados e os modelos carregados.
result = run_distributed_inference.distributed()
parquet_path = result[0] if NUM_GPUS > 1 else result
print(f"\n✓ Inference complete! Results saved to: {parquet_path}")
Carregar Parquet e visualizar resultados
Carregue a saída Parquet dos Volumes Unity Catalog usando Spark e visualize os resultados da inferência.
# Load Parquet data using Spark
print(f"📖 Loading Parquet from: {PARQUET_OUTPUT_PATH}")
df_spark = spark.read.parquet(PARQUET_OUTPUT_PATH)
# Show schema and row count
print(f"\n✓ Loaded {df_spark.count()} rows")
print("\nSchema:")
df_spark.printSchema()
# Display sample rows
print("\nSample Results:")
display(df_spark.limit(10))
Salvar como tabela Delta no Unity Catalog
Escreva os resultados da inferência em uma tabela Delta Unity Catalog para:
- Governança : Rastrear a linhagem de dados e os controles de acesso.
- Desempenho : Consultas otimizadas com Delta Lake
- Versionamento : viagem do tempo e auditoria história
# Write to Unity Catalog Delta table
print(f"💾 Writing to Delta table: {UC_TABLE_NAME}")
# Write the DataFrame as a Delta table (overwrite mode)
df_spark.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.saveAsTable(UC_TABLE_NAME)
print(f"✓ Delta table created successfully: {UC_TABLE_NAME}")
Consultar a tabela Delta
Verifique se a tabela Delta foi criada e consulte-a usando SQL.
# Query the Delta table using SQL
print(f"📊 Querying Delta table: {UC_TABLE_NAME}\n")
# Get table info
display(spark.sql(f"DESCRIBE TABLE {UC_TABLE_NAME}"))
# Query sample results
print("\nSample Results from Delta Table:")
display(spark.sql(f"""
SELECT
prompt,
generated_text,
model,
timestamp
FROM {UC_TABLE_NAME}
LIMIT 10
"""))
# Get row count and verify correctness
row_count = spark.sql(f"SELECT COUNT(*) as count FROM {UC_TABLE_NAME}").collect()[0]["count"]
print(f"\n✓ Total rows in Delta table: {row_count}")
# Assert expected row count (NUM_PROMPTS should result in 999 rows: 1000 // 3 * 3 = 999)
expected_rows = (NUM_PROMPTS // 3) * 3 # Rounds down to nearest multiple of 3 base prompts
assert row_count == expected_rows, f"Expected {expected_rows} rows, but got {row_count}"
Próximos passos
Você executou com sucesso a inferência LLM distribuída usando Ray Data e vLLM em uma GPU sem servidor Databricks e salvou os resultados em uma tabela Delta !
O que você realizou
- ✅ Executou inferência LLM distribuída em várias GPUs A10
- ✅ Utilizou-se
map_batchescom uma classeLLMPredictorpersonalizada para processamento de lotes - ✅ Resultados salvos em Parquet nos volumes Unity Catalog
- ✅ Converti Parquet em uma tabela Delta controlada
Opções de personalização
- Alterar o modelo : Atualizar o widget
model_namepara usar modelos diferentes do Hugging Face. - escala up : Aumente
num_gpuspara Taxa de transferência mais alta - Ajuste o tamanho dos lotes : Modifique
batch_sizeemmap_batches()com base nas suas restrições de memória. - Geração de ajuste : Ajuste
SamplingParamspara diferentes características de saída - Modo de anexação : Altere a gravação Delta para
mode("append")para atualizações incrementais.
recurso
- Documentação API de GPU sem servidor
- Documentação de dados Ray
- Documentação do vLLM
- Documentação Unity Catalog
Limpar
Os recursos da GPU são limpos automaticamente quando o notebook é desconectado. Para desconectar manualmente:
- Clique em Conectado no dropdown compute
- Passe o cursor sobre "sem servidor"
- Selecione "Encerrar" no menu dropdown .