Basic feature store example

This notebook illustrates how you can use Databricks Feature Store to create, store, and manage features to train ML models and make batch predictions, including with features whose values are only available at the time of prediction. In this example, the goal is to predict wine quality using an ML model built from a variety of static wine features and a real-time input.

This notebook shows how to:

- Create a feature table and use it to build a training dataset for a machine learning model.
- Log the trained model through the Feature Store client so it is registered together with its feature lookups.
- Perform batch scoring with automatic feature lookup.
- Retrain the model and create a new version of the registered model.

import pandas as pd
from pyspark.sql.functions import monotonically_increasing_id, expr, rand
import uuid
from databricks import feature_store
from databricks.feature_store import feature_table, FeatureLookup
import mlflow
import mlflow.sklearn
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
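The cells that follow rely on a Feature Store client `fs` and a `table_name` that this excerpt never defines. A minimal sketch of the likely setup, assuming a uuid-suffixed table in the default database to match the wine_db_a9239a name in the log output further down; the exact naming scheme is an assumption:

fs = feature_store.FeatureStoreClient()

# Assumed setup (not shown in this excerpt): a uniquely named feature table
# in the default database, producing names like 'default.wine_db_a9239a'
table_name = f"default.wine_db_{str(uuid.uuid4())[:6]}"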
raw_data = spark.read.load(
    "/databricks-datasets/wine-quality/winequality-red.csv",
    format="csv",
    sep=";",
    inferSchema="true",
    header="true",
)

def addIdColumn(dataframe, id_column_name):
    """Add id column to dataframe"""
    columns = dataframe.columns
    new_df = dataframe.withColumn(id_column_name, monotonically_increasing_id())
    return new_df[[id_column_name] + columns]

def renameColumns(df):
    """Rename columns to be compatible with Feature Store"""
    renamed_df = df
    for column in df.columns:
        renamed_df = renamed_df.withColumnRenamed(column, column.replace(' ', '_'))
    return renamed_df

# Run functions
renamed_df = renameColumns(raw_data)
df = addIdColumn(renamed_df, 'wine_id')

# Drop target column ('quality') as it is not included in the feature table
features_df = df.drop('quality')
display(features_df)
[display output: preview of features_df, 1,000 rows shown, data truncated]
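Later cells also use an `inference_data_df` that is not defined in this excerpt. Given the `rand` import above and the intro's mention of a real-time input, a plausible sketch; the column name `real_time_measurement` is an assumption:

# Assumed cell (not shown in this excerpt): the label plus a simulated
# real-time feature, keyed by wine_id; used for training lookups and scoring
inference_data_df = df.select(
    "wine_id",
    "quality",
    (10 * rand()).alias("real_time_measurement"),
)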
fs.create_table(
    name=table_name,
    primary_keys=["wine_id"],
    df=features_df,
    schema=features_df.schema,
    description="wine features",
)
2022/12/15 21:53:48 INFO databricks.feature_store._compute_client._compute_client: Created feature table 'hive_metastore.default.wine_db_a9239a'.
/databricks/.python_edge_libs/databricks/feature_store/entities/_feature_store_object.py:8: FutureWarning: ``databricks.feature_store.entities.feature_table.FeatureTable.keys`` is deprecated since v0.3.6. This method will be removed in a future release. Use ``FeatureTable.primary_keys`` instead.
yield prop, self.__getattribute__(prop)
Out[5]: <FeatureTable: keys=['wine_id'], tags={}>
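Not part of the original cells, but a quick sanity check at this point: `fs.read_table` returns the stored feature table as a Spark DataFrame.

# Optional check: read the feature table back and inspect it
stored_features_df = fs.read_table(name=table_name)
display(stored_features_df)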
def load_data(table_name, lookup_key):
    # In the FeatureLookup, if you do not provide the `feature_names` parameter, all features except primary keys are returned
    model_feature_lookups = [FeatureLookup(table_name=table_name, lookup_key=lookup_key)]

    # fs.create_training_set looks up features in model_feature_lookups that match the primary key from inference_data_df
    training_set = fs.create_training_set(inference_data_df, model_feature_lookups, label="quality", exclude_columns="wine_id")
    training_pd = training_set.load_df().toPandas()

    # Create train and test datasets
    X = training_pd.drop("quality", axis=1)
    y = training_pd["quality"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    return X_train, X_test, y_train, y_test, training_set

# Create the train and test datasets
X_train, X_test, y_train, y_test, training_set = load_data(table_name, "wine_id")
X_train.head()
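As the comment in the cell above notes, omitting `feature_names` returns every non-key feature. To train on a subset instead, pass the column names explicitly; a sketch using two columns from the wine dataset:

# Variant: look up only selected features instead of all of them
subset_feature_lookups = [
    FeatureLookup(
        table_name=table_name,
        feature_names=["alcohol", "pH"],
        lookup_key="wine_id",
    )
]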
# Enable MLflow autologging but skip model logging; the model is logged through the Feature Store client instead
mlflow.sklearn.autolog(log_models=False)

def train_model(X_train, X_test, y_train, y_test, training_set, fs):
    ## fit and log model
    with mlflow.start_run() as run:
        rf = RandomForestRegressor(max_depth=3, n_estimators=20, random_state=42)
        rf.fit(X_train, y_train)
        y_pred = rf.predict(X_test)

        mlflow.log_metric("test_mse", mean_squared_error(y_test, y_pred))
        mlflow.log_metric("test_r2_score", r2_score(y_test, y_pred))

        fs.log_model(
            model=rf,
            artifact_path="wine_quality_prediction",
            flavor=mlflow.sklearn,
            training_set=training_set,
            registered_model_name="wine_model",
        )

train_model(X_train, X_test, y_train, y_test, training_set, fs)
Successfully registered model 'wine_model'.
2022/12/15 21:53:58 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: wine_model, version 1
Created version '1' of model 'wine_model'.
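Not in the original notebook, but you can confirm the registration shown in the log above with the MLflow client:

from mlflow.tracking import MlflowClient

# List registered versions of the model created by fs.log_model
client = MlflowClient()
for mv in client.search_model_versions("name='wine_model'"):
    print(mv.version, mv.current_stage)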
## For simplicity, this example uses inference_data_df as input data for prediction
batch_input_df = inference_data_df.drop("quality")  # Drop the label column
predictions_df = fs.score_batch("models:/wine_model/latest", batch_input_df)
display(predictions_df["wine_id", "prediction"])
2022/12/15 21:54:07 WARNING mlflow.pyfunc: Calling `spark_udf()` with `env_manager="local"` does not recreate the same environment that was used during training, which may lead to errors or inaccurate predictions. We recommend specifying `env_manager="conda"`, which automatically recreates the environment that was used to train the model and performs inference in the recreated environment.
[display output: wine_id and prediction columns, 1,000 rows shown, data truncated]
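The excerpt jumps straight from scoring to retraining, but the otherwise unused `expr` import at the top suggests the full notebook updates the feature table between these steps. A hedged sketch of such an update, assuming a derived feature named `average_so2` (the name and formula are assumptions); `fs.write_table` with `mode="merge"` upserts the new column by primary key:

# Assumed cell (not shown in this excerpt): add a derived feature and merge it
# into the existing feature table, keyed on wine_id
so2_cols = ["free_sulfur_dioxide", "total_sulfur_dioxide"]
new_features_df = features_df.withColumn("average_so2", expr("+".join(so2_cols)) / 2)
fs.write_table(name=table_name, df=new_features_df, mode="merge")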
def load_data(table_name, lookup_key):
    model_feature_lookups = [FeatureLookup(table_name=table_name, lookup_key=lookup_key)]

    # fs.create_training_set looks up features in model_feature_lookups that match the primary key from inference_data_df
    training_set = fs.create_training_set(inference_data_df, model_feature_lookups, label="quality", exclude_columns="wine_id")
    training_pd = training_set.load_df().toPandas()

    # Create train and test datasets
    X = training_pd.drop("quality", axis=1)
    y = training_pd["quality"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    return X_train, X_test, y_train, y_test, training_set

X_train, X_test, y_train, y_test, training_set = load_data(table_name, "wine_id")
X_train.head()
def train_model(X_train, X_test, y_train, y_test, training_set, fs):
    ## fit and log model
    with mlflow.start_run() as run:
        rf = RandomForestRegressor(max_depth=3, n_estimators=20, random_state=42)
        rf.fit(X_train, y_train)
        y_pred = rf.predict(X_test)

        mlflow.log_metric("test_mse", mean_squared_error(y_test, y_pred))
        mlflow.log_metric("test_r2_score", r2_score(y_test, y_pred))

        fs.log_model(
            model=rf,
            artifact_path="feature-store-model",
            flavor=mlflow.sklearn,
            training_set=training_set,
            registered_model_name="wine_model",
        )

train_model(X_train, X_test, y_train, y_test, training_set, fs)
Registered model 'wine_model' already exists. Creating a new version of this model...
2022/12/15 21:54:39 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: wine_model, version 2
Created version '2' of model 'wine_model'.
## For simplicity, this example uses inference_data_df as input data for prediction
batch_input_df = inference_data_df.drop("quality")  # Drop the label column
predictions_df = fs.score_batch("models:/wine_model/latest", batch_input_df)
display(predictions_df["wine_id", "prediction"])
2022/12/15 21:54:48 WARNING mlflow.pyfunc: Calling `spark_udf()` with `env_manager="local"` does not recreate the same environment that was used during training, which may lead to errors or inaccurate predictions. We recommend specifying `env_manager="conda"`, which automatically recreates the environment that was used to train the model and performs inference in the recreated environment.
[display output: wine_id and prediction columns, 1,000 rows shown, data truncated]
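At this point `models:/wine_model/latest` resolves to version 2. For reproducible batch jobs you can pin an explicit version in the model URI instead (version 2 here matches the log output above):

# Score with a pinned model version rather than 'latest'
pinned_predictions_df = fs.score_batch("models:/wine_model/2", batch_input_df)
display(pinned_predictions_df["wine_id", "prediction"])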