from mlflow.tracking.client import MlflowClient


# Get the user's username
def getUsername() -> str:
    return (
        spark
        .sql("SELECT current_user()")
        .first()[0]
        .lower()
        .split("@")[0]
        .replace(".", "_"))


# Clean up a DBFS directory
def cleanup_dir(dir_name):
    dbutils.fs.rm(dir_name, True)


# Return the latest registered version of a model
def get_latest_model_version(model_name: str):
    client = MlflowClient()
    models = client.get_latest_versions(model_name, stages=["None"])
    for m in models:
        new_model_version = m.version
    return new_model_version
import pyspark.sql.functions as F

# Load the raw vacation purchase logs and cast booking_date to a date type
vacation_purchase_df = spark.read.option("inferSchema", "true").load(
    "/databricks-datasets/travel_recommendations_realtime/raw_travel_data/fs-demo_vacation-purchase_logs/",
    format="csv",
    header="true",
)
vacation_purchase_df = vacation_purchase_df.withColumn("booking_date", F.to_date("booking_date"))

display(vacation_purchase_df)
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pyspark.sql.window as w


def user_features_fn(vacation_purchase_df):
    """
    Computes the user_features feature group.
    """
    return (
        vacation_purchase_df.withColumn(
            "lookedup_price_7d_rolling_sum",
            F.sum("price").over(
                w.Window.partitionBy("user_id")
                .orderBy(F.col("ts").cast("long"))
                .rangeBetween(start=-(7 * 86400), end=0)
            ),
        )
        .withColumn(
            "lookups_7d_rolling_sum",
            F.count("*").over(
                w.Window.partitionBy("user_id")
                .orderBy(F.col("ts").cast("long"))
                .rangeBetween(start=-(7 * 86400), end=0)
            ),
        )
        .withColumn(
            "mean_price_7d",
            F.col("lookedup_price_7d_rolling_sum") / F.col("lookups_7d_rolling_sum"),
        )
        .withColumn(
            "tickets_purchased",
            F.when(F.col("purchased") == True, F.lit(1)).otherwise(F.lit(0)),
        )
        .withColumn(
            "last_6m_purchases",
            F.sum("tickets_purchased").over(
                w.Window.partitionBy("user_id")
                .orderBy(F.col("ts").cast("long"))
                .rangeBetween(start=-(6 * 30 * 86400), end=0)
            ),
        )
        .withColumn("day_of_week", F.dayofweek("ts"))
        .select("user_id", "ts", "mean_price_7d", "last_6m_purchases", "day_of_week")
    )


def destination_features_fn(vacation_purchase_df):
    """
    Computes the destination_features feature group.
    """
    return (
        vacation_purchase_df.withColumn(
            "clicked", F.when(F.col("clicked") == True, 1).otherwise(0)
        )
        .withColumn(
            "sum_clicks_7d",
            F.sum("clicked").over(
                w.Window.partitionBy("destination_id")
                .orderBy(F.col("ts").cast("long"))
                .rangeBetween(start=-(7 * 86400), end=0)
            ),
        )
        .withColumn(
            "sum_impressions_7d",
            F.count("*").over(
                w.Window.partitionBy("destination_id")
                .orderBy(F.col("ts").cast("long"))
                .rangeBetween(start=-(7 * 86400), end=0)
            ),
        )
        .select("destination_id", "ts", "sum_clicks_7d", "sum_impressions_7d")
    )
from databricks import feature_store

fs = feature_store.FeatureStoreClient()

# For Databricks Runtime 13.2 for Machine Learning or above:
fs.create_table(
    name="travel_recommendations.user_features",
    primary_keys=["user_id", "ts"],
    timestamp_keys="ts",
    df=user_features_fn(vacation_purchase_df),
    description="User Features",
)
fs.create_table(
    name="travel_recommendations.popularity_features",
    primary_keys=["destination_id", "ts"],
    timestamp_keys="ts",
    df=destination_features_fn(vacation_purchase_df),
    description="Destination Popularity Features",
)

# For Databricks Runtime 13.1 for Machine Learning or below:
# fs.create_table(
#     name="travel_recommendations.user_features",
#     primary_keys=["user_id"],
#     timestamp_keys="ts",
#     df=user_features_fn(vacation_purchase_df),
#     description="User Features",
# )
# fs.create_table(
#     name="travel_recommendations.popularity_features",
#     primary_keys=["destination_id"],
#     timestamp_keys="ts",
#     df=destination_features_fn(vacation_purchase_df),
#     description="Destination Popularity Features",
# )
destination_location_df = spark.read.option("inferSchema", "true").load(
    "/databricks-datasets/travel_recommendations_realtime/raw_travel_data/fs-demo_destination-locations/",
    format="csv",
    header="true",
)

fs.create_table(
    name="travel_recommendations.location_features",
    primary_keys="destination_id",
    df=destination_location_df,
    description="Destination location features.",
)
from pyspark.sql.types import IntegerType, DoubleType, TimestampType, DateType, StringType, StructType, StructField
from pyspark.sql.functions import col

# Setup the delta checkpoint directory
fs_destination_availability_features_delta_checkpoint = "/Shared/fs_realtime/checkpoints/destination_availability_features_delta/"
cleanup_dir(fs_destination_availability_features_delta_checkpoint)

# Create schema
destination_availability_schema = StructType([
    StructField("event_ts", TimestampType(), True),
    StructField("destination_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("booking_date", DateType(), True),
    StructField("price", DoubleType(), True),
    StructField("availability", IntegerType(), True),
])

destination_availability_log = (
    spark.readStream
    .format("delta")
    .option("maxFilesPerTrigger", 1000)
    .option("inferSchema", "true")
    .schema(destination_availability_schema)
    .json("/databricks-datasets/travel_recommendations_realtime/raw_travel_data/fs-demo_destination-availability_logs/json/*")
)

destination_availability_df = destination_availability_log.select(
    col("event_ts"),
    col("destination_id"),
    col("name"),
    col("booking_date"),
    col("price"),
    col("availability"),
)
display(destination_availability_df)
# For Databricks Runtime 13.2 for Machine Learning or above:
fs.create_table(
    name="travel_recommendations.availability_features",
    primary_keys=["destination_id", "booking_date", "event_ts"],
    timestamp_keys=["event_ts"],
    schema=destination_availability_schema,
    description="Destination Availability Features",
)

# For Databricks Runtime 13.1 for Machine Learning or below:
# fs.create_table(
#     name="travel_recommendations.availability_features",
#     primary_keys=["destination_id", "booking_date"],
#     timestamp_keys=["event_ts"],
#     schema=destination_availability_schema,
#     description="Destination Availability Features",
# )

# Now write the streaming data to the feature table in "merge" mode
fs.write_table(
    name="travel_recommendations.availability_features",
    df=destination_availability_df,
    mode="merge",
    checkpoint_location=fs_destination_availability_features_delta_checkpoint,
)
import geopy
import mlflow
import logging
import lightgbm as lgb
import pandas as pd
import geopy.distance as geopy_distance

from typing import Tuple


# Define the model class with on-demand computation model wrapper
class OnDemandComputationModelWrapper(mlflow.pyfunc.PythonModel):

    def fit(self, X_train: pd.DataFrame, y_train: pd.DataFrame):
        try:
            new_model_input = self._compute_ondemand_features(X_train)
            self.model = lgb.train(
                {"num_leaves": 32, "objective": "binary"},
                lgb.Dataset(new_model_input, label=y_train.values),
                5)
        except Exception as e:
            logging.error(e)

    def _distance(
        self,
        lon_lat_user: Tuple[float, float],
        lon_lat_destination: Tuple[float, float],
    ) -> float:
        """
        Wrapper call to calculate pair distance in miles.
        ::lon_lat_user (longitude, latitude) tuple of user location
        ::lon_lat_destination (longitude, latitude) tuple of destination location
        """
        return geopy_distance.distance(
            geopy_distance.lonlat(*lon_lat_user),
            geopy_distance.lonlat(*lon_lat_destination),
        ).miles

    def _compute_ondemand_features(self, model_input: pd.DataFrame) -> pd.DataFrame:
        try:
            # Fill NAs first
            loc_cols = ["user_longitude", "user_latitude", "longitude", "latitude"]
            location_noNAs_pdf = model_input[loc_cols].fillna(model_input[loc_cols].median().to_dict())
            # Calculate distances
            model_input["distance"] = location_noNAs_pdf.apply(
                lambda x: self._distance((x[0], x[1]), (x[2], x[3])), axis=1)
            # Drop the raw location columns now that distance has been computed
            model_input = model_input.drop(columns=loc_cols)
        except Exception as e:
            logging.error(e)
            raise e
        return model_input

    def predict(self, context, model_input):
        new_model_input = self._compute_ondemand_features(model_input)
        return self.model.predict(new_model_input)
from databricks.feature_store.client import FeatureStoreClient
from databricks.feature_store.entities.feature_lookup import FeatureLookup

fs = FeatureStoreClient()

feature_lookups = [
    FeatureLookup(
        table_name="travel_recommendations.popularity_features",
        lookup_key="destination_id",
        timestamp_lookup_key="ts",
    ),
    FeatureLookup(
        table_name="travel_recommendations.location_features",
        lookup_key="destination_id",
        feature_names=["latitude", "longitude"],
    ),
    FeatureLookup(
        table_name="travel_recommendations.user_features",
        lookup_key="user_id",
        timestamp_lookup_key="ts",
        feature_names=["mean_price_7d"],
    ),
    FeatureLookup(
        table_name="travel_recommendations.availability_features",
        lookup_key=["destination_id", "booking_date"],
        timestamp_lookup_key="ts",
        feature_names=["availability"],
    ),
]
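The training cell below refers to training_set and training_df, which are not defined in this excerpt. A minimal sketch of how they would typically be built from the lookups above with the Feature Store API is shown here; the label and exclude_columns values are assumptions, not taken from the original notebook.

# Join the raw purchase logs with the looked-up features to build a training set.
# "purchased" as the label and the excluded column are assumptions for this sketch.
training_set = fs.create_training_set(
    vacation_purchase_df,
    feature_lookups=feature_lookups,
    label="purchased",
    exclude_columns=["ts"],
)
training_df = training_set.load_df()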
from sklearn.model_selection import train_test_split

with mlflow.start_run():
    # Split features and labels
    features_and_label = training_df.columns

    # Collect data into a Pandas array for training and testing
    data = training_df.toPandas()[features_and_label]
    train, test = train_test_split(data, random_state=123)
    X_train = train.drop(["purchased"], axis=1)
    X_test = test.drop(["purchased"], axis=1)
    y_train = train.purchased
    y_test = test.purchased

    # Fit
    pyfunc_model = OnDemandComputationModelWrapper()
    pyfunc_model.fit(X_train, y_train)

    # Log custom model to MLflow
    model_name = "realtime_destination_recommendations"
    fs.log_model(
        artifact_path="model",
        model=pyfunc_model,
        flavor=mlflow.pyfunc,
        training_set=training_set,
        registered_model_name=model_name,
        conda_env=get_conda_env(),
    )
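The evaluation cell that follows uses scored_df, which this excerpt does not show being created. With the Feature Store client it would typically come from batch scoring the logged model, roughly as sketched below; test_df stands in for a Spark DataFrame of raw lookup keys and on-demand inputs and is not defined in this excerpt.

# Batch scoring: feature values are joined automatically from the feature tables.
# "test_df" is a placeholder Spark DataFrame of raw keys and on-demand inputs.
model_uri = f"models:/{model_name}/{get_latest_model_version(model_name)}"
scored_df = fs.score_batch(model_uri, test_df)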
from pyspark.sql import functions as F
from sklearn.metrics import accuracy_score

# Convert the original probability predictions to true or false using a 0.2 threshold
scored_df2 = scored_df.withColumnRenamed("prediction", "original_prediction")
scored_df2 = scored_df2.withColumn(
    "prediction",
    F.when(F.col("original_prediction") >= 0.2, True).otherwise(False),
)

pd_scoring = scored_df2.select("purchased", "prediction").toPandas()
print("Accuracy: ", accuracy_score(pd_scoring["purchased"], pd_scoring["prediction"]))
from databricks.feature_store import FeatureStoreClient
from databricks.feature_store.online_store_spec import AmazonDynamoDBSpec

fs = FeatureStoreClient()

destination_location_online_store_spec = AmazonDynamoDBSpec(
    region="us-west-2",
    write_secret_prefix="feature-store-example-write/dynamo",
    read_secret_prefix="feature-store-example-read/dynamo",
    table_name="feature_store_travel_recommendations_location_features",
)
destination_online_store_spec = AmazonDynamoDBSpec(
    region="us-west-2",
    write_secret_prefix="feature-store-example-write/dynamo",
    read_secret_prefix="feature-store-example-read/dynamo",
    table_name="feature_store_travel_recommendations_popularity_features",
)
destination_availability_online_store_spec = AmazonDynamoDBSpec(
    region="us-west-2",
    write_secret_prefix="feature-store-example-write/dynamo",
    read_secret_prefix="feature-store-example-read/dynamo",
    table_name="feature_store_travel_recommendations_availability_features",
)
user_online_store_spec = AmazonDynamoDBSpec(
    region="us-west-2",
    write_secret_prefix="feature-store-example-write/dynamo",
    read_secret_prefix="feature-store-example-read/dynamo",
    table_name="feature_store_travel_recommendations_user_features",
)

# Set up the checkpoint directory for publishing the streaming features to the online store
fs_destination_availability_features_online_checkpoint = "/Shared/fs_realtime/checkpoints/destination_availability_features_online/"
cleanup_dir(fs_destination_availability_features_online_checkpoint)
# Publish the batch feature tables to the online store
fs.publish_table("travel_recommendations.user_features", user_online_store_spec)
fs.publish_table("travel_recommendations.location_features", destination_location_online_store_spec)
fs.publish_table("travel_recommendations.popularity_features", destination_online_store_spec)

# Push availability features to the online store through Spark Structured Streaming
fs.publish_table(
    "travel_recommendations.availability_features",
    destination_availability_online_store_spec,
    streaming=True,
    checkpoint_location=fs_destination_availability_features_online_checkpoint,
)
# Get a token for the REST API, which can be obtained from the notebook context
token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().getOrElse(None)

# With the token, we can create the authorization header for our subsequent REST calls
headers = {"Authorization": f"Bearer {token}"}

# Next we need an endpoint at which to execute our request, which we can get from the notebook's tags collection
java_tags = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags()

# This object comes from the Java context - convert the Java Map object to a Python dictionary
tags = sc._jvm.scala.collection.JavaConversions.mapAsJavaMap(java_tags)

# Lastly, extract the Databricks instance (domain name) from the dictionary
instance = tags["browserHostName"]
import requests


def score_model(data_json: dict):
    url = f"https://{instance}/model/{model_name}/{get_latest_model_version(model_name)}/invocations"
    response = requests.request(method="POST", headers=headers, url=url, json=data_json)
    if response.status_code != 200:
        raise Exception(f"Request failed with status {response.status_code}, {response.text}")
    return response.json()
payload_json = {
    "dataframe_records": [
        # Users in New York see high scores for Florida
        {"user_id": 4, "booking_date": "2022-12-22", "destination_id": 16, "user_latitude": 40.71277, "user_longitude": -74.005974},
        # Users in California see high scores for Hawaii
        {"user_id": 39, "booking_date": "2022-12-22", "destination_id": 1, "user_latitude": 37.77493, "user_longitude": -122.41942},
    ]
}
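With model serving enabled for the registered model, the score_model helper defined above can be called directly with this payload; a minimal usage example:

# Send the example records to the serving endpoint and print the returned scores
predictions = score_model(payload_json)
print(predictions)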
Travel recommendation example notebook
This notebook illustrates the different feature computation modes available with Databricks Feature Store: batch, streaming, and on-demand. Machine learning models are known to degrade in performance as their features become stale, and this effect is more pronounced for some types of features than for others. If the underlying data changes quickly and weighs heavily on the model's predictions, the corresponding features should be refreshed frequently; refreshing features derived from static data, however, only adds cost with no benefit. Using a travel recommendation website as the example, the notebook shows how to choose a computation mode for each feature based on its freshness requirements.
This notebook builds a ranking model to predict the likelihood of a user purchasing a destination package.
The notebook is structured as follows:
1. Compute user and destination features in batch and create the corresponding feature tables
2. Compute destination availability features with Spark Structured Streaming and write them to a feature table
3. Train a model that combines feature lookups with on-demand computation of the user-to-destination distance
4. Score the model in batch and evaluate accuracy
5. Publish the feature tables to an online store (DynamoDB)
6. Query the real-time model serving endpoint through the REST API
Requirements
Note: Starting with Databricks Runtime 13.2 ML, a change was made to the create_table API. Timestamp key columns must now be specified in the primary_keys argument. If you are using this notebook with Databricks Runtime 13.1 ML or below, use the commented-out code for the create_table call in Cmd 12.