Save ML model to Snowflake (Python)

Configure Snowflake connection options.

# Use secrets DBUtil to get Snowflake credentials.
user = dbutils.secrets.get("data-warehouse", "<snowflake-user>")
password = dbutils.secrets.get("data-warehouse", "<snowflake-password>")

options = {
  "sfUrl": "<snowflake-url>",
  "sfUser": user,
  "sfPassword": password,
  "sfDatabase": "<snowflake-database>",
  "sfSchema": "<snowflake-schema>",
  "sfWarehouse": "<snowflake-cluster>",
}
%scala
val user = dbutils.secrets.get("data-warehouse", "<snowflake-user>")
val password = dbutils.secrets.get("data-warehouse", "<snowflake-password>")

val options = Map(
  "sfUrl" -> "<snowflake-url>",
  "sfUser" -> user,
  "sfPassword" -> password,
  "sfDatabase" -> "<snowflake-database>",
  "sfSchema" -> "<snowflake-schema>",
  "sfWarehouse" -> "<snowflake-cluster>"
)
%scala
import net.snowflake.spark.snowflake.Utils

Utils.runQuery(options, """CREATE SCHEMA IF NOT EXISTS <snowflake-schema>""")
Utils.runQuery(options, """DROP TABLE IF EXISTS adult""")
Utils.runQuery(options, """CREATE TABLE adult (
  age DOUBLE,
  workclass STRING,
  fnlwgt DOUBLE,
  education STRING,
  education_num DOUBLE,
  marital_status STRING,
  occupation STRING,
  relationship STRING,
  race STRING,
  sex STRING,
  capital_gain DOUBLE,
  capital_loss DOUBLE,
  hours_per_week DOUBLE,
  native_country STRING,
  income STRING)""")
%scala
spark.read.format("csv")
  .option("header", "true")
  .load("/databricks-datasets/adult/adult.data")
  .write.format("snowflake")
  .options(options).mode("append").option("dbtable", "adult").save()

Load and display dataset.

dataset = spark.read.format("snowflake").options(**options).option("dbtable", "adult").load()
cols = dataset.columns
display(dataset)
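
Snowflake stores unquoted identifiers in uppercase, so the columns come back as AGE, WORKCLASS, and so on; Spark's default case-insensitive column resolution is why the lowercase names used below still resolve. A quick schema check makes this visible.

dataset.printSchema()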

Create machine learning training pipeline.

import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from distutils.version import LooseVersion

categoricalColumns = ["workclass", "education", "marital_status", "occupation", "relationship", "race", "sex", "native_country"]
stages = [] # stages in our Pipeline
for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    if LooseVersion(pyspark.__version__) < LooseVersion("3.0"):
        from pyspark.ml.feature import OneHotEncoderEstimator
        encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    else:
        from pyspark.ml.feature import OneHotEncoder
        encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]

Configure training pipeline.

# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol="income", outputCol="label")
stages += [label_stringIdx]
# Transform all features into a vector using VectorAssembler
numericCols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
partialPipeline = Pipeline().setStages(stages)
pipelineModel = partialPipeline.fit(dataset)
preppedDataDF = pipelineModel.transform(dataset)
selectedcols = ["label", "features"] + cols
dataset = preppedDataDF.select(selectedcols)
display(dataset)

Randomly split data into training and test sets and set seed for reproducibility.

(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
print(trainingData.count())
print(testData.count())
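
As an optional sanity check, look at the label balance in the training split before fitting the model.

display(trainingData.groupBy("income").count())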

Random forests use an ensemble of trees to improve model accuracy. You can read more about random forests in the classification and regression section of the MLlib Programming Guide.

from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="label", featuresCol="features")
rfModel = rf.fit(trainingData)
predictions = rfModel.transform(testData)
selected = predictions.select("LABEL", "PREDICTION", "PROBABILITY", "AGE")
display(selected)
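
The fitted forest also exposes per-feature importances; the indices follow the assembled features vector, in which each one-hot encoded column expands into several slots.

print(rfModel.featureImportances)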

Evaluate the random forest model with BinaryClassificationEvaluator.

from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)
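
By default, BinaryClassificationEvaluator reports areaUnderROC. You can confirm the metric, and evaluate areaUnderPR as well by passing a parameter override.

print(evaluator.getMetricName())  # areaUnderROC by default
print(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"}))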

Tune the model with the ParamGridBuilder and the CrossValidator.

Because there are 3 values for maxDepth, 2 values for maxBins, and 2 values for numTrees, this grid has 3 x 2 x 2 = 12 parameter settings for CrossValidator to choose from.

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [2, 4, 6])
             .addGrid(rf.maxBins, [20, 60])
             .addGrid(rf.numTrees, [5, 20])
             .build())
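
The built grid is a plain list of parameter maps, so a quick length check confirms the 3 x 2 x 2 = 12 combinations.

print(len(paramGrid))  # 12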

Create a 5-fold CrossValidator.

cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
cvModel = cv.fit(trainingData)
predictions = cvModel.transform(testData)
evaluator.evaluate(predictions)
selected = predictions.select("LABEL", "PREDICTION", "PROBABILITY", "AGE")
display(selected)

Because the random forest gives the best areaUnderROC value, use the bestModel obtained from cross-validation for deployment and generate predictions on new data with it. This example simulates that by generating predictions on the entire dataset.

bestModel = cvModel.bestModel
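
To see which hyperparameter combination won, inspect the parameters carried by the best model. This is a minimal sketch; it assumes a recent Spark version, in which the fitted model retains the tuned parameter values.

# Print the tuned values copied onto the best model (Spark 3.x behavior).
for name in ["numTrees", "maxDepth", "maxBins"]:
    print(name, bestModel.getOrDefault(name))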

Generate predictions for entire dataset.

finalPredictions = bestModel.transform(dataset)

Evaluate the best model.

evaluator.evaluate(finalPredictions)
%scala
// Register a Scala UDF that converts an ML probability vector to an array of doubles.
// Because it is registered with spark.udf, it can be called from Python via selectExpr.
import org.apache.spark.ml.linalg.{DenseVector, Vector}
val VectorToArray: Vector => Array[Double] = _.asInstanceOf[DenseVector].toArray
spark.udf.register("VectorToArray", VectorToArray)

Use the registered UDF from Python to split the probability vector into prob_0 and prob_1 columns and display the result.

display(finalPredictions.drop("features").drop("rawPrediction").selectExpr("*", "VectorToArray(probability)[0] as prob_0", "VectorToArray(probability)[1] as prob_1"))

Save final results to Snowflake.

finalPredictions \
  .drop("features") \
  .drop("rawPrediction") \
  .selectExpr("*", "VectorToArray(probability)[0] as prob_0", "VectorToArray(probability)[1] as prob_1") \
  .drop("probability") \
  .write.format("snowflake") \
  .options(**options) \
  .option("dbtable", "adult_results") \
  .mode("overwrite") \
  .save()
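
As a final check, read the results table back from Snowflake with the same connection options and confirm the contents.

results = spark.read.format("snowflake").options(**options).option("dbtable", "adult_results").load()
print(results.count())
display(results)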