Pular para o conteúdo principal

Realize a inferência de lotes usando o site Spark DataFrame

Este artigo descreve como realizar a inferência de lotes em um Spark DataFrame usando um modelo registrado em Databricks. O fluxo de trabalho se aplica a vários modelos de aprendizagem profunda e aprendizado de máquina, incluindo TensorFlow, PyTorch e scikit-learn. Ele inclui práticas recomendadas para carregamento de dados, inferência de modelos e ajuste de desempenho.

Para inferência de modelo para aplicativos de aprendizagem profunda, a Databricks recomenda o seguinte fluxo de trabalho. Para obter exemplos de Notebook que usam TensorFlow e PyTorch, consulte exemplos de inferência de lotes.

Inferência de modelos fluxo de trabalho

Databricks recomenda o seguinte fluxo de trabalho para realizar a inferência de lotes usando Spark DataFrames.

Etapa 1: Configuração do ambiente

Certifique-se de que a execução do cluster seja compatível com a versão Databricks ML Runtime para corresponder ao ambiente de treinamento. O modelo, registrado em MLflow, contém os requisitos que podem ser instalados para garantir a correspondência entre os ambientes de treinamento e de inferência.

Python
requirements_path = os.path.join(local_path, "requirements.txt")
if not os.path.exists(requirements_path):
dbutils.fs.put("file:" + requirements_path, "", True)

%pip install -r $requirements_path
%restart_python

Etapa 2: Carregar dados no Spark DataFrames

Dependendo do tipo de dados, use o método apropriado para carregar dados em um Spark DataFrame:

Tipo de dados

Método

Tabela do site Unity Catalog (recomendado)

table = spark.table(input_table_name)

Arquivos de imagem (JPG, PNG)

files_df = spark.createDataFrame(map(lambda path: (path,), file_paths), ["path"])

Registros TF

df = spark.read.format("tfrecords").load(image_path)

Outros formatos (Parquet, CSV, JSON, JDBC)

Carregar usando Spark fonte de dados.

Etapa 3: Carregar modelo do registro de modelo

Este exemplo usa um modelo do Databricks Model Registry para inferência.

Python
predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri)

Etapa 4: Realizar a inferência do modelo usando UDFs do Pandas

Pandas Os UDFs utilizam o Apache Arrow para transferência eficiente de dados e o Pandas para processamento. As etapas típicas da inferência com os UDFs do site Pandas são:

  1. Carregue o modelo treinado: Use o MLflow para criar um Spark UDF para inferência.
  2. Pré-processar dados de entrada: certifique-se de que o esquema de entrada corresponda aos requisitos do modelo.
  3. execução do modelo de previsão: Use a função UDF do modelo no site DataFrame.
Python
df_result = df_spark.withColumn("prediction", predict_udf(*df_spark.columns))
  1. (Recomendado) Salve as previsões no Unity Catalog.

O exemplo a seguir salva as previsões no Unity Catalog.

Python
df_result.write.mode("overwrite").saveAsTable(output_table)

Ajuste de desempenho para inferência de modelos

Esta seção fornece algumas dicas para depuração e ajuste de desempenho para inferência de modelos em Databricks. Para obter uma visão geral, consulte a seção Realizar inferência de lotes usando o site Spark DataFrame .

Normalmente, há duas partes principais na inferência de modelos: pipeline de entrada de dados e inferência de modelos. O pipeline de entrada de dados é pesado em termos de entrada de E/S de dados e a inferência de modelos é pesada em termos de computação. Determinar o gargalo do fluxo de trabalho é simples. Aqui estão algumas abordagens:

  • Reduza o modelo para um modelo trivial e meça os exemplos por segundo. Se a diferença do tempo de ponta a ponta entre o modelo completo e o modelo trivial for mínima, então o pipeline de entrada de dados provavelmente é um gargalo; caso contrário, a inferência do modelo é o gargalo.
  • Se estiver executando a inferência do modelo com a GPU, verifique as métricas de utilização da GPU. Se a utilização da GPU não for continuamente alta, o pipeline de entrada de dados poderá ser o gargalo.

Otimizar o pipeline de entrada de dados

O uso de GPUs pode otimizar com eficiência a velocidade de execução para inferência de modelos. À medida que as GPUs e outros aceleradores se tornam mais rápidos, é importante que o pipeline de entrada de dados acompanhe a demanda. O pipeline de entrada de dados lê os dados no Spark DataFrames, transforma-os e carrega-os como a entrada para a inferência do modelo. Se a entrada de dados for o gargalo, aqui estão algumas dicas para aumentar a taxa de transferência de E/S:

  • Defina o máximo de registros por lote. Um número maior de registros máximos pode reduzir a sobrecarga de E/S para chamar a função UDF, desde que os registros caibam na memória. Para definir o tamanho do lote, defina a seguinte configuração:

    Python
    spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")
  • Carregar os dados em lotes e buscá-los previamente ao pré-processar os dados de entrada no site Pandas UDF.

    Para o TensorFlow, a Databricks recomenda o uso da API tf.data. O senhor pode analisar o mapa em paralelo definindo num_parallel_calls em uma função map e chamando prefetch e batch para pré-busca e lotes.

    Python
    dataset.map(parse_example, num_parallel_calls=num_process).prefetch(prefetch_size).batch(batch_size)

    Para o PyTorch, a Databricks recomenda o uso da classe DataLoader. O senhor pode definir batch_size para lotes e num_workers para carregamento paralelo de dados.

    Python
    torch.utils.data.DataLoader(images, batch_size=batch_size, num_workers=num_process)

exemplos de inferência de lotes

Os exemplos desta seção seguem o fluxo de trabalho de inferência de aprendizagem profunda recomendado. Esses exemplos ilustram como realizar a inferência do modelo usando um modelo de rede neural de redes residuais profundas (ResNets) pré-treinado.