from databricks import feature_store
from pyspark.sql.functions import *
from pyspark.sql.types import FloatType, IntegerType, StringType
from pytz import timezone


@udf(returnType=IntegerType())
def is_weekend(dt):
    tz = "America/New_York"
    return int(dt.astimezone(timezone(tz)).weekday() >= 5)  # 5 = Saturday, 6 = Sunday


@udf(returnType=StringType())
def partition_id(dt):
    # datetime -> "YYYY-MM"
    return f"{dt.year:04d}-{dt.month:02d}"


def filter_df_by_ts(df, ts_column, start_date, end_date):
    if ts_column and start_date:
        df = df.filter(col(ts_column) >= start_date)
    if ts_column and end_date:
        df = df.filter(col(ts_column) < end_date)
    return df
def pickup_features_fn(df, ts_column, start_date, end_date):
    """
    Computes the pickup_features feature group.
    To restrict features to a time range, pass in ts_column, start_date, and/or end_date as kwargs.
    """
    df = filter_df_by_ts(df, ts_column, start_date, end_date)
    pickupzip_features = (
        df.groupBy(
            "pickup_zip", window("tpep_pickup_datetime", "1 hour", "15 minutes")
        )  # 1 hour window, sliding every 15 minutes
        .agg(
            mean("fare_amount").alias("mean_fare_window_1h_pickup_zip"),
            count("*").alias("count_trips_window_1h_pickup_zip"),
        )
        .select(
            col("pickup_zip").alias("zip"),
            unix_timestamp(col("window.end")).alias("ts").cast(IntegerType()),
            partition_id(to_timestamp(col("window.end"))).alias("yyyy_mm"),
            col("mean_fare_window_1h_pickup_zip").cast(FloatType()),
            col("count_trips_window_1h_pickup_zip").cast(IntegerType()),
        )
    )
    return pickupzip_features


def dropoff_features_fn(df, ts_column, start_date, end_date):
    """
    Computes the dropoff_features feature group.
    To restrict features to a time range, pass in ts_column, start_date, and/or end_date as kwargs.
    """
    df = filter_df_by_ts(df, ts_column, start_date, end_date)
    dropoffzip_features = (
        df.groupBy("dropoff_zip", window("tpep_dropoff_datetime", "30 minute"))
        .agg(count("*").alias("count_trips_window_30m_dropoff_zip"))
        .select(
            col("dropoff_zip").alias("zip"),
            unix_timestamp(col("window.end")).alias("ts").cast(IntegerType()),
            partition_id(to_timestamp(col("window.end"))).alias("yyyy_mm"),
            col("count_trips_window_30m_dropoff_zip").cast(IntegerType()),
            is_weekend(col("window.end")).alias("dropoff_is_weekend"),
        )
    )
    return dropoffzip_features
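The feature functions above operate on a raw taxi-trips DataFrame, raw_data, which is assumed to have been loaded earlier in the notebook. A minimal sketch of one way to load it, assuming the subsampled nyctaxi-with-zipcodes sample dataset under /databricks-datasets:

# Assumed earlier in the notebook: load the raw NYC taxi trips with pickup/dropoff zip codes.
# The Delta path below is the public sample dataset; point this at your own trip data if needed.
raw_data = spark.read.format("delta").load("/databricks-datasets/nyctaxi-with-zipcodes/subsampled")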
from datetime import datetime

pickup_features = pickup_features_fn(
    raw_data,
    ts_column="tpep_pickup_datetime",
    start_date=datetime(2016, 1, 1),
    end_date=datetime(2016, 1, 31),
)
dropoff_features = dropoff_features_fn(
    raw_data,
    ts_column="tpep_dropoff_datetime",
    start_date=datetime(2016, 1, 1),
    end_date=datetime(2016, 1, 31),
)
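The next cell registers the computed DataFrames as Feature Store tables. It relies on a Feature Store client, fs, and the two table names, which are assumed to have been defined earlier; a sketch of that setup (the database and table names here are illustrative placeholders):

# Assumed setup for the create_table calls below; database/table names are placeholders.
fs = feature_store.FeatureStoreClient()

spark.sql("CREATE DATABASE IF NOT EXISTS feature_store_taxi_example")
pickup_features_table = "feature_store_taxi_example.trip_pickup_features"
dropoff_features_table = "feature_store_taxi_example.trip_dropoff_features"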
spark.conf.set("spark.sql.shuffle.partitions", "5")

fs.create_table(
    name=pickup_features_table,
    primary_keys=["zip", "ts"],
    df=pickup_features,
    partition_columns="yyyy_mm",
    description="Taxi Fares. Pickup Features",
)
fs.create_table(
    name=dropoff_features_table,
    primary_keys=["zip", "ts"],
    df=dropoff_features,
    partition_columns="yyyy_mm",
    description="Taxi Fares. Dropoff Features",
)
Out[18]: <FeatureTable: keys=['zip', 'ts'], tags={}>
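To spot-check what was written, the Feature Store client can read a feature table back as a DataFrame; a minimal sketch, assuming the same fs client and table name as above:

# Optional sanity check: read the pickup feature table back and display a few rows.
display(fs.read_table(name=pickup_features_table))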
from pyspark.sql import *
from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import IntegerType
import math
from datetime import timedelta
import mlflow.pyfunc


def rounded_unix_timestamp(dt, num_minutes=15):
    """
    Rounds datetime dt up to the next num_minutes interval, then returns the Unix timestamp.
    """
    nsecs = dt.minute * 60 + dt.second + dt.microsecond * 1e-6
    delta = math.ceil(nsecs / (60 * num_minutes)) * (60 * num_minutes) - nsecs
    return int((dt + timedelta(seconds=delta)).timestamp())


rounded_unix_timestamp_udf = udf(rounded_unix_timestamp, IntegerType())


def rounded_taxi_data(taxi_data_df):
    # Round the taxi data timestamps to 15 and 30 minute intervals so we can join with the pickup and dropoff features
    # respectively.
    taxi_data_df = (
        taxi_data_df.withColumn(
            "rounded_pickup_datetime",
            rounded_unix_timestamp_udf(taxi_data_df["tpep_pickup_datetime"], lit(15)),
        )
        .withColumn(
            "rounded_dropoff_datetime",
            rounded_unix_timestamp_udf(taxi_data_df["tpep_dropoff_datetime"], lit(30)),
        )
        .drop("tpep_pickup_datetime")
        .drop("tpep_dropoff_datetime")
    )
    taxi_data_df.createOrReplaceTempView("taxi_data")
    return taxi_data_df


taxi_data = rounded_taxi_data(raw_data)
display(taxi_data)
Truncated results, showing first 1,000 rows.
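As a quick illustration of the rounding behavior (not part of the original notebook), assuming the definitions from the cell above are in scope:

from datetime import datetime

# 12:07 is rounded up to the next 15-minute boundary, 12:15, before being converted to a Unix timestamp.
ts = rounded_unix_timestamp(datetime(2016, 1, 1, 12, 7), num_minutes=15)
print(datetime.fromtimestamp(ts))  # 2016-01-01 12:15:00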
import databricks.automl

feature_store_lookups = [
    {
        "table_name": pickup_features_table,
        "lookup_key": ["pickup_zip", "rounded_pickup_datetime"],
    },
    {
        "table_name": dropoff_features_table,
        "lookup_key": ["dropoff_zip", "rounded_dropoff_datetime"],
    },
]

summary = databricks.automl.regress(
    taxi_data,
    target_col="fare_amount",
    timeout_minutes=5,
    feature_store_lookups=feature_store_lookups,
)
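When the run completes, summary describes the AutoML experiment. A hedged sketch of how you might inspect the best trial, assuming the attribute names exposed by the databricks.automl summary object:

# Inspect the best trial found by AutoML (attribute names assumed from the AutoML summary API).
print(summary.best_trial.metrics)        # validation metrics of the best model
print(summary.best_trial.mlflow_run_id)  # MLflow run that logged the best model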
Train an ML model with AutoML and Feature Store feature tables
This notebook expands upon the Feature Store taxi example notebook.
In this notebook, you create new feature tables in Feature Store and then use those feature tables in an AutoML experiment.
If you have existing feature tables to use, you can skip to the Create an AutoML experiment with feature tables section.
Requirements
Databricks Runtime for Machine Learning 11.3 or above.