MLeap Model Export Demo (Scala)

Model Export with MLeap

MLeap is a common serialization format and execution engine for machine learning pipelines. It supports Apache Spark, scikit-learn, and TensorFlow for training pipelines and exporting them to an MLeap Bundle. Serialized pipelines (bundles) can be deserialized back into Apache Spark, scikit-learn, TensorFlow graphs, or an MLeap pipeline. This notebook demonstrates only how to export an MLlib model with MLeap. For an overview of the package and more examples, see the MLeap documentation.

import ml.combust.bundle.BundleFile
import ml.combust.mleap.spark.SparkSupport._
import org.apache.spark.ml.bundle.SparkBundleContext
import resource._

Cluster Setup

  1. Create a library with Source set to Maven Coordinate, using the fully qualified Maven artifact coordinate ml.combust.mleap:mleap-spark_2.11:0.13.0.
  2. Install the library into a cluster.

Note:

  • This version of MLeap works with Spark 2.0, 2.1, 2.2, 2.3, and 2.4.
  • It is cross-compiled only for Scala 2.11.
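
The compatibility note above can be turned into a quick check before you install the library. This is only a sketch: `MleapCompat` and `isSupported` are hypothetical names, not part of MLeap, and the supported versions are copied from the note above for MLeap 0.13.0.

```scala
// Hypothetical helper (not part of MLeap): encodes the compatibility note above.
object MleapCompat {
  // Spark minor versions listed above as supported by MLeap 0.13.0.
  private val supportedSpark = Set("2.0", "2.1", "2.2", "2.3", "2.4")

  // Keep only "major.minor", for example "2.4.3" becomes "2.4".
  private def majorMinor(version: String): String =
    version.split("\\.").take(2).mkString(".")

  // True only when both the Spark and the Scala version match the note above.
  def isSupported(sparkVersion: String, scalaVersion: String): Boolean =
    supportedSpark.contains(majorMinor(sparkVersion)) &&
      majorMinor(scalaVersion) == "2.11"
}

println(MleapCompat.isSupported("2.4.3", "2.11.12")) // true
println(MleapCompat.isSupported("3.0.0", "2.12.10")) // false
```

In a notebook you would typically pass `spark.version` and `scala.util.Properties.versionNumberString` instead of the literal strings.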

In this Notebook

This notebook demonstrates how to use MLeap to export a DecisionTreeClassifier from MLlib and how to load the saved PipelineModel to make predictions.

The basic workflow is as follows:

  • Model export
    • Fit a PipelineModel using MLlib.
    • Use MLeap to serialize the PipelineModel to a zip file or to a directory.
  • Move the PipelineModel files to your deployment project or data store.
  • In your project
    • Use MLeap to deserialize the saved PipelineModel.
    • Make predictions.

Train the Model with MLlib

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._
import org.apache.spark.ml.tuning._
val df = spark.read.parquet("/databricks-datasets/news20.binary/data-001/training")
  .select("text", "topic")
df.cache()
display(df)
df.printSchema()

Define ML Pipeline

val labelIndexer = new StringIndexer()
  .setInputCol("topic")
  .setOutputCol("label")
  .setHandleInvalid("keep")
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
val dt = new DecisionTreeClassifier()
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, tokenizer, hashingTF, dt))

Tune ML Pipeline

val paramGrid = new ParamGridBuilder()
  .addGrid(hashingTF.numFeatures, Array(1000, 2000))
  .build()
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new MulticlassClassificationEvaluator())
  .setEstimatorParamMaps(paramGrid)
val cvModel = cv.fit(df)
val model = cvModel.bestModel.asInstanceOf[PipelineModel]
val sparkTransformed = model.transform(df)

display(sparkTransformed)

Export Trained Model with MLeap

MLeap supports serializing the model to one zip file or to files in one directory.

  • To serialize to a zip file, make sure the URI begins with jar:file and ends with .zip.
  • To serialize to a directory, make sure the URI begins with file.
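
As an illustration of the two URI rules above, the sketch below builds both forms from an absolute local path. `BundleUris` and its methods are hypothetical helpers, not part of MLeap; they only assemble strings in the shape this notebook passes to `BundleFile`.

```scala
// Hypothetical helper (not part of MLeap): builds bundle URIs that follow the rules above.
object BundleUris {
  // Zip bundle: the URI begins with "jar:file" and ends with ".zip".
  def zip(absolutePath: String): String = {
    require(absolutePath.startsWith("/"), "expected an absolute path")
    require(absolutePath.endsWith(".zip"), "zip bundle paths must end with .zip")
    s"jar:file:$absolutePath"
  }

  // Directory bundle: the URI begins with "file".
  def directory(absolutePath: String): String = {
    require(absolutePath.startsWith("/"), "expected an absolute path")
    s"file:$absolutePath"
  }
}

println(BundleUris.zip("/tmp/mleap_scala_model_export/20news_pipeline-json.zip"))
// jar:file:/tmp/mleap_scala_model_export/20news_pipeline-json.zip
println(BundleUris.directory("/tmp/mleap_scala_model_export/20news_pipeline"))
// file:/tmp/mleap_scala_model_export/20news_pipeline
```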

You can find the supported transformers for MLeap in the MLeap documentation.

%sh 
rm -rf /tmp/mleap_scala_model_export/
mkdir /tmp/mleap_scala_model_export/

Serialize to zip file:

import ml.combust.bundle.BundleFile
import ml.combust.mleap.spark.SparkSupport._
import org.apache.spark.ml.bundle.SparkBundleContext
import resource._
implicit val context = SparkBundleContext().withDataset(sparkTransformed)
//save our pipeline to a zip file
//MLeap can save a file to any supported java.nio.FileSystem
(for(modelFile <- managed(BundleFile("jar:file:/tmp/mleap_scala_model_export/20news_pipeline-json.zip"))) yield {
  model.writeBundle.save(modelFile)(context)
}).tried.get

Serialize to directory:

(for(modelFile <- managed(BundleFile("file:/tmp/mleap_scala_model_export/20news_pipeline"))) yield {
  model.writeBundle.save(modelFile)(context)
}).tried.get

%sh
ls /tmp/mleap_scala_model_export/

Download Model Files

In this example, you copy the zip bundle to DBFS and then download it from the browser. In general, you may want to move the model to a persistent storage layer programmatically.

dbutils.fs.cp("file:/tmp/mleap_scala_model_export/20news_pipeline-json.zip", "dbfs:/example/20news_pipeline-json.zip")
display(dbutils.fs.ls("dbfs:/example"))

Get a link to the downloadable zip at: https://<databricks-instance>/files/<file-name>.zip.

Import Trained Model with MLeap

This section shows how to use MLeap to load a trained model for use in your application. To use existing ML models and pipelines to make predictions on new data, deserialize the model from the file you saved.

Import to Spark

This section shows how to load an MLeap bundle and make predictions on a Spark DataFrame. This can be useful if you want to use the same persistence format (bundle) for loading into Spark and non-Spark applications. If your goal is to make predictions only in Spark, then we recommend using MLlib's native ML persistence.
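
For comparison, a minimal sketch of MLlib's native persistence is shown here. It assumes the `model` fitted earlier in this notebook, and `nativePath` is a hypothetical location chosen only for illustration.

```scala
import org.apache.spark.ml.PipelineModel

// Hypothetical DBFS location; use any path your cluster can write to.
val nativePath = "dbfs:/example/20news_pipeline_native"

// `model` is the PipelineModel fitted earlier in this notebook.
// overwrite() replaces any model already saved at the same path.
model.write.overwrite().save(nativePath)

// Load the model back; it can be used like any other PipelineModel.
val reloaded = PipelineModel.load(nativePath)
```

A model saved this way is loaded through Spark itself, which is why the MLeap bundle is the better fit when a non-Spark application must load the same model.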

import ml.combust.bundle.BundleFile
import ml.combust.mleap.spark.SparkSupport._
import resource._

val zipBundle = (for(bundle <- managed(BundleFile("jar:file:/tmp/mleap_scala_model_export/20news_pipeline-json.zip"))) yield {
  bundle.loadSparkBundle().get
}).opt.get
val loadedModel = zipBundle.root

Use the loaded model to make predictions with the new data.

val test_df = spark.read.parquet("/databricks-datasets/news20.binary/data-001/test")
  .select("text", "topic")
test_df.cache()
display(test_df)
val exampleResults = loadedModel.transform(test_df)

display(exampleResults)

Import to MLeap

This section demonstrates how to import your model into a non-Spark application. MLeap provides a LeapFrame API, which is essentially a local DataFrame. After importing your model using MLeap, you can make predictions on data stored in a LeapFrame without using a SparkContext or SparkSession.

import ml.combust.bundle.BundleFile
import ml.combust.mleap.runtime.MleapSupport._
import resource._

val zipBundleM = (for(bundle <- managed(BundleFile("jar:file:/tmp/mleap_scala_model_export/20news_pipeline-json.zip"))) yield {
  bundle.loadMleapBundle().get
}).opt.get
val mleapPipeline = zipBundleM.root

Set up a LeapFrame for testing by manually inserting one row of data.

import ml.combust.mleap.runtime.frame.{DefaultLeapFrame, Row}
import ml.combust.mleap.core.types._

val schema = StructType(StructField("text", ScalarType.String),
  StructField("topic", ScalarType.String)).get
val data = Seq(Row("From: ecktons@ucs.byu.edu (Sean Eckton) Subject: Re: Why is my mouse so JUMPY?  (MS MOUSE) Organization: Fine Arts and Communications -- Brigham Young University Distribution: world  My original post: >Subject: Re: Why is my mouse so JUMPY?  (MS MOUSE) >> I have a Microsoft Serial Mouse and am using mouse.com 8.00 (was using 8.20  >> I think, but switched to 8.00 to see if it was any better).  Vertical motion  >> is nice and smooth, but horizontal motion is so bad I sometimes can't click  >> on something because my mouse jumps around.  I can be moving the mouse to  >> the right with relatively uniform motion and the mouse will move smoothly  >> for a bit, then jump to the right, then move smoothly for a bit then jump  >> again (maybe this time to the left about .5 inch!).  This is crazy!  I have  >> never had so much trouble with a mouse before.  Anyone have any solutions?    Aha, I think I found the problem and it isn't dirt!  Another guy here was  using a different kind of mouse and was using 640x400x16 video driver (the  default VGA for Windows).  He has an S3 LocalBus card like I do and when I  loaded the S3 video driver in Windows for him, his mouse became jumpy too.   Seems like it is the S3 driver!  Is there any newer one than version 1.4  that would solve this problem?  It is really bad.  I have to use the  keyboard instead sometimes!  The s3-w31.zip on cica is version 1.4 (which is  the same version that came with my card).   --- Sean Eckton Computer Support Representative College of Fine Arts and Communications  D-406 HFAC Brigham Young University Provo, UT  84602 (801)378-3292  hfac_csr@byu.edu ecktons@ucs.byu.edu ", "comp.os.ms-windows.misc"))
val frame = DefaultLeapFrame(schema, data)

Now you can use the loaded model to make predictions.

val frame2 = mleapPipeline.transform(frame).get
val data2 = frame2.dataset
// The prediction is read from the column at index 7 below; print the schema with indices to confirm:
frame2.schema.fields.zipWithIndex.foreach { case (field, idx) =>
  println(s"$idx $field")
}
// Get the prediction for Row 0
data2(0).getDouble(7)
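
A hard-coded column index breaks as soon as the pipeline stages change. As a sketch, the index can instead be looked up by name from the schema already printed above. This assumes the classifier kept Spark's default output column name, `prediction`; `predictionIdx` and `prediction` are names introduced here for illustration.

```scala
// Assumption: the classifier kept Spark's default output column name, "prediction".
val predictionIdx = frame2.schema.fields.indexWhere(_.name == "prediction")
require(predictionIdx >= 0, "no column named 'prediction' in the output frame")

// Read the prediction for row 0 from that column.
val prediction = data2(0).getDouble(predictionIdx)
println(s"prediction column index: $predictionIdx, value: $prediction")
```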