AutoML experiment with Feature Store example (Python)

Train an ML model with AutoML and Feature Store feature tables

This notebook expands upon the Feature Store taxi example notebook.

In this notebook, you:

  • Create new feature tables in Feature Store
  • Use those feature tables in an AutoML experiment

If you have existing feature tables to use, you can skip to the Create an AutoML experiment with feature store tables section.

Requirements

Databricks Runtime for Machine Learning 11.3 or above.

Load data

The `nyc-taxi-tiny` dataset used here was generated from the full NYC Taxi Data.

# Load the `nyc-taxi-tiny` dataset.  
raw_data = spark.read.format("delta").load("/databricks-datasets/nyctaxi-with-zipcodes/subsampled")
display(raw_data)
 
| tpep_pickup_datetime | tpep_dropoff_datetime | trip_distance | fare_amount | pickup_zip | dropoff_zip |
| --- | --- | --- | --- | --- | --- |
| 2016-02-14T16:52:13.000+0000 | 2016-02-14T17:16:04.000+0000 | 4.94 | 19 | 10282 | 10171 |
| 2016-02-04T18:44:19.000+0000 | 2016-02-04T18:46:00.000+0000 | 0.28 | 3.5 | 10110 | 10110 |
| 2016-02-17T17:13:57.000+0000 | 2016-02-17T17:17:55.000+0000 | 0.7 | 5 | 10103 | 10023 |
| 2016-02-18T10:36:07.000+0000 | 2016-02-18T10:41:45.000+0000 | 0.8 | 6 | 10022 | 10017 |
| 2016-02-22T14:14:41.000+0000 | 2016-02-22T14:31:52.000+0000 | 4.51 | 17 | 10110 | 10282 |
| 2016-02-05T06:45:02.000+0000 | 2016-02-05T06:50:26.000+0000 | 1.8 | 7 | 10009 | 10065 |
| 2016-02-15T15:03:28.000+0000 | 2016-02-15T15:18:45.000+0000 | 2.58 | 12 | 10153 | 10199 |
| 2016-02-25T19:09:26.000+0000 | 2016-02-25T19:24:50.000+0000 | 1.4 | 11 | 10112 | 10069 |
| 2016-02-13T16:28:18.000+0000 | 2016-02-13T16:36:36.000+0000 | 1.21 | 7.5 | 10023 | 10153 |
| 2016-02-14T00:03:48.000+0000 | 2016-02-14T00:10:24.000+0000 | 0.6 | 6 | 10012 | 10003 |
| 2016-02-27T15:02:58.000+0000 | 2016-02-27T15:08:31.000+0000 | 2.02 | 8 | 10002 | 11211 |
| 2016-02-17T07:52:40.000+0000 | 2016-02-17T08:01:21.000+0000 | 1.5 | 8 | 10019 | 10199 |
| 2016-02-14T21:55:55.000+0000 | 2016-02-14T22:01:31.000+0000 | 0.93 | 6 | 10019 | 10018 |
| 2016-02-05T22:27:07.000+0000 | 2016-02-05T22:39:44.000+0000 | 2.34 | 10.5 | 10110 | 10014 |
| 2016-02-05T09:51:47.000+0000 | 2016-02-05T09:57:27.000+0000 | 0.91 | 5.5 | 10119 | 10199 |
| 2016-02-21T11:15:39.000+0000 | 2016-02-21T11:40:24.000+0000 | 11.6 | 33.5 | 10019 | 11371 |
| 2016-02-23T13:20:29.000+0000 | 2016-02-23T13:36:25.000+0000 | 1.4 | 11 | 10018 | 10022 |

Truncated results, showing first 1,000 rows.
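Before computing features, you can optionally sanity-check the loaded table. A minimal sketch (the expected columns are those shown in the display above):

# Optional: inspect the schema and size of the subsampled dataset.
raw_data.printSchema()                # pickup/dropoff timestamps, trip_distance, fare_amount, zip codes
print(f"rows: {raw_data.count()}")    # the subsampled table is much smaller than the full NYC taxi data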

Compute features

from databricks import feature_store
from pyspark.sql.functions import *
from pyspark.sql.types import FloatType, IntegerType, StringType
from pytz import timezone


@udf(returnType=IntegerType())
def is_weekend(dt):
    tz = "America/New_York"
    return int(dt.astimezone(timezone(tz)).weekday() >= 5)  # 5 = Saturday, 6 = Sunday
  
@udf(returnType=StringType())  
def partition_id(dt):
    # datetime -> "YYYY-MM"
    return f"{dt.year:04d}-{dt.month:02d}"


def filter_df_by_ts(df, ts_column, start_date, end_date):
    if ts_column and start_date:
        df = df.filter(col(ts_column) >= start_date)
    if ts_column and end_date:
        df = df.filter(col(ts_column) < end_date)
    return df


def pickup_features_fn(df, ts_column, start_date, end_date):
    """
    Computes the pickup_features feature group.
    To restrict features to a time range, pass in ts_column, start_date, and/or end_date as kwargs.
    """
    df = filter_df_by_ts(
        df, ts_column, start_date, end_date
    )
    pickupzip_features = (
        df.groupBy(
            "pickup_zip", window("tpep_pickup_datetime", "1 hour", "15 minutes")
        )  # 1 hour window, sliding every 15 minutes
        .agg(
            mean("fare_amount").alias("mean_fare_window_1h_pickup_zip"),
            count("*").alias("count_trips_window_1h_pickup_zip"),
        )
        .select(
            col("pickup_zip").alias("zip"),
            unix_timestamp(col("window.end")).alias("ts").cast(IntegerType()),
            partition_id(to_timestamp(col("window.end"))).alias("yyyy_mm"),
            col("mean_fare_window_1h_pickup_zip").cast(FloatType()),
            col("count_trips_window_1h_pickup_zip").cast(IntegerType()),
        )
    )
    return pickupzip_features
  
def dropoff_features_fn(df, ts_column, start_date, end_date):
    """
    Computes the dropoff_features feature group.
    To restrict features to a time range, pass in ts_column, start_date, and/or end_date as kwargs.
    """
    df = filter_df_by_ts(
        df,  ts_column, start_date, end_date
    )
    dropoffzip_features = (
        df.groupBy("dropoff_zip", window("tpep_dropoff_datetime", "30 minute"))
        .agg(count("*").alias("count_trips_window_30m_dropoff_zip"))
        .select(
            col("dropoff_zip").alias("zip"),
            unix_timestamp(col("window.end")).alias("ts").cast(IntegerType()),
            partition_id(to_timestamp(col("window.end"))).alias("yyyy_mm"),
            col("count_trips_window_30m_dropoff_zip").cast(IntegerType()),
            is_weekend(col("window.end")).alias("dropoff_is_weekend"),
        )
    )
    return dropoffzip_features

from datetime import datetime

pickup_features = pickup_features_fn(
    raw_data, ts_column="tpep_pickup_datetime", start_date=datetime(2016, 1, 1), end_date=datetime(2016, 1, 31)
)
dropoff_features = dropoff_features_fn(
    raw_data, ts_column="tpep_dropoff_datetime", start_date=datetime(2016, 1, 1), end_date=datetime(2016, 1, 31)
)
display(pickup_features)
 
| zip | ts | yyyy_mm | mean_fare_window_1h_pickup_zip | count_trips_window_1h_pickup_zip |
| --- | --- | --- | --- | --- |
| 10153 | 1452474000 | 2016-01 | 8.5 | 1 |
| 10018 | 1453941000 | 2016-01 | 13.25 | 2 |
| 10018 | 1453717800 | 2016-01 | 32.5 | 2 |
| 10021 | 1454161500 | 2016-01 | 5.75 | 2 |
| 10011 | 1452689100 | 2016-01 | 26 | 2 |
| 11206 | 1452314700 | 2016-01 | 3 | 1 |
| 10020 | 1453147200 | 2016-01 | 7.125 | 4 |
| 10022 | 1452454200 | 2016-01 | 9.5 | 2 |
| 10119 | 1453273200 | 2016-01 | 6.5 | 1 |
| 10028 | 1452465900 | 2016-01 | 14.5 | 3 |
| 10112 | 1453058100 | 2016-01 | 11 | 1 |
| 10011 | 1451655000 | 2016-01 | 36.25 | 2 |
| 10065 | 1454174100 | 2016-01 | 7.25 | 2 |
| 10119 | 1452979800 | 2016-01 | 5.5 | 1 |
| 10103 | 1452901500 | 2016-01 | 10 | 1 |
| 10110 | 1452716100 | 2016-01 | 7.1666665 | 3 |
| 10013 | 1453061700 | 2016-01 | 13.5 | 1 |

Truncated results, showing first 1,000 rows.
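The feature tables created below are keyed on zip and ts, so each (zip, ts) pair should appear at most once. As an optional check before writing the tables, you could look for duplicate keys; a minimal sketch using the pickup_features DataFrame computed above:

# Optional check: each (zip, ts) combination should be unique before it is used as a primary key.
dup_keys = (
    pickup_features.groupBy("zip", "ts")
    .count()
    .filter("count > 1")
)
display(dup_keys)  # expect an empty result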

Use Feature Store library to create new feature tables

%sql 
CREATE DATABASE IF NOT EXISTS feature_store_taxi_example;
OK
fs = feature_store.FeatureStoreClient()
import uuid
feature_database = "feature_store_taxi_example"
random_id = str(uuid.uuid4())[:8]
pickup_features_table = f"{feature_database}.trip_pickup_features_{random_id}"
dropoff_features_table = f"{feature_database}.trip_dropoff_features_{random_id}"
spark.conf.set("spark.sql.shuffle.partitions", "5")

fs.create_table(
    name=pickup_features_table,
    primary_keys=["zip", "ts"],
    df=pickup_features,
    partition_columns="yyyy_mm",
    description="Taxi Fares. Pickup Features",
)
fs.create_table(
    name=dropoff_features_table,
    primary_keys=["zip", "ts"],
    df=dropoff_features,
    partition_columns="yyyy_mm",
    description="Taxi Fares. Dropoff Features",
)
Out[18]: <FeatureTable: keys=['zip', 'ts'], tags={}>
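Once a feature table exists, later feature computations can be upserted into it instead of recreating the table. A minimal sketch, assuming you recompute pickup features for a later date range (the February date range and the new_pickup_features name are illustrative, not from the original notebook):

# Hypothetical follow-up: upsert newly computed rows into the existing feature table.
# mode="merge" updates rows whose primary keys already exist and inserts new ones.
new_pickup_features = pickup_features_fn(
    raw_data, ts_column="tpep_pickup_datetime", start_date=datetime(2016, 2, 1), end_date=datetime(2016, 2, 29)
)
fs.write_table(
    name=pickup_features_table,
    df=new_pickup_features,
    mode="merge",
)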

Create an AutoML experiment with feature store tables

from pyspark.sql import *
from pyspark.sql.functions import current_timestamp, lit, udf
from pyspark.sql.types import IntegerType
import math
from datetime import timedelta
import mlflow.pyfunc

def rounded_unix_timestamp(dt, num_minutes=15):
    """
    Rounds datetime dt up to the next num_minutes interval, then returns the Unix timestamp.
    """
    nsecs = dt.minute * 60 + dt.second + dt.microsecond * 1e-6
    delta = math.ceil(nsecs / (60 * num_minutes)) * (60 * num_minutes) - nsecs
    return int((dt + timedelta(seconds=delta)).timestamp())

rounded_unix_timestamp_udf = udf(rounded_unix_timestamp, IntegerType())

def rounded_taxi_data(taxi_data_df):
    # Round the taxi data timestamp to 15 and 30 minute intervals so we can join with the pickup and dropoff features
    # respectively.
    taxi_data_df = (
        taxi_data_df.withColumn(
            "rounded_pickup_datetime",
            rounded_unix_timestamp_udf(taxi_data_df["tpep_pickup_datetime"], lit(15)),
        )
        .withColumn(
            "rounded_dropoff_datetime",
            rounded_unix_timestamp_udf(taxi_data_df["tpep_dropoff_datetime"], lit(30)),
        )
        .drop("tpep_pickup_datetime")
        .drop("tpep_dropoff_datetime")
    )
    taxi_data_df.createOrReplaceTempView("taxi_data")
    return taxi_data_df
  
taxi_data = rounded_taxi_data(raw_data)
display(taxi_data)
 
| trip_distance | fare_amount | pickup_zip | dropoff_zip | rounded_pickup_datetime | rounded_dropoff_datetime |
| --- | --- | --- | --- | --- | --- |
| 4.94 | 19 | 10282 | 10171 | 1455469200 | 1455471000 |
| 0.28 | 3.5 | 10110 | 10110 | 1454611500 | 1454612400 |
| 0.7 | 5 | 10103 | 10023 | 1455729300 | 1455730200 |
| 0.8 | 6 | 10022 | 10017 | 1455792300 | 1455793200 |
| 4.51 | 17 | 10110 | 10282 | 1456150500 | 1456153200 |
| 1.8 | 7 | 10009 | 10065 | 1454655600 | 1454655600 |
| 2.58 | 12 | 10153 | 10199 | 1455549300 | 1455550200 |
| 1.4 | 11 | 10112 | 10069 | 1456427700 | 1456428600 |
| 1.21 | 7.5 | 10023 | 10153 | 1455381000 | 1455382800 |
| 0.6 | 6 | 10012 | 10003 | 1455408900 | 1455409800 |
| 2.02 | 8 | 10002 | 11211 | 1456586100 | 1456587000 |
| 1.5 | 8 | 10019 | 10199 | 1455696000 | 1455697800 |
| 0.93 | 6 | 10019 | 10018 | 1455487200 | 1455489000 |
| 2.34 | 10.5 | 10110 | 10014 | 1454711400 | 1454713200 |
| 0.91 | 5.5 | 10119 | 10199 | 1454666400 | 1454666400 |
| 11.6 | 33.5 | 10019 | 11371 | 1456054200 | 1456056000 |
| 1.4 | 11 | 10018 | 10022 | 1456234200 | 1456236000 |

Truncated results, showing first 1,000 rows.
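As a quick check of the rounding logic: the first trip in the raw data was picked up at 2016-02-14T16:52:13, which rounds up to the next 15-minute boundary, 17:00:00 UTC, matching the rounded_pickup_datetime of 1455469200 shown above. A minimal sketch (assumes a UTC timestamp; inside the Spark UDF the exact epoch depends on the session timezone):

# Spot-check the 15-minute ceiling used for the pickup join key.
import datetime as dt_module

example_pickup = dt_module.datetime(2016, 2, 14, 16, 52, 13, tzinfo=dt_module.timezone.utc)
print(rounded_unix_timestamp(example_pickup, num_minutes=15))  # 1455469200 == 2016-02-14T17:00:00Z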
import databricks.automl

feature_store_lookups = [
  {
     "table_name": pickup_features_table,
     "lookup_key": ["pickup_zip", "rounded_pickup_datetime"],
  },
  {
     "table_name": dropoff_features_table,
     "lookup_key": ["dropoff_zip", "rounded_dropoff_datetime"],
  }
]

summary = databricks.automl.regress(taxi_data, 
                                    target_col="fare_amount", 
                                    timeout_minutes=5, 
                                    feature_store_lookups=feature_store_lookups)
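
The regress call returns an AutoML summary object whose best trial can be inspected through MLflow. A minimal sketch, assuming the experiment produced at least one successful trial (attribute names follow the databricks.automl trial interface; check the summary object in your runtime version if they differ):

# Inspect the best trial found by AutoML.
print(summary.best_trial.metrics)          # validation metrics for the best run
print(summary.best_trial.mlflow_run_id)    # MLflow run that logged the best model
print(summary.best_trial.model_path)       # URI that can be passed to MLflow model-loading APIs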