Build and serve a wide-and-deep model in a recommender system (Python)

This notebook demonstrates how to build a wide-and-deep model in a recommender system, and how to serve it on Databricks.

Requirements

  • Databricks Runtime 8.2 ML or above
  • This notebook depends on the output of the data generation notebook (AWS|Azure|GCP).
import mlflow
import pandas as pd
import platform
import tensorflow as tf
from petastorm.spark import SparkDatasetConverter, make_spark_converter
from tensorflow.python.saved_model import tag_constants
# From the data generation notebook
DATA_DBFS_ROOT_DIR = '/tmp/recommender/data'
 
# You can change these as needed 
CACHE_PATH = 'file:///dbfs/tmp/recommender/converter_cache'
CKPT_PATH = '/dbfs/tmp/recommender/model_ckpt'
EXPORT_PATH = '/dbfs/tmp/recommender/model_export'
 
# Enable mlflow autolog to log the metrics and model
mlflow.tensorflow.autolog()

1. Create the model

The output of this step is a tf.estimator.DNNLinearCombinedClassifier estimator object.

1.1 Define the input columns

The get_wide_and_deep_columns() function returns a tuple of (wide_columns, deep_columns) where each element is a list of tf.feature_column.

Specify the name of the label column in LABEL_COLUMN.

To use this reference solution with your own data, you need to implement the get_wide_and_deep_columns() function that returns the correct columns corresponding to your dataset.

LABEL_COLUMN = 'label'
NUMERIC_COLUMNS = [
  'user_age',
  'item_age',
]
CATEGORICAL_COLUMNS = [
  'user_id',
  'item_id',
  'user_topic',
  'item_topic',
]
HASH_BUCKET_SIZES = {
  'user_id': 400,
  'item_id': 2000,
  'user_topic': 10,
  'item_topic': 10,
}
EMBEDDING_DIMENSIONS = {
  'user_id': 8,
  'item_id': 16,
  'user_topic': 3,
  'item_topic': 3,
}
 
def get_wide_and_deep_columns():
  wide_columns, deep_columns = [], []
 
  # categorical identity columns (wide) wrapped as embedding columns (deep)
  for column_name in CATEGORICAL_COLUMNS:
      categorical_column = tf.feature_column.categorical_column_with_identity(
          column_name, num_buckets=HASH_BUCKET_SIZES[column_name])
      wrapped_column = tf.feature_column.embedding_column(
          categorical_column,
          dimension=EMBEDDING_DIMENSIONS[column_name],
          combiner='mean')
 
      wide_columns.append(categorical_column)
      deep_columns.append(wrapped_column)
  
  # age columns and cross columns
  user_age = tf.feature_column.numeric_column("user_age", shape=(1,), dtype=tf.float32)
  item_age = tf.feature_column.numeric_column("item_age", shape=(1,), dtype=tf.float32)       
  user_age_buckets = tf.feature_column.bucketized_column(user_age, boundaries=[18, 35])
  item_age_buckets = tf.feature_column.bucketized_column(item_age, boundaries=[18, 35])
  age_crossed = tf.feature_column.crossed_column([user_age_buckets, item_age_buckets], 9)
  wide_columns.extend([user_age_buckets, item_age_buckets, age_crossed])
  deep_columns.extend([user_age, item_age])
  
  # topic columns and cross columns
  user_topic = tf.feature_column.categorical_column_with_identity(
          "user_topic", num_buckets=HASH_BUCKET_SIZES["user_topic"])
  item_topic = tf.feature_column.categorical_column_with_identity(
          "item_topic", num_buckets=HASH_BUCKET_SIZES["item_topic"])       
  topic_crossed = tf.feature_column.crossed_column([user_topic, item_topic], 30)
  wide_columns.append(topic_crossed)
  return wide_columns, deep_columns
wide_columns, deep_columns = get_wide_and_deep_columns()
wide_columns
deep_columns

1.2 Define the wide-and-deep model

# Pass optimizer classes (callables) rather than optimizer instances to avoid this issue:
# https://stackoverflow.com/questions/58108945/cannot-do-incremental-training-with-dnnregressor
estimator = tf.estimator.DNNLinearCombinedClassifier(
    # wide settings
    linear_feature_columns=wide_columns,
    linear_optimizer=tf.keras.optimizers.Ftrl,  # the optimizer class, not an instance
    # deep settings
    dnn_feature_columns=deep_columns,
    dnn_hidden_units=[100, 50],
    dnn_optimizer=tf.keras.optimizers.Adagrad,  # the optimizer class, not an instance
    # warm-start settings
    model_dir=CKPT_PATH)

2. Create the custom metric

This notebook uses mean average precision at k (MAP@k) as the evaluation metric.

TensorFlow provides the built-in metric tf.compat.v1.metrics.average_precision_at_k. See the tf.compat.v1.metrics.average_precision_at_k documentation and this answer on Stack Overflow to understand how it works.
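To build intuition for what that metric computes, the following plain-Python sketch (not part of the notebook; the function name and example scores are illustrative) evaluates average precision at k for a single row with one relevant class. This matches how the custom metric below uses the TensorFlow metric: each row is a user, and the label is the column index of the user's positive item.

import numpy as np
 
def ap_at_k_single_label(scores, relevant_idx, k):
  # Rank all candidate classes by score, highest first, and keep the top k
  topk = np.argsort(scores)[::-1][:k]
  for rank, idx in enumerate(topk, start=1):
    if idx == relevant_idx:
      # With a single relevant class per row, the normalizer min(k, num_relevant)
      # is 1, so AP@k reduces to the precision at the rank where the relevant
      # class appears
      return 1.0 / rank
  return 0.0
 
# One user with four candidate items; item 2 is the positive one and is ranked third by score
ap_at_k_single_label(np.array([0.1, 0.4, 0.35, 0.8]), relevant_idx=2, k=5)  # 1/3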

# Adapted from: https://github.com/NVIDIA/DeepLearningExamples/blob/master/TensorFlow/Recommendation/WideAndDeep/utils/metrics.py
def map_custom_metric(features, labels, predictions):
  user_ids = tf.reshape(features['user_id'], [-1])
  predictions = predictions['probabilities'][:, 1]
 
  # Processing unique user_ids, indexes and counts
  # Sorting needed in case the same user_id occurs in two different places
  sorted_ids = tf.argsort(user_ids)
  user_ids = tf.gather(user_ids, indices=sorted_ids)
  predictions = tf.gather(predictions, indices=sorted_ids)
  labels = tf.gather(labels, indices=sorted_ids)
 
  _, user_ids_idx, user_ids_items_count = tf.unique_with_counts(
      user_ids, out_idx=tf.int64)
  # Pad each user's row out to a fixed width of 30 columns
  # (the assumed maximum number of candidate items per user)
  pad_length = 30 - tf.reduce_max(user_ids_items_count)
  pad_fn = lambda x: tf.pad(x, [(0, 0), (0, pad_length)])
 
  # Group the flat predictions and labels into one row per user
  preds = tf.RaggedTensor.from_value_rowids(
      predictions, user_ids_idx).to_tensor()
  labels = tf.RaggedTensor.from_value_rowids(
      labels, user_ids_idx).to_tensor()
 
  # argmax gives the column index of the positive item in each user's row
  labels = tf.argmax(labels, axis=1)
 
  return {
      'map': tf.compat.v1.metrics.average_precision_at_k(
          predictions=pad_fn(preds),
          labels=labels,
          k=5,
          name="streaming_map")}
estimator = tf.estimator.add_metrics(estimator, map_custom_metric)

The 'map' metric in the output of tf.estimator.train_and_evaluate is the MAP@k of the model on the evaluation set.

3. Load the data

The outputs of this step are three Spark dataset converters that you can use to create the training, evaluation, and test input functions for the estimator.

3.1 Load the Delta tables into Spark DataFrames

def load_df(name):
  return spark.read.format("delta").load(f"{DATA_DBFS_ROOT_DIR}/{name}")
 
train_df = load_df("user_item_interaction_train")
val_df = load_df('user_item_interaction_val')
test_df = load_df('user_item_interaction_test')

3.2 Expand the DataFrames with user and item features

item_profile = load_df("item_profile")
user_profile = load_df("user_profile")
def join_features(df):
  return df.join(item_profile, on='item_id', how="left").join(user_profile, on='user_id', how="left")
train_df_w_features = join_features(train_df)
val_df_w_features = join_features(val_df)
test_df_w_features = join_features(test_df)

3.3 Convert the Spark DataFrame into a tf.data.Dataset using SparkDatasetConverter

# Specify the cache directory
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, CACHE_PATH)
 
# Create the converters
# These converters are used in Section 4: Train and evaluate the model
train_converter = make_spark_converter(train_df_w_features)    
val_converter = make_spark_converter(val_df_w_features)
test_converter = make_spark_converter(test_df_w_features)

4. Train and evaluate the model

The output of this step is a model saved in the checkpoint directory.

The mlflow.tensorflow.autolog() function logs the metrics of each run. To view these records, click "Experiment" at the upper right.
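The following is a minimal sketch (not the notebook's exact training cell) of how the converters from step 3 can feed tf.estimator.train_and_evaluate: each converter's make_tf_dataset call yields a tf.data.Dataset of namedtuple batches, which an input function splits into the (features, label) pair the estimator expects. BATCH_SIZE and the step counts are illustrative assumptions, __enter__() is called directly on the converter's context manager for brevity, and depending on your Delta table's column types you may also need to cast some columns before feeding the estimator.

BATCH_SIZE = 1024  # illustrative; tune for your cluster
 
def get_input_fn(converter):
  def input_fn():
    # make_tf_dataset returns a context manager; entering it yields a
    # tf.data.Dataset whose elements are namedtuples of batched tensors
    dataset = converter.make_tf_dataset(batch_size=BATCH_SIZE).__enter__()
    # Split each namedtuple batch into the (features dict, label) pair the estimator expects
    def to_features_and_label(batch):
      features = batch._asdict()
      label = features.pop(LABEL_COLUMN)
      return features, label
    return dataset.map(to_features_and_label)
  return input_fn
 
train_spec = tf.estimator.TrainSpec(input_fn=get_input_fn(train_converter), max_steps=1000)
eval_spec = tf.estimator.EvalSpec(input_fn=get_input_fn(val_converter), steps=100)
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)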