Build and serve a wide-and-deep model in a recommender system

This notebook demonstrates how to build a wide-and-deep model in a recommender system, and how to serve it on Databricks.

Requirements

  • Databricks Runtime 8.2 ML or above
  • This notebook depends on the output of the data generation notebook (AWS|Azure|GCP).
import mlflow
import pandas as pd
import platform
import tensorflow as tf
from petastorm.spark import SparkDatasetConverter, make_spark_converter
from tensorflow.python.saved_model import tag_constants
# From the data generation notebook
DATA_DBFS_ROOT_DIR = '/tmp/recommender/data'
 
# You can change these as needed 
CACHE_PATH = 'file:///dbfs/tmp/recommender/converter_cache'
CKPT_PATH = '/dbfs/tmp/recommender/model_ckpt'
EXPORT_PATH = '/dbfs/tmp/recommender/model_export'
 
# Enable mlflow autolog to log the metrics and model
mlflow.tensorflow.autolog()

1. Create the model

The output of this step is a tf.estimator.DNNLinearCombinedClassifier estimator object.

1.1 Define the input columns

The get_wide_and_deep_columns() function returns a tuple of (wide_columns, deep_columns) where each element is a list of tf.feature_column.

Specify the name of the label column in LABEL_COLUMN.

To use this reference solution with your own data, implement the get_wide_and_deep_columns() function so that it returns the columns that correspond to your dataset.

LABEL_COLUMN = 'label'
NUMERIC_COLUMNS = [
  'user_age',
  'item_age',
]
CATEGORICAL_COLUMNS = [
  'user_id',
  'item_id',
  'user_topic',
  'item_topic',
]
HASH_BUCKET_SIZES = {
  'user_id': 400,
  'item_id': 2000,
  'user_topic': 10,
  'item_topic': 10,
}
EMBEDDING_DIMENSIONS = {
    'user_id': 8,
    'item_id': 16,
    'user_topic': 3,
    'item_topic': 3,
}
 
def get_wide_and_deep_columns():
  wide_columns, deep_columns = [], []
 
  # embedding columns
  for column_name in CATEGORICAL_COLUMNS:
      categorical_column = tf.feature_column.categorical_column_with_identity(
          column_name, num_buckets=HASH_BUCKET_SIZES[column_name])
      wrapped_column = tf.feature_column.embedding_column(
          categorical_column,
          dimension=EMBEDDING_DIMENSIONS[column_name],
          combiner='mean')
 
      wide_columns.append(categorical_column)
      deep_columns.append(wrapped_column)
  
  # age columns and cross columns
  user_age = tf.feature_column.numeric_column("user_age", shape=(1,), dtype=tf.float32)
  item_age = tf.feature_column.numeric_column("item_age", shape=(1,), dtype=tf.float32)       
  user_age_buckets = tf.feature_column.bucketized_column(user_age, boundaries=[18, 35])
  item_age_buckets = tf.feature_column.bucketized_column(item_age, boundaries=[18, 35])
  age_crossed = tf.feature_column.crossed_column([user_age_buckets, item_age_buckets], 9)
  wide_columns.extend([user_age_buckets, item_age_buckets, age_crossed])
  deep_columns.extend([user_age, item_age])
  
  # topic columns and cross columns
  user_topic = tf.feature_column.categorical_column_with_identity(
          "user_topic", num_buckets=HASH_BUCKET_SIZES["user_topic"])
  item_topic = tf.feature_column.categorical_column_with_identity(
          "item_topic", num_buckets=HASH_BUCKET_SIZES["item_topic"])       
  topic_crossed = tf.feature_column.crossed_column([user_topic, item_topic], 30)
  wide_columns.append(topic_crossed)
  return wide_columns, deep_columns
wide_columns, deep_columns = get_wide_and_deep_columns()
wide_columns
deep_columns

1.2 Define the wide-and-deep model

# Pass optimizer classes (callables) instead of optimizer instances to avoid this issue:
# https://stackoverflow.com/questions/58108945/cannot-do-incremental-training-with-dnnregressor
estimator = tf.estimator.DNNLinearCombinedClassifier(
    # wide settings
    linear_feature_columns=wide_columns,
    linear_optimizer=tf.keras.optimizers.Ftrl,  # the class itself, not an instance
    # deep settings
    dnn_feature_columns=deep_columns,
    dnn_hidden_units=[100, 50],
    dnn_optimizer=tf.keras.optimizers.Adagrad,  # the class itself, not an instance
    # warm-start settings
    model_dir=CKPT_PATH)

2. Create the custom metric

This notebook uses mean average precision at k (MAP@k) as the evaluation metric.

TensorFlow provides a built-in metric, tf.compat.v1.metrics.average_precision_at_k. See tf.compat.v1.metrics.average_precision_at_k and this answer on Stack Overflow to understand how it works.
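To build intuition for the metric, the following is a minimal, framework-free sketch (not used by the rest of the notebook) that computes average precision at k by hand for two made-up users. The denominator min(k, number of relevant items) mirrors, to my understanding, the formulation used by tf.compat.v1.metrics.average_precision_at_k.

# Toy illustration of MAP@k; the item lists and relevance judgments are made up.
def average_precision_at_k(ranked_items, relevant_items, k=5):
  """AP@k for one user, given items sorted by descending predicted probability."""
  relevant_items = set(relevant_items)
  hits, score = 0, 0.0
  for rank, item in enumerate(ranked_items[:k], start=1):
    if item in relevant_items:
      hits += 1
      score += hits / rank  # precision at this rank, counted only on hits
  return score / min(k, len(relevant_items)) if relevant_items else 0.0

ap_user_1 = average_precision_at_k(ranked_items=[3, 2, 1], relevant_items=[2])  # hit at rank 2 -> 0.5
ap_user_2 = average_precision_at_k(ranked_items=[5, 4, 6], relevant_items=[5])  # hit at rank 1 -> 1.0
map_at_5 = (ap_user_1 + ap_user_2) / 2  # 0.75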

# Adapted from: https://github.com/NVIDIA/DeepLearningExamples/blob/master/TensorFlow/Recommendation/WideAndDeep/utils/metrics.py
def map_custom_metric(features, labels, predictions):
  user_ids = tf.reshape(features['user_id'], [-1])
  predictions = predictions['probabilities'][:, 1]
 
  # Processing unique user_ids, indexes and counts
  # Sorting needed in case the same user_id occurs in two different places
  sorted_ids = tf.argsort(user_ids)
  user_ids = tf.gather(user_ids, indices=sorted_ids)
  predictions = tf.gather(predictions, indices=sorted_ids)
  labels = tf.gather(labels, indices=sorted_ids)
 
  _, user_ids_idx, user_ids_items_count = tf.unique_with_counts(
      user_ids, out_idx=tf.int64)
  pad_length = 30 - tf.reduce_max(user_ids_items_count)
  pad_fn = lambda x: tf.pad(x, [(0, 0), (0, pad_length)])
 
  preds = tf.RaggedTensor.from_value_rowids(
      predictions, user_ids_idx).to_tensor()
  labels = tf.RaggedTensor.from_value_rowids(
      labels, user_ids_idx).to_tensor()
 
  labels = tf.argmax(labels, axis=1)
 
  return {
      'map': tf.compat.v1.metrics.average_precision_at_k(
          predictions=pad_fn(preds),
          labels=labels,
          k=5,
          name="streaming_map")}
estimator = tf.estimator.add_metrics(estimator, map_custom_metric)

The 'map' metric in the output of tf.estimator.train_and_evaluate is the MAP@k of the model on the evaluation set.

3. Load the data

The outputs of this step are three Spark dataset converters that you can use to create the input functions for training, evaluation, and testing.

3.1 Load the training Delta table into a Spark DataFrame

def load_df(name):
  return spark.read.format("delta").load(f"{DATA_DBFS_ROOT_DIR}/{name}")
 
train_df = load_df("user_item_interaction_train")
val_df = load_df('user_item_interaction_val')
test_df = load_df('user_item_interaction_test')

3.2 Expand the training DataFrame with features

item_profile = load_df("item_profile")
user_profile = load_df("user_profile")
def join_features(df):
  return df.join(item_profile, on='item_id', how="left").join(user_profile, on='user_id', how="left")
train_df_w_features = join_features(train_df)
val_df_w_features = join_features(val_df)
test_df_w_features = join_features(test_df)

3.3 Convert the Spark DataFrame into a tf.data.Dataset using SparkDatasetConverter

# Specify the cache directory
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, CACHE_PATH)
 
# Create the converters
# Section 4 (Train and evaluate the model) uses these converters
train_converter = make_spark_converter(train_df_w_features)    
val_converter = make_spark_converter(val_df_w_features)
test_converter = make_spark_converter(test_df_w_features)
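The converters materialize the DataFrames as Parquet files under CACHE_PATH. If you want to reclaim that space once training and evaluation are done, petastorm's SparkDatasetConverter provides a delete() method; like the checkpoint cleanup in Section 4, this sketch is commented out so it does not run prematurely.

# Uncomment these lines after training to delete the cached Parquet files
# for converter in (train_converter, val_converter, test_converter):
#   converter.delete()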

4. Train and evaluate the model

The output of this step is a model saved in the checkpoint directory.

The mlflow.tensorflow.autolog() function logs the metrics of each run. To view these records, click "Experiment" at the upper right.

# You can use the code snippet in this cell on new datasets without modification
def to_tuple(batch):
  """
  Utility function that converts the batch from the namedtuple type
  to tuple type.
  """
  feature = {
    "user_id": batch.user_id,
    "user_age": batch.user_age,
    "user_topic": batch.user_topic,
    "item_id": batch.item_id,
    "item_age": batch.item_age,
    "item_topic": batch.item_topic,
  }
  if hasattr(batch, "label"):
    return feature, batch.label
  return feature, None
 
def get_input_fn(dataset_context_manager):
  """
  Utility function that creates the input function from the tf.data.Dataset returned by the
  Spark dataset converter.
  """
  def fn():
    return dataset_context_manager.__enter__().map(to_tuple)
  return fn
# Uncomment this line to delete the model checkpoints
# dbutils.fs.rm('/tmp/recommender/model_ckpt', recurse=True)
train_tf_dataset = train_converter.make_tf_dataset()
val_tf_dataset = val_converter.make_tf_dataset()
test_tf_dataset = test_converter.make_tf_dataset()
 
train_spec = tf.estimator.TrainSpec(input_fn=get_input_fn(train_tf_dataset), max_steps=1250)
eval_spec = tf.estimator.EvalSpec(input_fn=get_input_fn(val_tf_dataset))
 
with mlflow.start_run():
  tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
  artifact_uri = mlflow.get_artifact_uri()

5. Export the trained model and log the saved model to MLflow

Export the model from the checkpoint saved in the previous step. MLflow automatically logs the TensorFlow model when export_saved_model is called.

feature_spec = tf.feature_column.make_parse_example_spec(
    wide_columns + deep_columns
)
fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
    feature_spec
)
saved_model_path = estimator.export_saved_model(
    export_dir_base=EXPORT_PATH,
    serving_input_receiver_fn=fn
).decode("utf-8")
artifacts = {
  # Get the path of the model logged in the current active run
  "model": artifact_uri + '/model'
}

6. Build an online recommender

Now you have a wide-and-deep model that predicts the probability of interaction for each (user_id, item_id) pair. However, to build a recommender, you need a model to predict the ranking of items given a user and a list of items.

Specifically, the input needs to be in the format shown in the following cell.

input_pdf = pd.DataFrame(
  {
    "user_id": [1, 2],
    "item_id": [[1, 2, 3], [4, 5, 6]],
  }
)

The desired output is something like:

user_id   item_id    probabilities
1         [3, 2, 1]  [0.500410, 0.488329, 0.485111]
2         [5, 4, 6]  [0.501141, 0.497792, 0.484209]

You can implement custom inference logic in a subclass of mlflow.pyfunc.PythonModel.

# The code snippet in this cell can be reused on new datasets with some modifications to match the feature types of your dataset. See https://www.tensorflow.org/tutorials/load_data/tfrecord
def serialize_example(input_pdf):
  """
  Serialize a pandas DF with retrieved features to a proto (tf.train.Example).
  """
  proto_tensors = []
  for i in range(len(input_pdf)):
    feature = dict()
    for field in input_pdf:
      if field in NUMERIC_COLUMNS:
        feature[field] = tf.train.Feature(float_list=tf.train.FloatList(value=[input_pdf[field][i]]))
      else: 
        feature[field] = tf.train.Feature(int64_list=tf.train.Int64List(value=[input_pdf[field][i]]))
 
    # Create a Features message using tf.train.Example.
    proto = tf.train.Example(features=tf.train.Features(feature=feature))
    proto_string = proto.SerializeToString()
    proto_tensors.append(tf.constant([proto_string]))
  return proto_tensors
 
def sort_by_group(input_pdf, results_list):
  """
  Sort the rows in input_pdf by the predicted scores in results_list.
  """
  result_pdf = input_pdf.copy()
  result_pdf['probabilities'] = [item['probabilities'][0, 1].numpy() for item in results_list]
  return result_pdf \
    .sort_values(by='probabilities', ascending=False) \
    .groupby("user_id") \
    .agg({'item_id': lambda x: x.to_list(), 'probabilities': lambda x: x.to_list()}) \
    .reset_index()
 
# A PythonModel with custom inference logic
class Recommender(mlflow.pyfunc.PythonModel):
  
  def load_context(self, context):
    self.model = mlflow.tensorflow.load_model(context.artifacts["model"])    
 
  def predict(self, context, model_input):
    proto = serialize_example(model_input)
    results_list = []
    for p in proto:
      results_list.append(self.model(p))
    return sort_by_group(model_input, results_list)

6.1 Register the recommender model

You can use input_features_pdf as an input example and register it with the model.

# This function retrieves the user profile features and item profile features
def retrieve_features(input_pdf):
  input_df = spark.createDataFrame(input_pdf.explode('item_id'))
  return join_features(input_df).toPandas().astype({'user_age': 'float', 'item_age': 'float'})
 
print("Retrieving features...")
input_features_pdf = retrieve_features(input_pdf)
input_features_pdf
# Create a Conda environment for the new MLflow Model that contains all necessary dependencies.
# Modify the following entries to match the environment you are using.
conda_env = {
    'channels': ['defaults'],
    'dependencies': [
      f'python={platform.python_version()}',
      'pip',
      {
        'pip': [
          'mlflow',
          f'tensorflow-cpu=={tf.__version__}',
          'tensorflow-estimator',
        ],
      },
    ],
    'name': 'recommender_env'
}
# Log the MLflow pyfunc PythonModel
mlflow_pyfunc_model_path = "recommender_mlflow_pyfunc"
mlflow.pyfunc.log_model(
  artifact_path=mlflow_pyfunc_model_path, 
  python_model=Recommender(), 
  artifacts=artifacts,
  conda_env=conda_env, 
  input_example=input_features_pdf, 
  registered_model_name="recommender")
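As an optional sanity check before enabling serving, you can load the logged model back as a pyfunc and score the example input locally. This sketch assumes the registration above created version 1 of the "recommender" model; adjust the version if needed.

# Optional local check of the registered recommender.
# "models:/recommender/1" assumes this is the first registered version.
loaded_recommender = mlflow.pyfunc.load_model("models:/recommender/1")
loaded_recommender.predict(input_features_pdf)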

6.2 Serve the recommender model

Find the registered model by searching the model list in the Models tab in the left navigation bar, or from the Experiment tab at the upper right. Enable model serving for the registered model. For more details, see (AWS|Azure|GCP).

In the Request box on the Serving tab, enter the output of the commands shown in the following cell.

print("Request:")
print(input_features_pdf.to_json(orient='records'))

The expected output, which appears in the Response box on the Serving tab, is:

[{"user_id":1,"item_id":[3,2,1],"probabilities":[0.35572293400764465,0.3401849567890167,0.29400017857551575]},
{"user_id":2,"item_id":[5,6,4],"probabilities":[0.35679787397384644,0.3309633731842041,0.2681720554828644]}]

7. Limitations

This notebook provides some core steps to build a recommender system. It does not cover the following important topics.

Candidate selection

When you want to recommend a list of items for a given user, candidate selection is a separate process that returns a list of candidate items for that user. If the total number of items is small, you can use the full item list as the candidates, as sketched below. This notebook assumes you can get the selected candidates from external function calls.
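For example, with a small catalog, a trivial candidate selector can return every item_id from the item_profile table loaded in Section 3.2. This is only a sketch of that idea; real systems usually use a dedicated retrieval step.

# Trivial candidate selection: every item in item_profile is a candidate.
all_item_ids = [row.item_id for row in item_profile.select("item_id").distinct().collect()]

def select_candidates(user_id):
  """Return the candidate item_ids to rank for this user (here: the full catalog)."""
  return all_item_ids

candidate_pdf = pd.DataFrame({
  "user_id": [1],
  "item_id": [select_candidates(1)],
})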

Cold-start problem

The cold-start problem refers to the situation where you want to make recommendations to new users or for new items. This notebook does not include cold-start handling methods.