from uszipcode import SearchEngine
import sqlite3
import pandas as pd
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType
import math
from urllib import request
import os
# Sentinel string returned by get_zipcode when a coordinate pair cannot be
# resolved to a zipcode; annotate_zipcodes filters these rows out.
BAD_ZIPCODE_VALUE = 'bad_zipcode'
# DBFS root of the raw NYC yellow-taxi CSV drops read by get_data_files.
file_location = "dbfs:/databricks-datasets/nyctaxi/tripdata/yellow/"
file_type = "csv"
# Year of trip data selected by get_data_files / load_data.
target_year = 2016
def push_zipcode_data_to_executors():
    """Download the uszipcode 'simple' SQLite DB and broadcast it to executors.

    Fetches the database file once on the driver, loads the zipcode /
    lat / lng / bounds columns into a pandas DataFrame, and returns a Spark
    broadcast variable wrapping that DataFrame so executor-side UDFs
    (get_zipcode) can read it without re-downloading.

    Returns:
        pyspark Broadcast[pandas.DataFrame] with one row per zipcode.
    """
    target_dir = '/tmp/db/'
    target_file = os.path.join(target_dir, 'simple_db.sqlite')
    remote_url = 'https://github.com/MacHu-GWU/uszipcode-project/files/5183256/simple_db.log'
    os.makedirs(target_dir, exist_ok=True)
    request.urlretrieve(remote_url, target_file)
    # Instantiating SearchEngine points uszipcode at the downloaded file and
    # initializes it; the instance itself is not used afterwards, so the
    # original's unused `search` local was dropped.
    SearchEngine(db_file_dir=target_dir)
    conn = sqlite3.connect(target_file)
    try:
        pdf = pd.read_sql_query('''select zipcode, lat, lng, radius_in_miles,
            bounds_west, bounds_east, bounds_north, bounds_south from
            simple_zipcode''', conn)
    finally:
        # The original leaked this connection; close it deterministically.
        conn.close()
    return sc.broadcast(pdf)
@udf('string')
def get_zipcode(lat, lng):
    """Best-effort nearest zipcode for a (lat, lng) point, as a string UDF.

    Selects candidate zipcodes from the broadcast pandas DataFrame
    (zipcodes_broadcast_df) inside a ~5-mile bounding box around the point
    and returns the closest candidate's zipcode. Returns BAD_ZIPCODE_VALUE
    when either coordinate is missing or no candidate matches.
    """
    if lat is None or lng is None:
        return BAD_ZIPCODE_VALUE
    # Miles per degree of latitude is ~constant (69.172); miles per degree
    # of longitude shrinks with cos(latitude). math.cos expects RADIANS —
    # the original passed the latitude in degrees, which skewed the
    # longitude half-width of the search window.
    dist_btwn_lat_deg = 69.172
    dist_btwn_lon_deg = math.cos(math.radians(lat)) * 69.172
    radius = 5  # half-width of the search window, in miles
    lat_degr_rad = abs(radius / dist_btwn_lat_deg)
    lon_degr_rad = abs(radius / dist_btwn_lon_deg)
    lat_lower = lat - lat_degr_rad
    lat_upper = lat + lat_degr_rad
    lng_lower = lng - lon_degr_rad
    lng_upper = lng + lon_degr_rad
    pdf = zipcodes_broadcast_df.value
    try:
        candidates = pdf[pdf['lat'].between(lat_lower, lat_upper)
                         & pdf['lng'].between(lng_lower, lng_upper)]
        if candidates.empty:
            return BAD_ZIPCODE_VALUE
        # Squared distance in degree space is sufficient to rank nearby
        # points; vectorized instead of the original per-row Python loop.
        sq_dist = (candidates['lat'] - lat) ** 2 + (candidates['lng'] - lng) ** 2
        return candidates['zipcode'].iloc[int(sq_dist.values.argmin())]
    except Exception:
        # Dirty input rows must not fail the whole Spark job; keep the
        # original best-effort fallback, but without a bare `except:`.
        return BAD_ZIPCODE_VALUE
def get_data_files(yyyy, months):
    """Return the DBFS paths of taxi files matching the given year/months.

    A file matches month mm of year yyyy when its name contains the
    zero-padded "yyyy-mm" tag.
    """
    matched = []
    for month in months:
        tag = f"{yyyy}-{month:02d}"
        listing = (entry.path for entry in dbutils.fs.ls(file_location))
        matched.extend(path for path in listing if tag in path)
    return matched
def load_data(data_files, sample=1.0):
    """Read the raw taxi CSVs, sample them, and project allow-listed columns.

    Source columns are matched case-insensitively against the allow list,
    cast to the listed Spark type, and renamed to the listed (lower-cased)
    canonical name. The sample is deterministic (fixed seed 123).
    """
    raw = (spark.read.format("csv")
           .option("inferSchema", "true")
           .option("header", "true")
           .option("ignoreLeadingWhiteSpace", "true")
           .option("ignoreTrailingWhiteSpace", "true")
           .option("sep", ",")
           .load(data_files)
           ).sample(False, sample, 123)
    # lower-cased source column name -> [canonical name, spark type]
    column_allow_list = {
        "pickup_datetime": ["tpep_pickup_datetime", "timestamp"],
        "tpep_pickup_datetime": ["tpep_pickup_datetime", "timestamp"],
        "dropoff_datetime": ["tpep_dropoff_datetime", "timestamp"],
        "tpep_dropoff_datetime": ["tpep_dropoff_datetime", "timestamp"],
        "pickup_zip": ["pickup_zip", "integer"],
        "dropoff_zip": ["dropoff_zip", "integer"],
        "trip_distance": ["trip_distance", "double"],
        "fare_amount": ["fare_amount", "double"],
        "pickup_latitude": ["pickup_latitude", "double"],
        "pickup_longitude": ["pickup_longitude", "double"],
        "dropoff_latitude": ["dropoff_latitude", "double"],
        "dropoff_longitude": ["dropoff_longitude", "double"],
    }
    projected = []
    for source_name in raw.columns:
        spec = column_allow_list.get(source_name.lower())
        if spec is not None:
            canonical, spark_type = spec
            projected.append(col(source_name).cast(spark_type).alias(canonical.lower()))
    return raw.select(projected)
def annotate_zipcodes(df):
    """Replace pickup/dropoff lat-lng pairs with integer zipcode columns.

    Rows whose coordinates could not be resolved (BAD_ZIPCODE_VALUE) are
    dropped before the remaining zipcode strings are cast to integers.
    """
    def zip_for(lat_name, lng_name):
        # Apply the broadcast-backed UDF to a lat/lng column pair.
        return get_zipcode(col(lat_name).astype("double"),
                           col(lng_name).astype("double"))

    annotated = (
        df.withColumn('pickup_zip', zip_for("pickup_latitude", "pickup_longitude"))
          .withColumn('dropoff_zip', zip_for("dropoff_latitude", "dropoff_longitude"))
          .drop('pickup_latitude', 'pickup_longitude',
                'dropoff_latitude', 'dropoff_longitude')
    )
    resolved = (annotated
                .filter(annotated.pickup_zip != BAD_ZIPCODE_VALUE)
                .filter(annotated.dropoff_zip != BAD_ZIPCODE_VALUE))
    return (resolved
            .withColumn("pickup_zip", resolved["pickup_zip"].cast(IntegerType()))
            .withColumn("dropoff_zip", resolved["dropoff_zip"].cast(IntegerType())))
def write_to_table(df, database, table):
    """Save df as the Delta table `database.table`, replacing data and schema."""
    writer = (df.write
              .format("delta")
              .mode("overwrite")
              .option("overwriteSchema", "true"))
    writer.saveAsTable(f"{database}.{table}")
Feature Store taxi example - create input dataset

This notebook creates the input dataset used by the Feature Store taxi example notebook.
It includes these steps to preprocess the data: it reads the raw yellow-taxi trip data
from the dbfs:/databricks-datasets/nyctaxi dataset, annotates each trip with pickup and
dropoff zipcodes, and writes the result to the nyc_yellow_taxi_with_zips table in the
feature_store_taxi_example database (part of the feature_store_demo example).

Requirements: Databricks Runtime for Machine Learning 8.3 or above.