Serve a SparkML model (Python)



This notebook trains a SparkML Pipeline and logs it to MLflow for use in Model Serving (AWS | Azure).

Requirements

  • Databricks Runtime for Machine Learning 13.0 and above.
import numpy as np
import pandas as pd
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
import mlflow
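# Disable autologging; the pipeline is logged explicitly with mlflow.spark.log_model
mlflow.autolog(disable=True)
# Column names used throughout the notebook
X1 = "x1"
X2 = "x2"
Y = "y"
ERROR = "error"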

Create dataset (10,000 samples)

Generate data according to the formula:

$$y = \beta_{1}x_{1} + \beta_{2}x_{2} + \text{error}$$

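DATA_LEN = 10000
# True coefficients used to generate y
_beta1 = 9
_beta2 = 6
# Features and noise drawn from a standard normal distribution
x1 = np.random.randn(DATA_LEN)
x2 = np.random.randn(DATA_LEN)
error = np.random.randn(DATA_LEN)

df = (
    pd.DataFrame({
        X1: x1,
        X2: x2,
        ERROR: error,
    })
    # y = beta1 * x1 + beta2 * x2 + error
    .assign(y=lambda z: _beta1 * z[X1] + _beta2 * z[X2] + z[ERROR])
    # The error term is only needed to build y, so drop it
    .drop(columns=ERROR)
)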
df
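As a quick sanity check on the formula, the sketch below regenerates data the same way with NumPy and fits ordinary least squares with `np.linalg.lstsq`. It assumes a fixed seed (added here only for reproducibility; the notebook itself does not seed the generator) and an intercept column, mirroring the default `fitIntercept=True` of SparkML's `LinearRegression`. The recovered coefficients should land close to the true values 9 and 6, which is roughly what the Spark pipeline trained below should also find.

```python
import numpy as np

# Hypothetical seed, used only so this check is reproducible.
rng = np.random.default_rng(seed=0)

DATA_LEN = 10000
_beta1, _beta2 = 9, 6

# Same generating process as above: y = beta1*x1 + beta2*x2 + error
x1 = rng.standard_normal(DATA_LEN)
x2 = rng.standard_normal(DATA_LEN)
error = rng.standard_normal(DATA_LEN)
y = _beta1 * x1 + _beta2 * x2 + error

# Design matrix with an intercept column followed by the two features
A = np.column_stack([np.ones(DATA_LEN), x1, x2])

# Ordinary least squares; lstsq also returns residuals, rank and singular values
coef, *_ = np.linalg.lstsq(A, y, rcond=None)
intercept, b1, b2 = coef
print(f"intercept={intercept:.3f}, beta1={b1:.3f}, beta2={b2:.3f}")
```

With 10,000 samples and unit-variance noise, the standard error of each coefficient is on the order of 0.01, so the estimates should sit very near the true values.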

When data is sent to the model serving endpoint, it is loaded as a pandas DataFrame and then converted to a Spark DataFrame with spark.createDataFrame. The incoming data therefore contains only the raw x1 and x2 columns, so it is important to train the model as a Pipeline that includes the VectorAssembler. That way, new data is assembled into the features vector the linear regression expects after it has been moved into Spark.
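The sketch below illustrates that point without Spark, under stated assumptions. It parses a request body in the `dataframe_records` format into a pandas DataFrame, showing that only the raw `x1` and `x2` columns arrive. The helper `assemble_features` is hypothetical: it is a NumPy stand-in for what `VectorAssembler` does (concatenate the input columns, in order, into one vector per row), not the SparkML implementation.

```python
import numpy as np
import pandas as pd

# A request body in the dataframe_records format (example values)
request_body = {
    "dataframe_records": [
        {"x1": 5.1, "x2": 3.5},
        {"x1": -0.2, "x2": 1.0},
    ]
}

# The serving side sees plain named columns, one per input field
incoming = pd.DataFrame.from_records(request_body["dataframe_records"])


def assemble_features(pdf: pd.DataFrame, input_cols: list[str]) -> np.ndarray:
    """Hypothetical stand-in for VectorAssembler: stack the input columns,
    in the given order, into one feature vector per row."""
    return pdf[input_cols].to_numpy(dtype=float)


# Without an assembly step there is no "features" column to predict from
print(list(incoming.columns))
features = assemble_features(incoming, ["x1", "x2"])
print(features)
```

Because the real assembly step lives inside the logged Pipeline, callers only ever need to send `x1` and `x2`.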

# Convert the pandas dataframe to spark
spark_df = spark.createDataFrame(df)
with mlflow.start_run() as run:
    # Create the vector assembler
    assembler = VectorAssembler(inputCols=[X1, X2], outputCol="features")
    # Create the linear regression
    lr = LinearRegression(featuresCol="features", labelCol=Y)
    # Put the vector assembler and the linear regression into a pipeline
    pipeline = Pipeline(stages=[assembler, lr])
    # Train the pipeline
    model = pipeline.fit(spark_df)
    mlflow.spark.log_model(model, "model", registered_model_name="spark_linear_regression")
2023/04/28 18:58:38 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().
Successfully registered model 'spark_linear_regression'.
2023/04/28 18:59:24 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: spark_linear_regression, version 1
Created version '1' of model 'spark_linear_regression'.

After the serving endpoint is running, you can query it with an example data point using the following request body:

{
  "dataframe_records": 
  [
    {
      "x1": 5.1,
      "x2": 3.5
    }
  ]
}
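If the query data starts out as a pandas DataFrame, a request body like the one above can be built with `DataFrame.to_dict(orient="records")`. The sketch below does that and also computes the noise-free value implied by the generating formula (true coefficients 9 and 6, zero intercept), as a rough reference for what the endpoint should return. The helper name `build_payload` is hypothetical.

```python
import json

import pandas as pd

# True coefficients from the data-generation step
BETA1, BETA2 = 9, 6


def build_payload(pdf: pd.DataFrame) -> dict:
    """Hypothetical helper: wrap a DataFrame in the dataframe_records format."""
    return {"dataframe_records": pdf.to_dict(orient="records")}


query_df = pd.DataFrame({"x1": [5.1], "x2": [3.5]})
payload = build_payload(query_df)
print(json.dumps(payload))

# Reference value from the formula with the error term set to zero
expected = BETA1 * 5.1 + BETA2 * 3.5
print(f"expected prediction ~ {expected:.1f}")
```

The JSON string can then be sent in an HTTP POST to the endpoint; the URL pattern `https://<databricks-instance>/serving-endpoints/<endpoint-name>/invocations` and token-based authentication are assumptions to check against your workspace. Since the fitted coefficients are estimates from random data, the returned prediction should be close to, but not exactly, 66.9.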