import platform

import mlflow
import mlflow.tensorflow
import tensorflow as tf
from petastorm.spark import SparkDatasetConverter, make_spark_converter

# From the data generation notebook
DATA_DBFS_ROOT_DIR = '/tmp/recommender/data'
# You can change these as needed
CACHE_PATH = 'file:///dbfs/tmp/recommender/converter_cache'
CKPT_PATH = '/dbfs/tmp/recommender/model_ckpt'
EXPORT_PATH = '/dbfs/tmp/recommender/model_export'
# Enable mlflow autolog to log the metrics and model
mlflow.tensorflow.autolog()
LABEL_COLUMN = 'label'
NUMERIC_COLUMNS = [
'user_age',
'item_age',
]
CATEGORICAL_COLUMNS = [
'user_id',
'item_id',
'user_topic',
'item_topic',
]
HASH_BUCKET_SIZES = {
'user_id': 400,
'item_id': 2000,
'user_topic': 10,
'item_topic': 10,
}
EMBEDDING_DIMENSIONS = {
'user_id': 8,
'item_id': 16,
'user_topic': 3,
'item_topic': 3,
}
def get_wide_and_deep_columns():
wide_columns, deep_columns = [], []
# embedding columns
for column_name in CATEGORICAL_COLUMNS:
categorical_column = tf.feature_column.categorical_column_with_identity(
column_name, num_buckets=HASH_BUCKET_SIZES[column_name])
wrapped_column = tf.feature_column.embedding_column(
categorical_column,
dimension=EMBEDDING_DIMENSIONS[column_name],
combiner='mean')
wide_columns.append(categorical_column)
deep_columns.append(wrapped_column)
# age columns and cross columns
user_age = tf.feature_column.numeric_column("user_age", shape=(1,), dtype=tf.float32)
item_age = tf.feature_column.numeric_column("item_age", shape=(1,), dtype=tf.float32)
user_age_buckets = tf.feature_column.bucketized_column(user_age, boundaries=[18, 35])
item_age_buckets = tf.feature_column.bucketized_column(item_age, boundaries=[18, 35])
age_crossed = tf.feature_column.crossed_column([user_age_buckets, item_age_buckets], 9)
wide_columns.extend([user_age_buckets, item_age_buckets, age_crossed])
deep_columns.extend([user_age, item_age])
# topic columns and cross columns
user_topic = tf.feature_column.categorical_column_with_identity(
"user_topic", num_buckets=HASH_BUCKET_SIZES["user_topic"])
item_topic = tf.feature_column.categorical_column_with_identity(
"item_topic", num_buckets=HASH_BUCKET_SIZES["item_topic"])
topic_crossed = tf.feature_column.crossed_column([user_topic, item_topic], 30)
wide_columns.append(topic_crossed)
return wide_columns, deep_columns
# Build the wide and deep feature column lists defined above
wide_columns, deep_columns = get_wide_and_deep_columns()

# Pass optimizer callables instead of optimizer objects to avoid this issue:
# https://stackoverflow.com/questions/58108945/cannot-do-incremental-training-with-dnnregressor
estimator = tf.estimator.DNNLinearCombinedClassifier(
# wide settings
linear_feature_columns=wide_columns,
    linear_optimizer=tf.keras.optimizers.Ftrl,  # pass the optimizer class (a callable), not an instance
# deep settings
dnn_feature_columns=deep_columns,
dnn_hidden_units=[100, 50],
    dnn_optimizer=tf.keras.optimizers.Adagrad,  # pass the optimizer class (a callable), not an instance
# warm-start settings
model_dir=CKPT_PATH)
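# If you need non-default optimizer hyperparameters, the estimator also accepts a
# zero-argument callable that builds the optimizer. A sketch for reference; the
# learning rates below are illustrative placeholders, not values used in this notebook:
#
# estimator = tf.estimator.DNNLinearCombinedClassifier(
#     linear_feature_columns=wide_columns,
#     linear_optimizer=lambda: tf.keras.optimizers.Ftrl(learning_rate=0.1),
#     dnn_feature_columns=deep_columns,
#     dnn_hidden_units=[100, 50],
#     dnn_optimizer=lambda: tf.keras.optimizers.Adagrad(learning_rate=0.05),
#     model_dir=CKPT_PATH)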
# Adapted from:
# https://github.com/NVIDIA/DeepLearningExamples/blob/master/TensorFlow/Recommendation/WideAndDeep/utils/metrics.py
def map_custom_metric(features, labels, predictions):
user_ids = tf.reshape(features['user_id'], [-1])
predictions = predictions['probabilities'][:, 1]
# Processing unique user_ids, indexes and counts
# Sorting needed in case the same user_id occurs in two different places
sorted_ids = tf.argsort(user_ids)
user_ids = tf.gather(user_ids, indices=sorted_ids)
predictions = tf.gather(predictions, indices=sorted_ids)
labels = tf.gather(labels, indices=sorted_ids)
_, user_ids_idx, user_ids_items_count = tf.unique_with_counts(
user_ids, out_idx=tf.int64)
pad_length = 30 - tf.reduce_max(user_ids_items_count)
pad_fn = lambda x: tf.pad(x, [(0, 0), (0, pad_length)])
preds = tf.RaggedTensor.from_value_rowids(
predictions, user_ids_idx).to_tensor()
labels = tf.RaggedTensor.from_value_rowids(
labels, user_ids_idx).to_tensor()
labels = tf.argmax(labels, axis=1)
return {
'map': tf.compat.v1.metrics.average_precision_at_k(
predictions=pad_fn(preds),
labels=labels,
k=5,
name="streaming_map")}
# Specify the cache directory
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, CACHE_PATH)
# Create the converters
# Section 4 (Train and evaluate the model) uses these converters
train_converter = make_spark_converter(train_df_w_features)
val_converter = make_spark_converter(val_df_w_features)
test_converter = make_spark_converter(test_df_w_features)
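# Optional sanity check (illustrative): each converter materializes its DataFrame as
# Parquet under CACHE_PATH; len() reports the number of cached rows, and .delete()
# can be called later to remove the cached files.
print(len(train_converter), len(val_converter), len(test_converter))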
# You can use the code snippet in this cell on new datasets without modification
def to_tuple(batch):
"""
    Utility function that converts a batch from the namedtuple yielded by the
    converter's dataset into the (features, label) tuple expected by the estimator.
"""
feature = {
"user_id": batch.user_id,
"user_age": batch.user_age,
"user_topic": batch.user_topic,
"item_id": batch.item_id,
"item_age": batch.item_age,
"item_topic": batch.item_topic,
}
if hasattr(batch, "label"):
return feature, batch.label
return feature, None
def get_input_fn(dataset_context_manager):
"""
    Utility function that creates the estimator input function from the TF dataset
    context manager returned by the Spark dataset converter. The context manager is
    entered (and intentionally kept open) so the dataset remains readable throughout
    training and evaluation.
"""
def fn():
return dataset_context_manager.__enter__().map(to_tuple)
return fn
train_tf_dataset = train_converter.make_tf_dataset()
val_tf_dataset = val_converter.make_tf_dataset()
test_tf_dataset = test_converter.make_tf_dataset()
train_spec = tf.estimator.TrainSpec(input_fn=get_input_fn(train_tf_dataset), max_steps=1250)
eval_spec = tf.estimator.EvalSpec(input_fn=get_input_fn(val_tf_dataset))
with mlflow.start_run():
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
artifact_uri = mlflow.get_artifact_uri()
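# The pyfunc model logged at the end of this notebook needs an `artifacts` dict that
# maps "model" to an MLflow-format TensorFlow model. A minimal sketch, assuming
# mlflow.tensorflow.autolog() stored the trained estimator under this run's "model"
# artifact path (adjust the path if your MLflow/TensorFlow versions log it elsewhere):
artifacts = {"model": artifact_uri + "/model"}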
# The code snippet in this cell can be reused on new datasets with some modifications
# to match the feature types of your dataset.
# See https://www.tensorflow.org/tutorials/load_data/tfrecord
def serialize_example(input_pdf):
"""
    Serialize each row of a pandas DataFrame with retrieved features into a
    tf.train.Example proto; returns one serialized proto tensor per row.
"""
proto_tensors = []
for i in range(len(input_pdf)):
feature = dict()
for field in input_pdf:
if field in NUMERIC_COLUMNS:
feature[field] = tf.train.Feature(float_list=tf.train.FloatList(value=[input_pdf[field][i]]))
else:
feature[field] = tf.train.Feature(int64_list=tf.train.Int64List(value=[input_pdf[field][i]]))
# Create a Features message using tf.train.Example.
proto = tf.train.Example(features=tf.train.Features(feature=feature))
proto_string = proto.SerializeToString()
proto_tensors.append(tf.constant([proto_string]))
return proto_tensors
def sort_by_group(input_pdf, results_list):
"""
    Sort the rows in input_pdf by the predicted scores in results_list, then group
    them by user_id so each user gets its item_ids and probabilities in descending
    score order.
"""
result_pdf = input_pdf.copy()
result_pdf['probabilities'] = [item['probabilities'][0, 1].numpy() for item in results_list]
return result_pdf \
.sort_values(by='probabilities', ascending=False) \
.groupby("user_id") \
.agg({'item_id': lambda x: x.to_list(), 'probabilities': lambda x: x.to_list()}) \
.reset_index()
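# A tiny hypothetical example of what sort_by_group produces: two users, three scored
# items, with scores shaped like the saved model's "probabilities" output (the values
# below are made up for illustration).
import pandas as pd
_demo_pdf = pd.DataFrame({"user_id": [1, 1, 2], "item_id": [10, 11, 12]})
_demo_results = [
    {"probabilities": tf.constant([[0.4, 0.6]])},  # user 1, item 10 -> score 0.6
    {"probabilities": tf.constant([[0.9, 0.1]])},  # user 1, item 11 -> score 0.1
    {"probabilities": tf.constant([[0.3, 0.7]])},  # user 2, item 12 -> score 0.7
]
sort_by_group(_demo_pdf, _demo_results)
# -> user 1: item_id [10, 11], probabilities [0.6, 0.1]; user 2: [12], [0.7]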
# A PythonModel with custom inference logic
class Recommender(mlflow.pyfunc.PythonModel):
def load_context(self, context):
self.model = mlflow.tensorflow.load_model(context.artifacts["model"])
def predict(self, context, model_input):
proto = serialize_example(model_input)
results_list = []
for p in proto:
results_list.append(self.model(p))
return sort_by_group(model_input, results_list)
# This function retrieves the user profile features and item profile features
def retrieve_features(input_pdf):
input_df = spark.createDataFrame(input_pdf.explode('item_id'))
return join_features(input_df).toPandas().astype({'user_age': 'float', 'item_age': 'float'})
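# `join_features` and `input_pdf` are defined in earlier sections of this notebook.
# For reference, the request is expected to look roughly like this hypothetical
# example: one row per user, with the candidate item_ids as a list column that
# retrieve_features() explodes into one row per (user, item) pair.
import pandas as pd
example_request_pdf = pd.DataFrame({
    "user_id": [0, 1],
    "item_id": [[10, 11, 12], [13, 14, 15]],
})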
print("Retrieving features...")
input_features_pdf = retrieve_features(input_pdf)
input_features_pdf
# Create a Conda environment for the new MLflow Model that contains all necessary dependencies.
# Modify the following entries to match the environment you are using.
conda_env = {
'channels': ['defaults'],
'dependencies': [
f'python={platform.python_version()}',
'pip',
{
'pip': [
'mlflow',
f'tensorflow-cpu=={tf.__version__}',
'tensorflow-estimator',
],
},
],
'name': 'recommender_env'
}
# Log the MLflow pyfunc PythonModel
mlflow_pyfunc_model_path = "recommender_mlflow_pyfunc"
mlflow.pyfunc.log_model(
artifact_path=mlflow_pyfunc_model_path,
python_model=Recommender(),
artifacts=artifacts,
conda_env=conda_env,
input_example=input_features_pdf,
registered_model_name="recommender")
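# A sketch of loading the logged model back for batch scoring. The registry URI below
# assumes this is the first registered version of "recommender"; use whichever version
# the model registry actually reports.
loaded_recommender = mlflow.pyfunc.load_model("models:/recommender/1")
loaded_recommender.predict(input_features_pdf)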
Build and serve a wide-and-deep model in a recommender system
This notebook demonstrates how to build a wide-and-deep model in a recommender system, and how to serve it on Databricks.
Requirements