feature-store-online-example-dynamodb(Python)

Online Feature Store example notebook

This notebook illustrates how to use Databricks Feature Store to publish features to an online store for real-time serving and automated feature lookup. The task is to predict wine quality using an ML model that combines a variety of static wine features with a realtime input.

This notebook creates an endpoint to predict the quality of a bottle of wine, given an ID and the realtime feature alcohol by volume (ABV).

The notebook is structured as follows:

  1. Prepare the feature table
  2. Set up DynamoDB
    • This notebook uses DynamoDB. For a list of supported online stores, see the Databricks documentation (AWS|Azure).
  3. Publish the features to online feature store
  4. Train and deploy the model
  5. Serve realtime queries with automatic feature lookup
  6. Clean up

Data Set

This example uses the Wine Quality Data Set.

Requirements

  • Databricks Runtime 10.4 LTS for Machine Learning or above
  • Access to AWS DynamoDB
    • This notebook uses DynamoDB as the online store and guides you through how to generate secrets and register them with Databricks Secret Management.

Prepare the feature table

Suppose you need to build an endpoint to predict wine quality with just the wine_id. There has to be a feature table saved in Feature Store where the endpoint can look up features of the wine by the wine_id. For the purpose of this demo, we need to prepare this feature table ourselves first. The steps are:

  1. Load and clean the raw data.
  2. Separate features and labels.
  3. Save features into a feature table.

Load and clean the raw data

The raw data contains 12 columns including 11 features and the quality column. The quality column is an integer that ranges from 3 to 8. The goal is to build a model that predicts the quality value.

raw_data_frame = spark.read.load("/databricks-datasets/wine-quality/winequality-red.csv", format="csv", sep=";", inferSchema="true", header="true")
display(raw_data_frame.limit(10))

# Have a look at the size of the raw data.
raw_data_frame.toPandas().shape

There are some problems with the raw data:

  1. The column names contain spaces (' '), which are not compatible with Feature Store.
  2. The raw data has no ID column. Each row needs an ID so that Feature Store can look it up later.

The following cell addresses these issues.

from pyspark.sql.functions import monotonically_increasing_id


def addIdColumn(dataframe, id_column_name):
    columns = dataframe.columns
    new_df = dataframe.withColumn(id_column_name, monotonically_increasing_id())
    return new_df[[id_column_name] + columns]


def renameColumns(df):
    renamed_df = df
    for column in df.columns:
        renamed_df = renamed_df.withColumnRenamed(column, column.replace(' ', '_'))
    return renamed_df


# Rename columns so that they are compatible with Feature Store
renamed_df = renameColumns(raw_data_frame)

# Add id column
id_and_data = addIdColumn(renamed_df, 'wine_id')

display(id_and_data)
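
The renaming rule can be checked outside Spark. The following is a minimal sketch that works on a plain Python list of names rather than a Spark DataFrame; the helper name sanitize_column_names is hypothetical and is not part of Feature Store. The sample names are a subset of the columns in the wine quality data.

```python
def sanitize_column_names(names):
    """Replace spaces with underscores, mirroring renameColumns above."""
    return [name.replace(" ", "_") for name in names]


# A subset of the raw column names, some of which contain spaces.
raw_names = ["fixed acidity", "volatile acidity", "pH", "alcohol", "quality"]
clean_names = sanitize_column_names(raw_names)

# No cleaned name should still contain a space.
assert all(" " not in name for name in clean_names)
print(clean_names)
```

Note that monotonically_increasing_id() guarantees unique, increasing IDs, but the IDs are not necessarily consecutive when the data spans more than one partition.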

Let's assume that the alcohol by volume (ABV) is a variable that changes over time after the wine is opened. The value will be provided as a realtime input in online inference.

Now, split the data into two parts and store only the part with static features to Feature Store.

# wine_id and static features
id_static_features = id_and_data.drop('alcohol', 'quality')

# wine_id, realtime feature (alcohol), label (quality)
id_rt_feature_labels = id_and_data.select('wine_id', 'alcohol', 'quality')

Create a feature table

Next, create a new Hive database and save the feature data id_static_features into a feature table.

%sql
create database if not exists online_feature_store_example;

from databricks.feature_store.client import FeatureStoreClient

fs = FeatureStoreClient()
fs.create_table(
    name="online_feature_store_example.wine_static_features",
    primary_keys=["wine_id"],
    df=id_static_features,
    description="id and features of all wine",
)

The feature data has been stored into the feature table. The next step is to set up access to AWS DynamoDB.

Set up DynamoDB Access Key

In this section, you need to take some manual steps to make DynamoDB accessible to this notebook. Databricks creates and updates DynamoDB tables so that DynamoDB can work with Feature Store. The following steps create a new AWS IAM user with the required permissions. You can also choose to use your existing users or roles.

Create an AWS IAM user and download secrets

  1. Go to the AWS console (http://console.aws.amazon.com), navigate to IAM, and click "Users".
  2. Click "Add users" and create a new user with "Access Key".
  3. Click Next and select policy AmazonDynamoDBFullAccess.
  4. Click Next until the user is created.
  5. Download the "Access key ID" and "Secret access key".

Provide online store credentials using Databricks secrets

Note: For simplicity, the commands below use predefined names for the scope and secrets. To choose your own scope and secret names, follow the process in the Databricks documentation (AWS|Azure).

  1. Create two secret scopes in Databricks.

    databricks secrets create-scope --scope feature-store-example-read
    databricks secrets create-scope --scope feature-store-example-write
    
  2. Create secrets in the scopes.
    Note: The keys must follow the format <prefix>-access-key-id and <prefix>-secret-access-key respectively. Again, for simplicity, these commands use predefined names. When the commands run, you are prompted to copy your secrets into an editor.

    databricks secrets put --scope feature-store-example-read --key dynamo-access-key-id
    databricks secrets put --scope feature-store-example-read --key dynamo-secret-access-key
    
    databricks secrets put --scope feature-store-example-write --key dynamo-access-key-id
    databricks secrets put --scope feature-store-example-write --key dynamo-secret-access-key
    

Now the credentials are stored with Databricks Secrets. You will use them below to create the online store.

Publish the features to the online feature store

Publishing the feature table lets Feature Store record lineage information between the feature table and its online store. When the model serves real-time queries, it can then look up features from the online store for better performance.

Note: You must use publish_table() to create the DynamoDB table in the online store. If you create the table any other way, the schema might be incompatible and the write command will fail.

from databricks.feature_store.online_store_spec import AmazonDynamoDBSpec

# Specify the online store.
# Note: these commands use the predefined secret prefix. If you used a different secret scope or prefix, edit these commands before running them.

online_store_spec = AmazonDynamoDBSpec(
    region="us-west-2",
    write_secret_prefix="feature-store-example-write/dynamo",
    read_secret_prefix="feature-store-example-read/dynamo",
    table_name="feature_store_online_wine_features",
)

# Push the feature table to online store.
fs.publish_table("online_feature_store_example.wine_static_features", online_store_spec)
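
The secret prefixes passed to AmazonDynamoDBSpec combine the scope name and the key prefix created earlier, in the form <scope>/<prefix>. The sketch below only illustrates that naming convention so you can check a prefix against the secrets you created. The helper secret_names is hypothetical; it is not the library's own parsing code.

```python
def secret_names(secret_prefix):
    """Split '<scope>/<prefix>' into the scope and the two expected secret keys."""
    scope, _, prefix = secret_prefix.partition("/")
    if not scope or not prefix:
        raise ValueError(f"expected '<scope>/<prefix>', got {secret_prefix!r}")
    return {
        "scope": scope,
        "access_key_id": f"{prefix}-access-key-id",
        "secret_access_key": f"{prefix}-secret-access-key",
    }


# The read prefix used in this notebook.
names = secret_names("feature-store-example-read/dynamo")
print(names)
```

For the prefixes used in this notebook, the resulting scope and key names should match the arguments of the databricks secrets put commands shown earlier.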

Train and deploy the model

Now, you will train a classifier using features in the Feature Store. You only need to specify the primary key, and Feature Store will fetch the required features.

from sklearn.ensemble import RandomForestClassifier

import pandas as pd
import logging
import mlflow.sklearn

from databricks.feature_store.entities.feature_lookup import FeatureLookup

First, define a TrainingSet. The training set accepts a feature_lookups list, where each item represents some features from a feature table in the Feature Store. This example uses wine_id as the lookup key to fetch all the features from the table online_feature_store_example.wine_static_features.

training_set = fs.create_training_set(
    id_rt_feature_labels,
    label='quality',
    feature_lookups=[
        FeatureLookup(
            table_name="online_feature_store_example.wine_static_features",
            lookup_key="wine_id"
        )
    ],
    exclude_columns=['wine_id'],
)

# Load the training data from Feature Store
training_df = training_set.load_df()

display(training_df)
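
Conceptually, the training set joins each input row with the features found under its lookup key and then drops the excluded columns. The following sketch imitates that behavior with plain Python dictionaries. It is an illustration only, not how Feature Store is implemented; the function build_training_rows is hypothetical and the feature values are made up for the example.

```python
# Illustrative values only; a stand-in for the feature table keyed by wine_id.
feature_table = {
    0: {"fixed_acidity": 7.4, "pH": 3.51},
    1: {"fixed_acidity": 7.8, "pH": 3.20},
}

# Stand-in for id_rt_feature_labels: key, realtime feature, and label.
labels = [
    {"wine_id": 0, "alcohol": 9.4, "quality": 5},
    {"wine_id": 1, "alcohol": 9.8, "quality": 5},
]


def build_training_rows(rows, features, lookup_key, exclude_columns):
    """Join each row with its looked-up features, then drop excluded columns."""
    joined = []
    for row in rows:
        merged = {**row, **features.get(row[lookup_key], {})}
        joined.append({k: v for k, v in merged.items() if k not in exclude_columns})
    return joined


training_rows = build_training_rows(labels, feature_table, "wine_id", ["wine_id"])
print(training_rows[0])
```

The same lookup by wine_id happens at serving time, except that the features then come from the online store instead of the offline feature table.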

The next cell trains a RandomForestClassifier model.

X_train = training_df.drop('quality').toPandas()
y_train = training_df.select('quality').toPandas()

# Train model
model = RandomForestClassifier()
model.fit(X_train, y_train.values.ravel())

Save the trained model using log_model. log_model also saves lineage information between the model and the features (through training_set). As a result, during serving, the model knows where to fetch the features using only the lookup keys.

fs.log_model(
    model,
    artifact_path="model",
    flavor=mlflow.sklearn,
    training_set=training_set,
    registered_model_name="wine_quality_classifier"
)

Serve realtime queries with automatic feature lookup

After calling log_model, a new version of the model is saved. To provision a serving endpoint, follow the steps below.

  1. Click Serving under Machine Learning in the left sidebar.
  2. Create a serving endpoint with the model named "wine_quality_classifier". See the Databricks documentation for details (AWS|Azure).

Send a query

On the Serving page, there are three approaches for calling the model. You can try the "Browser" approach with a JSON format request, as shown below. This notebook uses the Python approach, copied from the Serving page, to illustrate a programmatic way.

# Fill in the Databricks access token value.
# Note: You can generate a new Databricks access token by going to left sidebar "Settings" > "User Settings" > "Access Tokens", or using databricks-cli.

DATABRICKS_TOKEN = "<DATABRICKS_TOKEN>"
assert DATABRICKS_TOKEN.strip() != "<DATABRICKS_TOKEN>"

import os
import requests
import numpy as np
import pandas as pd
import json

def create_tf_serving_json(data):
    return {'inputs': {name: data[name].tolist() for name in data.keys()} if isinstance(data, dict) else data.tolist()}

def score_model(dataset):
    url = '<Replace with the URL shown in Serving page>'
    headers = {'Authorization': f'Bearer {DATABRICKS_TOKEN}', 'Content-Type': 'application/json'}
    ds_dict = {'dataframe_split': dataset.to_dict(orient='split')} if isinstance(dataset, pd.DataFrame) else create_tf_serving_json(dataset)
    data_json = json.dumps(ds_dict, allow_nan=True)
    print(data_json)
    response = requests.request(method='POST', headers=headers, url=url, data=data_json)
    if response.status_code != 200:
        raise Exception(f'Request failed with status {response.status_code}, {response.text}')
    return response.json()
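
The token placeholder earlier in this section is meant to be edited by hand. As an alternative, the sketch below reads the token from an environment variable so that it never appears in the notebook source. The variable name DATABRICKS_TOKEN and the helper load_token are assumptions made for this example; the notebook does not require either.

```python
import os


def load_token(env_var="DATABRICKS_TOKEN"):
    """Return the token from the environment, failing early if it is missing."""
    token = os.environ.get(env_var, "").strip()
    # Reject both an unset variable and the unedited placeholder value.
    if not token or token == "<DATABRICKS_TOKEN>":
        raise RuntimeError(f"Set the {env_var} environment variable to a valid token")
    return token
```

With the variable set, DATABRICKS_TOKEN = load_token() can take the place of the hard-coded assignment.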

Now, suppose you opened a bottle of wine and you have a sensor to measure the current ABV from the bottle. Using the model and automated feature lookup with realtime serving, you can predict the quality of the wine using the measured ABV value as the realtime input "alcohol".

new_wine_ids = pd.DataFrame([(25, 7.9), (25, 11.0), (25, 27.9)], columns=['wine_id', "alcohol"])

print(score_model(new_wine_ids))

Notes on request format and API versions

Here is an example of the request format:

{"dataframe_split": {"index": [0, 1, 2], "columns": ["wine_id", "alcohol"], "data": [[25, 7.9], [25, 11.0], [25, 27.9]]}}
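
If pandas is not available on the client, the same request body can be assembled by hand. The following is a minimal sketch using only the standard library; the helper to_dataframe_split is hypothetical and simply reproduces the structure of the example above.

```python
import json


def to_dataframe_split(columns, rows):
    """Build the 'dataframe_split' request body from column names and row values."""
    for row in rows:
        if len(row) != len(columns):
            raise ValueError(f"row {row!r} does not match columns {columns!r}")
    return {
        "dataframe_split": {
            "index": list(range(len(rows))),
            "columns": list(columns),
            "data": [list(row) for row in rows],
        }
    }


# Same wine_id with three different realtime ABV readings.
payload = to_dataframe_split(
    ["wine_id", "alcohol"],
    [(25, 7.9), (25, 11.0), (25, 27.9)],
)
print(json.dumps(payload))
```

Only the lookup key and the realtime feature appear in the request. The static features are fetched from the online store by wine_id.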

Learn more about Databricks Model Serving.

Clean up

Follow this checklist to clean up the resources created by this notebook:

  1. AWS DynamoDB Table
    • Go to AWS console and navigate to DynamoDB.
    • Delete the table feature_store_online_wine_features
  2. AWS user and access key
    • Go to AWS console and navigate to IAM.
    • Search and click on the newly created user.
    • Delete the user, or click "Make Inactive" on the Access Key to disable access.
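  3. Secrets stored in Databricks Secrets
    • Run the following command once for each scope created above (feature-store-example-read and feature-store-example-write):
    databricks secrets delete-scope --scope <scope-name>
  4. Databricks access token
    • From the Databricks left sidebar, go to "Settings" > "User Settings" > "Access Tokens" and revoke the token generated for this notebook.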