def read_file_parquet(df=None, path='/dbfs/FileStore/ks_taxi_parquet'):
    """Lazily open the benchmark taxi dataset stored as parquet.

    Parameters
    ----------
    df : ignored; kept only so this function shares the common ``f(df)``
        signature used by the other benchmark operations.
    path : str, parquet location to read. Defaults to the original
        hard-coded DBFS directory, so existing callers are unaffected.

    Returns
    -------
    A lazy dask DataFrame indexed by the stored 'index' column; no data
    is materialized until ``.compute()`` is called downstream.
    """
    return dd.read_parquet(path, index='index')
def count(df=None):
    """Return the total number of rows in *df*."""
    n_rows = len(df)
    return n_rows
def count_index_length(df=None):
    """Return the row count of *df* via the length of its index."""
    idx = df.index
    return len(idx)
def mean(df):
    """Compute and materialize the mean of the fare_amt column."""
    fares = df.fare_amt
    return fares.mean().compute()
def standard_deviation(df):
    """Compute and materialize the standard deviation of fare_amt."""
    fares = df.fare_amt
    return fares.std().compute()
def mean_of_sum(df):
    """Mean of the elementwise sum fare_amt + tip_amt, materialized."""
    combined = df.fare_amt + df.tip_amt
    return combined.mean().compute()
def sum_columns(df):
    """Materialize the elementwise sum of fare_amt and tip_amt."""
    combined = df.fare_amt + df.tip_amt
    return combined.compute()
def mean_of_product(df):
    """Mean of the elementwise product fare_amt * tip_amt, materialized."""
    scaled = df.fare_amt * df.tip_amt
    return scaled.mean().compute()
def product_columns(df):
    """Materialize the elementwise product of fare_amt and tip_amt."""
    scaled = df.fare_amt * df.tip_amt
    return scaled.compute()
def value_counts(df):
    """Materialize the frequency count of each distinct fare_amt value."""
    counts = df.fare_amt.value_counts()
    return counts.compute()
def mean_of_complicated_arithmetic_operation(df):
    """Mean of a haversine-style great-circle angle over all rows, materialized.

    NOTE(review): theta is taken from the *_lon columns and phi from the
    *_lat columns, which looks swapped relative to the textbook haversine
    formula — kept exactly as-is to preserve the benchmarked workload.
    """
    lon_a = df.start_lon
    lat_a = df.start_lat
    lon_b = df.end_lon
    lat_b = df.end_lat
    # The two addends of the haversine "a" term; degrees -> radians inline.
    first_term = np.sin((lon_b - lon_a) / 2 * np.pi / 180) ** 2
    second_term = np.cos(lon_a * np.pi / 180) * np.cos(lon_b * np.pi / 180) * np.sin((lat_b - lat_a) / 2 * np.pi / 180) ** 2
    a_term = first_term + second_term
    angle = 2 * np.arctan2(np.sqrt(a_term), np.sqrt(1 - a_term))
    return angle.mean().compute()
def complicated_arithmetic_operation(df):
    """Materialize a haversine-style great-circle angle for every row.

    NOTE(review): theta is taken from the *_lon columns and phi from the
    *_lat columns, which looks swapped relative to the textbook haversine
    formula — kept exactly as-is to preserve the benchmarked workload.
    """
    lon_a = df.start_lon
    lat_a = df.start_lat
    lon_b = df.end_lon
    lat_b = df.end_lat
    # The two addends of the haversine "a" term; degrees -> radians inline.
    first_term = np.sin((lon_b - lon_a) / 2 * np.pi / 180) ** 2
    second_term = np.cos(lon_a * np.pi / 180) * np.cos(lon_b * np.pi / 180) * np.sin((lat_b - lat_a) / 2 * np.pi / 180) ** 2
    a_term = first_term + second_term
    angle = 2 * np.arctan2(np.sqrt(a_term), np.sqrt(1 - a_term))
    return angle.compute()
def groupby_statistics(df):
    """Per-passenger-count mean and std of fare and tip amounts, materialized."""
    aggregations = {
        'fare_amt': ['mean', 'std'],
        'tip_amt': ['mean', 'std']
    }
    grouped = df.groupby(by='passenger_count')
    return grouped.agg(aggregations).compute()
# Pre-compute the grouped statistics once so the join benchmarks below can
# merge against them. NOTE(review): `dask_data` is presumably the dask
# DataFrame loaded elsewhere in this file — confirm against the full script.
other = groupby_statistics(dask_data)
# agg() with a dict of lists produces two-level (MultiIndex) column labels,
# e.g. ('fare_amt', 'mean'); flatten each pair into a single string such as
# 'fare_amt_mean' so the joined result has plain column names.
other.columns = pd.Index([e[0]+'_' + e[1] for e in other.columns.tolist()])
def join_count(df, other):
    """Row count of the index-aligned join of *df* with *other*."""
    joined = dd.merge(df, other, left_index=True, right_index=True)
    return len(joined)
def join_data(df, other):
    """Materialize the index-aligned join of *df* with *other*."""
    joined = dd.merge(df, other, left_index=True, right_index=True)
    return joined.compute()
Benchmark: Koalas (PySpark) and Dask - Local execution
The benchmark was performed against the 2009 - 2013 Yellow Taxi Trip Records (157 GB) from the NYC Taxi and Limousine Commission (TLC) Trip Record Data. From our pandas workloads we identified common operations — basic statistical calculations, joins, filtering, and grouping — and applied them to this dataset.
Each operation was measured both with and without a preceding filter step, to better reflect real-world workloads.