MLflow: Train PySpark Model and Log in MLeap Format (Python)

MLflow Deployment: Train PySpark Model and Log in MLeap Format

This notebook walks through the process of:

  1. Training a PySpark pipeline model
  2. Saving the model in MLeap format with MLflow

The notebook contains the following sections:

Setup

  • Launch a Python 3 cluster
  • Install MLflow

Train a PySpark Pipeline model

  • Load pipeline training data
  • Define the PySpark Pipeline structure
  • Train the Pipeline model and log it within an MLflow run

Setup

  1. Create, or attach to, a cluster running Python 3.
  2. If you are running Databricks Runtime, uncomment and run the installation cell below to install MLflow. If you are running Databricks Runtime ML, skip this step; the required libraries are already installed.
  3. Create a library with Source "Maven Coordinate", using the fully-qualified artifact coordinate:
    • ml.combust.mleap:mleap-spark_2.11:0.13.0
  4. Install the library into the cluster.
  5. Attach this notebook to the cluster.
# Uncomment to install MLflow on Databricks Runtime (not needed on Databricks Runtime ML)
#dbutils.library.installPyPI("mlflow[extras]")
#dbutils.library.restartPython()

Train a PySpark Pipeline model

Load pipeline training data

Load the data used to train the PySpark Pipeline model: the 20 Newsgroups dataset, which consists of articles from 20 Usenet newsgroups.

# Read the training split, keeping only the article text and its newsgroup topic
df = spark.read.parquet("/databricks-datasets/news20.binary/data-001/training").select("text", "topic")
df.cache()  # the DataFrame is reused during pipeline training
display(df)

Define the PySpark Pipeline structure

Define a PySpark Pipeline that featurizes samples from the dataset and classifies them with a decision tree.

from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, Tokenizer, HashingTF
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# Define pipeline components
labelIndexer = StringIndexer(inputCol="topic", outputCol="label", handleInvalid="keep")
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="features")
dt = DecisionTreeClassifier()
 
# Construct a Pipeline object using the defined components
pipeline = Pipeline(stages=[labelIndexer, tokenizer, hashingTF, dt])
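The HashingTF stage above converts each token list into a fixed-length term-frequency vector via the hashing trick: tokens are hashed into a fixed number of buckets (Spark's implementation uses MurmurHash3 and defaults to 2^18 features), so no vocabulary dictionary is ever built. A minimal pure-Python sketch of the idea, using a hypothetical hashing_tf helper with MD5 in place of MurmurHash3:

```python
import hashlib
from collections import Counter

def hashing_tf(words, num_features=16):
    """Toy hashing trick: hash each token into one of num_features
    buckets and count occurrences per bucket. Hash collisions are
    accepted as the price of a fixed-size feature space."""
    def bucket(word):
        # Stable hash so results are reproducible across runs
        return int(hashlib.md5(word.encode()).hexdigest(), 16) % num_features
    counts = Counter(bucket(w) for w in words)
    return dict(counts)  # sparse vector: {bucket_index: term_frequency}

vec = hashing_tf("the quick brown fox jumps over the lazy dog".split())
```

Because the transform is stateless, HashingTF needs no fitting pass over the data; in the real pipeline it emits a SparseVector that the DecisionTreeClassifier consumes directly.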