# From the data generation notebook
DATA_DBFS_ROOT_DIR = '/tmp/recommender/data'
# You can change these as needed
CACHE_PATH = 'file:///dbfs/tmp/recommender/converter_cache'
CKPT_PATH = '/dbfs/tmp/recommender/model_ckpt'
EXPORT_PATH = '/dbfs/tmp/recommender/model_export'
import mlflow
import tensorflow as tf

# Enable MLflow autologging to log the metrics and the model
mlflow.tensorflow.autolog()
LABEL_COLUMN = 'label'
NUMERIC_COLUMNS = [
    'user_age',
    'item_age',
]

CATEGORICAL_COLUMNS = [
    'user_id',
    'item_id',
    'user_topic',
    'item_topic',
]

HASH_BUCKET_SIZES = {
    'user_id': 400,
    'item_id': 2000,
    'user_topic': 10,
    'item_topic': 10,
}

EMBEDDING_DIMENSIONS = {
    'user_id': 8,
    'item_id': 16,
    'user_topic': 3,
    'item_topic': 3,
}
def get_wide_and_deep_columns():
    wide_columns, deep_columns = [], []

    # Embedding columns: the raw categorical (identity) columns feed the wide part,
    # and their embedding-wrapped versions feed the deep part
    for column_name in CATEGORICAL_COLUMNS:
        categorical_column = tf.feature_column.categorical_column_with_identity(
            column_name, num_buckets=HASH_BUCKET_SIZES[column_name])
        wrapped_column = tf.feature_column.embedding_column(
            categorical_column,
            dimension=EMBEDDING_DIMENSIONS[column_name],
            combiner='mean')

        wide_columns.append(categorical_column)
        deep_columns.append(wrapped_column)

    # Age columns and their crosses
    user_age = tf.feature_column.numeric_column("user_age", shape=(1,), dtype=tf.float32)
    item_age = tf.feature_column.numeric_column("item_age", shape=(1,), dtype=tf.float32)
    user_age_buckets = tf.feature_column.bucketized_column(user_age, boundaries=[18, 35])
    item_age_buckets = tf.feature_column.bucketized_column(item_age, boundaries=[18, 35])
    age_crossed = tf.feature_column.crossed_column([user_age_buckets, item_age_buckets], 9)
    wide_columns.extend([user_age_buckets, item_age_buckets, age_crossed])
    deep_columns.extend([user_age, item_age])

    # Topic columns and their cross
    user_topic = tf.feature_column.categorical_column_with_identity(
        "user_topic", num_buckets=HASH_BUCKET_SIZES["user_topic"])
    item_topic = tf.feature_column.categorical_column_with_identity(
        "item_topic", num_buckets=HASH_BUCKET_SIZES["item_topic"])
    topic_crossed = tf.feature_column.crossed_column([user_topic, item_topic], 30)
    wide_columns.append(topic_crossed)

    return wide_columns, deep_columns
wide_columns, deep_columns = get_wide_and_deep_columns()

# Pass optimizer callables (the classes) instead of optimizer instances to avoid this issue:
# https://stackoverflow.com/questions/58108945/cannot-do-incremental-training-with-dnnregressor
estimator = tf.estimator.DNNLinearCombinedClassifier(
    # wide settings
    linear_feature_columns=wide_columns,
    linear_optimizer=tf.keras.optimizers.Ftrl,  # the class itself, not an Ftrl() instance
    # deep settings
    dnn_feature_columns=deep_columns,
    dnn_hidden_units=[100, 50],
    dnn_optimizer=tf.keras.optimizers.Adagrad,  # the class itself, not an Adagrad() instance
    # warm-start settings
    model_dir=CKPT_PATH)
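Because the optimizer arguments accept any zero-argument callable, a lambda works the same way as passing the class, which makes it possible to set hyperparameters such as the learning rate while still giving the estimator a fresh optimizer instance each run. A variant sketch with illustrative values (not from the notebook):

# Variant sketch: zero-argument lambdas are also callables, so hyperparameters
# can be tuned without reintroducing the incremental-training issue linked above.
# The learning rates below are illustrative, not values from the notebook.
tuned_estimator = tf.estimator.DNNLinearCombinedClassifier(
    linear_feature_columns=wide_columns,
    linear_optimizer=lambda: tf.keras.optimizers.Ftrl(learning_rate=0.1),
    dnn_feature_columns=deep_columns,
    dnn_hidden_units=[100, 50],
    dnn_optimizer=lambda: tf.keras.optimizers.Adagrad(learning_rate=0.05),
    model_dir=CKPT_PATH)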
# Adapted from: https://github.com/NVIDIA/DeepLearningExamples/blob/master/TensorFlow/Recommendation/WideAndDeep/utils/metrics.py
def map_custom_metric(features, labels, predictions):
    user_ids = tf.reshape(features['user_id'], [-1])
    predictions = predictions['probabilities'][:, 1]

    # Sorting is needed in case the same user_id occurs in two different places
    sorted_ids = tf.argsort(user_ids)
    user_ids = tf.gather(user_ids, indices=sorted_ids)
    predictions = tf.gather(predictions, indices=sorted_ids)
    labels = tf.gather(labels, indices=sorted_ids)

    # Process unique user_ids, their indexes, and per-user counts
    _, user_ids_idx, user_ids_items_count = tf.unique_with_counts(
        user_ids, out_idx=tf.int64)

    # Group predictions and labels by user; predictions are padded out to a
    # fixed row width of 30 items so every batch has the same shape
    pad_length = 30 - tf.reduce_max(user_ids_items_count)
    pad_fn = lambda x: tf.pad(x, [(0, 0), (0, pad_length)])
    preds = tf.RaggedTensor.from_value_rowids(
        predictions, user_ids_idx).to_tensor()
    labels = tf.RaggedTensor.from_value_rowids(
        labels, user_ids_idx).to_tensor()

    # For each user, the label becomes the index of the positive item in that row
    labels = tf.argmax(labels, axis=1)

    return {
        'map': tf.compat.v1.metrics.average_precision_at_k(
            predictions=pad_fn(preds),
            labels=labels,
            k=5,
            name="streaming_map")}
from petastorm.spark import SparkDatasetConverter, make_spark_converter

# Specify the cache directory for the converters
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, CACHE_PATH)

# Create the converters; they are used in Section 4: Train and evaluate the model
train_converter = make_spark_converter(train_df_w_features)
val_converter = make_spark_converter(val_df_w_features)
test_converter = make_spark_converter(test_df_w_features)
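Each converter materializes its Spark DataFrame as Parquet under CACHE_PATH and exposes it as a tf.data source. Below is a minimal sketch of how a converter's batches can be reshaped into the (features dict, label) pair that tf.estimator expects, and how the trained model could later be exported to EXPORT_PATH for serving; the helper name to_features_and_label, the batch size, and the export call are our assumptions rather than code from the notebook.

# Sketch (assumptions noted above): peek at one batch from the train converter
def to_features_and_label(batch):
    # Each petastorm batch is a namedtuple with one field per DataFrame column
    features = {col: getattr(batch, col) for col in CATEGORICAL_COLUMNS + NUMERIC_COLUMNS}
    return features, getattr(batch, LABEL_COLUMN)

with train_converter.make_tf_dataset(batch_size=64) as train_dataset:  # illustrative batch size
    for features, label in train_dataset.map(to_features_and_label).take(1):
        print({name: t.shape for name, t in features.items()}, label.shape)

# Sketch: after training (Section 4), the estimator could be exported to
# EXPORT_PATH for serving with a parsing serving-input receiver
feature_spec = tf.feature_column.make_parse_example_spec(wide_columns + deep_columns)
serving_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec)
estimator.export_saved_model(EXPORT_PATH, serving_input_fn)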
Build and serve a wide-and-deep model in a recommender system
This notebook demonstrates how to build a wide-and-deep model in a recommender system, and how to serve it on Databricks.
Requirements