Basic Feature Store Example (Python)


This notebook illustrates how to use Databricks Feature Store to create, store, and manage features for training ML models and making batch predictions, including features whose values are only available at prediction time. In this example, the goal is to predict wine quality using an ML model that combines a variety of static wine features with a real-time input.

This notebook shows how to:

  • Create a feature store table and use it to build a training dataset for a machine learning model.
  • Modify the feature table and use the updated table to create a new version of the model.
  • Use the Databricks Feature Store UI to determine how features relate to models.
  • Perform batch scoring using automatic feature lookup.
import pandas as pd
 
from pyspark.sql.functions import monotonically_increasing_id, expr, rand
import uuid
 
from databricks import feature_store
from databricks.feature_store import feature_table, FeatureLookup
 
import mlflow
import mlflow.sklearn
 
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score

Load dataset

The code in the following cell loads the dataset and does some minor data preparation: it creates a unique ID for each observation and removes spaces from the column names. The unique ID column (wine_id) is the primary key of the feature table and is used to look up features.

raw_data = spark.read.load(
    "/databricks-datasets/wine-quality/winequality-red.csv",
    format="csv",
    sep=";",
    inferSchema="true",
    header="true",
)

def addIdColumn(dataframe, id_column_name):
    """Add id column to dataframe"""
    columns = dataframe.columns
    new_df = dataframe.withColumn(id_column_name, monotonically_increasing_id())
    return new_df[[id_column_name] + columns]

def renameColumns(df):
    """Rename columns to be compatible with Feature Store"""
    renamed_df = df
    for column in df.columns:
        renamed_df = renamed_df.withColumnRenamed(column, column.replace(' ', '_'))
    return renamed_df

# Run functions
renamed_df = renameColumns(raw_data)
df = addIdColumn(renamed_df, 'wine_id')

# Drop target column ('quality') as it is not included in the feature table
features_df = df.drop('quality')
display(features_df)

 
wine_id | fixed_acidity | volatile_acidity | citric_acid | residual_sugar | chlorides | free_sulfur_dioxide | total_sulfur_dioxide | density | pH | …
0 | 7.4 | 0.7 | 0 | 1.9 | 0.076 | 11 | 34 | 0.9978 | 3.51 | …
1 | 7.8 | 0.88 | 0 | 2.6 | 0.098 | 25 | 67 | 0.9968 | 3.2 | …
2 | 7.8 | 0.76 | 0.04 | 2.3 | 0.092 | 15 | 54 | 0.997 | 3.26 | …
3 | 11.2 | 0.28 | 0.56 | 1.9 | 0.075 | 17 | 60 | 0.998 | 3.16 | …
4 | 7.4 | 0.7 | 0 | 1.9 | 0.076 | 11 | 34 | 0.9978 | 3.51 | …
(1,000 rows | rows and columns truncated in the original display)

Create a new database

spark.sql("CREATE DATABASE IF NOT EXISTS wine_db")

# Create a unique table name for each run. This prevents errors if you run the notebook multiple times.
table_name = f"wine_db_{str(uuid.uuid4())[:6]}"
print(table_name)
wine_db_a9239a
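
Note that because table_name has no database qualifier, the table is created in the workspace's default database (as the log output in the next section shows), not in wine_db. If you want the table inside the database created above, you could qualify the name; a minimal sketch, with a hypothetical naming scheme:

# Hypothetical: qualify the table name so it is created in wine_db rather than default
table_name = f"wine_db.wine_{str(uuid.uuid4())[:6]}"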

Create the feature table

The first step is to create a FeatureStoreClient.

fs = feature_store.FeatureStoreClient()

# You can get help in the notebook for feature store API functions:
# help(fs.<function_name>)

# For example:
# help(fs.create_table)

Create the feature table. For a complete API reference, see the Feature Store documentation (AWS|Azure|GCP).

fs.create_table(
    name=table_name,
    primary_keys=["wine_id"],
    df=features_df,
    schema=features_df.schema,
    description="wine features"
)
2022/12/15 21:53:48 INFO databricks.feature_store._compute_client._compute_client: Created feature table 'hive_metastore.default.wine_db_a9239a'.
/databricks/.python_edge_libs/databricks/feature_store/entities/_feature_store_object.py:8: FutureWarning: ``databricks.feature_store.entities.feature_table.FeatureTable.keys`` is deprecated since v0.3.6. This method will be removed in a future release. Use ``FeatureTable.primary_keys`` instead.
  yield prop, self.__getattribute__(prop)
Out[5]: <FeatureTable: keys=['wine_id'], tags={}>

You can also call create_table without providing a dataframe and populate the feature table later using fs.write_table. fs.write_table supports both overwrite and merge modes.

Example:

fs.create_table(
    name=table_name,
    primary_keys=["wine_id"],
    schema=features_df.schema,
    description="wine features"
)

fs.write_table(
    name=table_name,
    df=features_df,
    mode="overwrite"
)

Feature Store UI

To view the Feature Store UI, you must be in the Machine Learning persona (AWS|Azure|GCP). To access it, click the Feature Store icon in the left navigation bar.

The Databricks Feature Store UI lists all feature tables in the workspace and displays information about each table, including the creator, data sources, online stores, scheduled jobs that update the table, and the time the table was most recently updated.

Find the table you just created in the list. You can enter text into the Search box to search based on the name of a feature table, a feature, or a data source.

Click the table name to display details about the table. The Producers section shows the name of the notebook that created the table and the last time the notebook was run. Scroll down to the Features table for more details about the features in the table; an example is shown later in this notebook.

Train a model with feature store

The feature table does not include the prediction target, but the training dataset needs the prediction target values. There may also be features that are not available until the time the model is used for inference.

This example uses the feature real_time_measurement to represent a characteristic of the wine that can only be observed at inference time. The feature is used in training, and its value must be supplied at inference time.

## inference_data_df includes wine_id (primary key), quality (prediction target), and a real time feature
inference_data_df = df.select("wine_id", "quality", (10 * rand()).alias("real_time_measurement"))
display(inference_data_df)
 
wine_id | quality | real_time_measurement
0 | 5 | 3.5700859196572896
1 | 5 | 1.2346146332684382
2 | 5 | 9.58411948042034
3 | 6 | 1.5440994327989777
4 | 5 | 1.4882931255504428
(1,000 rows | truncated)

Use a FeatureLookup to build a training dataset that uses the specified lookup_key to look up features from the feature table, combined with the real-time feature real_time_measurement. If you do not specify the feature_names parameter, all features except the primary key are returned.

def load_data(table_name, lookup_key):
    # In the FeatureLookup, if you do not provide the `feature_names` parameter, all features except primary keys are returned
    model_feature_lookups = [FeatureLookup(table_name=table_name, lookup_key=lookup_key)]

    # fs.create_training_set looks up features in model_feature_lookups that match the primary key from inference_data_df
    training_set = fs.create_training_set(inference_data_df, model_feature_lookups, label="quality", exclude_columns=["wine_id"])
    training_pd = training_set.load_df().toPandas()

    # Create train and test datasets
    X = training_pd.drop("quality", axis=1)
    y = training_pd["quality"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    return X_train, X_test, y_train, y_test, training_set

# Create the train and test datasets
X_train, X_test, y_train, y_test, training_set = load_data(table_name, "wine_id")
X_train.head()
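
If you only want a subset of features in the training set, you can pass feature_names explicitly. A minimal sketch; the two column names are just an illustrative choice:

# Sketch: look up only the named features instead of every column in the table
from databricks.feature_store import FeatureLookup

explicit_lookups = [
    FeatureLookup(
        table_name=table_name,
        feature_names=["fixed_acidity", "volatile_acidity"],  # illustrative subset
        lookup_key="wine_id",
    )
]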
from mlflow.tracking.client import MlflowClient

client = MlflowClient()

try:
    client.delete_registered_model("wine_model")  # Delete the model if it was created in a previous run
except Exception:
    pass

The code in the next cell trains a scikit-learn RandomForestRegressor model and logs the model with the Feature Store.

The code starts an MLflow experiment to track training parameters and results. Note that model autologging is disabled (mlflow.sklearn.autolog(log_models=False)); this is because the model is logged using fs.log_model.

# Disable MLflow autologging and instead log the model using Feature Store
mlflow.sklearn.autolog(log_models=False)

def train_model(X_train, X_test, y_train, y_test, training_set, fs):
    ## fit and log model
    with mlflow.start_run() as run:

        rf = RandomForestRegressor(max_depth=3, n_estimators=20, random_state=42)
        rf.fit(X_train, y_train)
        y_pred = rf.predict(X_test)
 
        mlflow.log_metric("test_mse", mean_squared_error(y_test, y_pred))
        mlflow.log_metric("test_r2_score", r2_score(y_test, y_pred))

        fs.log_model(
            model=rf,
            artifact_path="wine_quality_prediction",
            flavor=mlflow.sklearn,
            training_set=training_set,
            registered_model_name="wine_model",
        )

train_model(X_train, X_test, y_train, y_test, training_set, fs)
Successfully registered model 'wine_model'.
2022/12/15 21:53:58 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: wine_model, version 1
Created version '1' of model 'wine_model'.

To view the logged model, navigate to the MLflow Experiments page for this notebook by clicking the Experiments icon in the left navigation bar.

Find the notebook experiment in the list. It has the same name as the notebook, in this case, "Basic Feature Store Example".

Click the experiment name to display the experiment page. The packaged feature store model, created when you called fs.log_model, appears in the Artifacts section of this page. You can use this model for batch scoring.

The model is also automatically registered in the Model Registry. The feature table in the Feature Store UI is also updated to show which features from the feature table were used to train the model.
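
You can also confirm the registration programmatically. A minimal sketch using the MLflow client; it assumes only the wine_model name registered above:

# Sketch: list the registered versions of wine_model to confirm registration
from mlflow.tracking.client import MlflowClient

client = MlflowClient()
for mv in client.search_model_versions("name = 'wine_model'"):
    print(mv.name, mv.version, mv.current_stage)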

Batch scoring

Use score_batch to apply a packaged feature store model to new data for inference. The input data needs only the primary key column wine_id and the real-time feature real_time_measurement; the model automatically looks up all other feature values in the feature store.

## For simplicity, this example uses inference_data_df as input data for prediction
batch_input_df = inference_data_df.drop("quality") # Drop the label column

predictions_df = fs.score_batch("models:/wine_model/latest", batch_input_df)

display(predictions_df["wine_id", "prediction"])
2022/12/15 21:54:07 WARNING mlflow.pyfunc: Calling `spark_udf()` with `env_manager="local"` does not recreate the same environment that was used during training, which may lead to errors or inaccurate predictions. We recommend specifying `env_manager="conda"`, which automatically recreates the environment that was used to train the model and performs inference in the recreated environment.
 
wine_id | prediction
0 | 5.209341384824892
1 | 5.463040444836221
2 | 5.463040444836221
3 | 5.472774758308718
4 | 5.209341384824892
(1,000 rows | truncated)

Modify feature table

Suppose you modify the dataframe by adding a new feature. You can use fs.write_table with mode="merge" to update the feature table.

## Modify the dataframe containing the features
so2_cols = ["free_sulfur_dioxide", "total_sulfur_dioxide"]
new_features_df = (features_df.withColumn("average_so2", expr("+".join(so2_cols)) / 2))

display(new_features_df)
 
wine_id | fixed_acidity | volatile_acidity | citric_acid | residual_sugar | chlorides | free_sulfur_dioxide | total_sulfur_dioxide | density | pH | …
0 | 7.4 | 0.7 | 0 | 1.9 | 0.076 | 11 | 34 | 0.9978 | 3.51 | …
1 | 7.8 | 0.88 | 0 | 2.6 | 0.098 | 25 | 67 | 0.9968 | 3.2 | …
2 | 7.8 | 0.76 | 0.04 | 2.3 | 0.092 | 15 | 54 | 0.997 | 3.26 | …
3 | 11.2 | 0.28 | 0.56 | 1.9 | 0.075 | 17 | 60 | 0.998 | 3.16 | …
4 | 7.4 | 0.7 | 0 | 1.9 | 0.076 | 11 | 34 | 0.9978 | 3.51 | …
(1,000 rows | rows and columns truncated in the original display; the new average_so2 column is beyond the cutoff)

Update the feature table using fs.write_table with mode="merge".

fs.write_table(
    name=table_name,
    df=new_features_df,
    mode="merge"
)

Explore feature lineage in the UI

After you update the feature table, the UI reflects the following:

  • The new feature appears in the feature list.
  • Deleted features still appear. When you read in the feature table, the deleted features have null values.
  • The Models column displays model versions that use each feature.
  • The Notebooks column displays notebooks that use each feature.

To read feature data from the Feature Store, use fs.read_table().

# Displays most recent version of the feature table
# Note that features that were deleted in the current version still appear in the table but with value = null.
display(fs.read_table(name=table_name))
 
wine_id | fixed_acidity | volatile_acidity | citric_acid | residual_sugar | chlorides | free_sulfur_dioxide | total_sulfur_dioxide | density | pH | …
0 | 7.4 | 0.7 | 0 | 1.9 | 0.076 | 11 | 34 | 0.9978 | 3.51 | …
1 | 7.8 | 0.88 | 0 | 2.6 | 0.098 | 25 | 67 | 0.9968 | 3.2 | …
2 | 7.8 | 0.76 | 0.04 | 2.3 | 0.092 | 15 | 54 | 0.997 | 3.26 | …
3 | 11.2 | 0.28 | 0.56 | 1.9 | 0.075 | 17 | 60 | 0.998 | 3.16 | …
4 | 7.4 | 0.7 | 0 | 1.9 | 0.076 | 11 | 34 | 0.9978 | 3.51 | …
(1,000 rows | rows and columns truncated in the original display)

Train a new model version using the updated feature table

def load_data(table_name, lookup_key):
    model_feature_lookups = [FeatureLookup(table_name=table_name, lookup_key=lookup_key)]
    
    # fs.create_training_set looks up features in model_feature_lookups whose primary key matches inference_data_df
    training_set = fs.create_training_set(inference_data_df, model_feature_lookups, label="quality", exclude_columns=["wine_id"])
    training_pd = training_set.load_df().toPandas()

    # Create train and test datasets
    X = training_pd.drop("quality", axis=1)
    y = training_pd["quality"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    return X_train, X_test, y_train, y_test, training_set

X_train, X_test, y_train, y_test, training_set = load_data(table_name, "wine_id")
X_train.head()

Build a training dataset that uses the indicated key to look up features.

def train_model(X_train, X_test, y_train, y_test, training_set, fs):
    ## fit and log model
    with mlflow.start_run() as run:

        rf = RandomForestRegressor(max_depth=3, n_estimators=20, random_state=42)
        rf.fit(X_train, y_train)
        y_pred = rf.predict(X_test)

        mlflow.log_metric("test_mse", mean_squared_error(y_test, y_pred))
        mlflow.log_metric("test_r2_score", r2_score(y_test, y_pred))

        fs.log_model(
            model=rf,
            artifact_path="feature-store-model",
            flavor=mlflow.sklearn,
            training_set=training_set,
            registered_model_name="wine_model",
        )

train_model(X_train, X_test, y_train, y_test, training_set, fs)
Registered model 'wine_model' already exists. Creating a new version of this model...
2022/12/15 21:54:39 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: wine_model, version 2
Created version '2' of model 'wine_model'.

Apply the latest version of the feature store registered MLflow model to features using score_batch.

## For simplicity, this example uses inference_data_df as input data for prediction
batch_input_df = inference_data_df.drop("quality") # Drop the label column
predictions_df = fs.score_batch("models:/wine_model/latest", batch_input_df)
display(predictions_df["wine_id", "prediction"])
2022/12/15 21:54:48 WARNING mlflow.pyfunc: Calling `spark_udf()` with `env_manager="local"` does not recreate the same environment that was used during training, which may lead to errors or inaccurate predictions. We recommend specifying `env_manager="conda"`, which automatically recreates the environment that was used to train the model and performs inference in the recreated environment.
 
wine_id | prediction
0 | 5.209341384824892
1 | 5.452837398542869
2 | 5.502254426489755
3 | 5.472591947615569
4 | 5.209341384824892
(1,000 rows | truncated)

The UI now reflects that version 2 of wine_model uses the newly created feature average_so2.

Control permissions for and delete feature tables

  • To control who has access to a feature table, use the Permissions button in the UI.
  • To delete a feature table, click the kebab menu next to Permissions in the UI and select Delete. When you delete a feature table using the UI, the corresponding Delta table is not deleted; you must delete that table separately. A programmatic alternative is sketched below.
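
Recent versions of the Feature Store client also support deleting a feature table programmatically. A minimal sketch, assuming a client version (v0.4.1 or later) where drop_table is available; note that, unlike UI deletion, drop_table also drops the underlying Delta table:

# Sketch (assumes databricks-feature-store >= 0.4.1): drop the feature table.
# This also deletes the underlying Delta table and is irreversible.
fs.drop_table(name=table_name)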