# Define pipeline components
# StringIndexer maps the string "topic" column to a numeric "label" column;
# handleInvalid="keep" gives unseen topic values their own extra index
# instead of raising at transform time.
labelIndexer = StringIndexer(inputCol="topic", outputCol="label", handleInvalid="keep")
# Tokenizer splits the raw "text" column into a list of words.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
# HashingTF hashes the word list into a fixed-size term-frequency feature vector
# (numFeatures is tuned via the param grid in fit_model below).
hashingTF = HashingTF(inputCol="words", outputCol="features")
# Decision-tree classifier consumes the default "features"/"label" columns.
dt = DecisionTreeClassifier()
# Construct a Pipeline object using the defined components
pipeline = Pipeline(stages=[labelIndexer, tokenizer, hashingTF, dt])
import mlflow
import mlflow.mleap
def fit_model():
    """Train the pipeline with cross-validation and log the best model to MLflow.

    Uses the module-level ``pipeline``, ``hashingTF``, and ``df`` objects.
    The best model found by cross-validation is logged in MLeap format
    under the artifact path ``"model"`` within a new MLflow run.
    """
    # Start a new MLflow run; the context manager ends the run automatically,
    # even if training raises.
    with mlflow.start_run() as run:
        # Grid over the feature-hash dimensionality; CrossValidator selects
        # the setting with the best MulticlassClassificationEvaluator score.
        paramGrid = (
            ParamGridBuilder()
            .addGrid(hashingTF.numFeatures, [1000, 2000])
            .build()
        )
        cv = CrossValidator(
            estimator=pipeline,
            evaluator=MulticlassClassificationEvaluator(),
            estimatorParamMaps=paramGrid,
        )
        # Fit the model, performing cross validation to improve accuracy
        cvModel = cv.fit(df)
        model = cvModel.bestModel
        # Log the best model within the MLflow run; sample_input is required
        # so MLeap can infer the model's input schema.
        mlflow.mleap.log_model(spark_model=model, sample_input=df, artifact_path="model")
MLflow Deployment: Train PySpark Model and Log in MLeap Format
This notebook walks through the process of training a PySpark pipeline model and logging it in MLeap format using MLflow.
The notebook contains the following sections:
Setup
Train a PySpark Pipeline model
Last refresh: Never