MLflow: Train PySpark Model and Log in MLeap Format (Python)

MLflow Deployment: Train PySpark Model and Log in MLeap Format

This notebook walks through the process of:

  1. Training a PySpark pipeline model
  2. Saving the model in MLeap format with MLflow

The notebook contains the following sections:

Setup

  • Launch a Python 3 cluster
  • Install MLflow

Train a PySpark Pipeline model

  • Load pipeline training data
  • Define the PySpark Pipeline structure
  • Train the Pipeline model and log it within an MLflow run

Setup

  1. Create a cluster specifying Python 3, or ensure the cluster you are using runs Python 3.
  2. If you are running Databricks Runtime, uncomment and run Cmd 4 to install mlflow. If you are using Databricks Runtime ML, you can skip this step because the required libraries are already installed.
  3. Create a library with the Source option Maven Coordinate, using the fully qualified Maven artifact coordinate:
    • ml.combust.mleap:mleap-spark_2.11:0.13.0
  4. Install the library on the cluster.
  5. Attach this notebook to the cluster.
# Uncomment the next two lines if you are on Databricks Runtime (not Databricks Runtime ML)
#dbutils.library.installPyPI("mlflow[extras]")
#dbutils.library.restartPython()
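After installing and attaching the libraries, a quick sanity check can confirm that the required Python packages are importable. This is a minimal sketch using only the standard library; the function name is illustrative, not a Databricks or MLflow API:

```python
import importlib.util

def packages_available(names):
  # Map each package name to whether it can be imported in this environment
  return {name: importlib.util.find_spec(name) is not None for name in names}

status = packages_available(["mlflow", "pyspark"])
```

If either entry is False, revisit the install steps above before continuing.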

Train a PySpark Pipeline model

Load pipeline training data

Load the data that will be used to train the PySpark Pipeline model. The model is trained on the 20 Newsgroups dataset, which consists of articles from 20 Usenet newsgroups.

df = spark.read.parquet("/databricks-datasets/news20.binary/data-001/training").select("text", "topic")
df.cache()
display(df)

Define the PySpark Pipeline structure

Define a PySpark Pipeline that featurizes samples from the dataset and classifies them with a decision tree.

from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, Tokenizer, HashingTF
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# Define pipeline components
labelIndexer = StringIndexer(inputCol="topic", outputCol="label", handleInvalid="keep")
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="features")
dt = DecisionTreeClassifier()
 
# Construct a Pipeline object using the defined components
pipeline = Pipeline(stages=[labelIndexer, tokenizer, hashingTF, dt])
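The HashingTF stage above turns each token list into a fixed-length term-frequency vector via the hashing trick: every token is hashed into one of numFeatures buckets, and bucket counts become the features. The following is a minimal pure-Python sketch of that idea; note that Spark's HashingTF uses MurmurHash3 internally, while this sketch uses MD5 only to get a stable, reproducible hash:

```python
import hashlib
from collections import Counter

def hashing_tf(words, num_features=1000):
  # Hash each token into a bucket and count term frequencies per bucket.
  # Illustrative only: Spark's HashingTF uses MurmurHash3, not MD5.
  counts = Counter()
  for w in words:
    bucket = int(hashlib.md5(w.encode("utf-8")).hexdigest(), 16) % num_features
    counts[bucket] += 1
  return dict(counts)

tokens = "the quick brown fox jumps over the lazy dog the".split()
vec = hashing_tf(tokens, num_features=16)
```

Because the bucket index depends only on the token, repeated tokens ("the" above) always land in the same bucket; distinct tokens can collide, which is the usual trade-off of the hashing trick against an explicit vocabulary.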

Train the Pipeline model and log it within an MLflow run with MLeap flavor

Train the PySpark Pipeline on the 20 Newsgroups data that was loaded previously. The training process will execute within an MLflow run.

import mlflow
import mlflow.mleap
 
def fit_model():
  # Start a new MLflow run
  with mlflow.start_run():
    # Fit the model, performing cross-validation to improve accuracy
    paramGrid = ParamGridBuilder().addGrid(hashingTF.numFeatures, [1000, 2000]).build()
    cv = CrossValidator(estimator=pipeline, evaluator=MulticlassClassificationEvaluator(), estimatorParamMaps=paramGrid)
    cvModel = cv.fit(df)
    model = cvModel.bestModel

    # Log the best model to the MLflow run in MLeap format
    mlflow.mleap.log_model(spark_model=model, sample_input=df, artifact_path="model")

# Train the PySpark Pipeline model within a new MLflow run
fit_model()
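ParamGridBuilder().addGrid(...).build() simply enumerates every combination of the candidate parameter values; CrossValidator then fits one model per combination per fold and keeps the best. A minimal pure-Python sketch of that enumeration (the dict-based grid and function name are illustrative, not the Spark API):

```python
from itertools import product

def build_param_grid(grid):
  # Enumerate every combination of candidate values, one dict per combination,
  # analogous to what ParamGridBuilder.build() produces as a list of param maps.
  names = list(grid)
  return [dict(zip(names, values)) for values in product(*(grid[n] for n in names))]

param_maps = build_param_grid({"hashingTF.numFeatures": [1000, 2000]})
```

With a single parameter and two candidate values, the grid has two param maps, so CrossValidator trains two models per fold (three folds by default) before refitting the best configuration on the full dataset.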