import time

import dask
import dask.dataframe as dd
import databricks.koalas as ks
import numpy as np
import pandas as pd
from dask.distributed import Client, LocalCluster

print(f'pandas version: {pd.__version__}')
print(f'numpy version: {np.__version__}')
print(f'koalas version: {ks.__version__}')
print(f'dask version: {dask.__version__}')
def benchmark(f, df, benchmarks, name, **kwargs):
    """Benchmark the given function against the given DataFrame.

    Parameters
    ----------
    f: function to benchmark
    df: data frame
    benchmarks: container for benchmark results
    name: task name
    **kwargs: additional keyword arguments passed through to f

    Returns
    -------
    Duration (in seconds) of the given operation
    """
    start_time = time.time()
    ret = f(df, **kwargs)
    benchmarks['duration'].append(time.time() - start_time)
    benchmarks['task'].append(name)
    print(f"{name} took: {benchmarks['duration'][-1]} seconds")
    return benchmarks['duration'][-1]
def get_results(benchmarks):
"""Return a pandas DataFrame containing benchmark results."""
return pd.DataFrame.from_dict(benchmarks)
# Connect to an already-running Dask scheduler.
client = Client('127.0.0.1:8786')
client_info_dict = client.scheduler_info()
worker_hosts = {worker['host'] for worker in client_info_dict['workers'].values()}
print(f'Worker hosts: {worker_hosts}')
dask_data = dd.read_parquet('/dbfs/FileStore/ks_taxi_parquet', index='index')
dask_benchmarks = {
'duration': [], # in seconds
'task': [],
}
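# Task implementations for Dask. Dask evaluates lazily, so each task calls
# .compute() (or len(), which computes implicitly) to force execution on the cluster.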
def read_file_parquet(df=None):
return dd.read_parquet('/dbfs/FileStore/ks_taxi_parquet', index='index')
def count(df=None):
return len(df)
def count_index_length(df=None):
return len(df.index)
def mean(df):
return df.fare_amt.mean().compute()
def standard_deviation(df):
return df.fare_amt.std().compute()
def mean_of_sum(df):
return (df.fare_amt + df.tip_amt).mean().compute()
def sum_columns(df):
return (df.fare_amt + df.tip_amt).compute()
def mean_of_product(df):
return (df.fare_amt * df.tip_amt).mean().compute()
def product_columns(df):
return (df.fare_amt * df.tip_amt).compute()
def value_counts(df):
return df.fare_amt.value_counts().compute()
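# A haversine-like great-circle formula, used purely as a compute-heavy workload.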
def mean_of_complicated_arithmetic_operation(df):
theta_1 = df.start_lon
phi_1 = df.start_lat
theta_2 = df.end_lon
phi_2 = df.end_lat
temp = (np.sin((theta_2-theta_1)/2*np.pi/180)**2
+ np.cos(theta_1*np.pi/180)*np.cos(theta_2*np.pi/180) * np.sin((phi_2-phi_1)/2*np.pi/180)**2)
ret = 2 * np.arctan2(np.sqrt(temp), np.sqrt(1-temp))
return ret.mean().compute()
def complicated_arithmetic_operation(df):
theta_1 = df.start_lon
phi_1 = df.start_lat
theta_2 = df.end_lon
phi_2 = df.end_lat
temp = (np.sin((theta_2-theta_1)/2*np.pi/180)**2
+ np.cos(theta_1*np.pi/180)*np.cos(theta_2*np.pi/180) * np.sin((phi_2-phi_1)/2*np.pi/180)**2)
ret = 2 * np.arctan2(np.sqrt(temp), np.sqrt(1-temp))
return ret.compute()
def groupby_statistics(df):
return df.groupby(by='passenger_count').agg(
{
'fare_amt': ['mean', 'std'],
'tip_amt': ['mean', 'std']
}
).compute()
# Precompute the aggregate used as the join target; flatten the MultiIndex
# columns produced by agg.
other = groupby_statistics(dask_data)
other.columns = pd.Index([f'{a}_{b}' for a, b in other.columns])
def join_count(df, other):
return len(dd.merge(df, other, left_index=True, right_index=True))
def join_data(df, other):
return dd.merge(df, other, left_index=True, right_index=True).compute()
benchmark(read_file_parquet, df=None, benchmarks=dask_benchmarks, name='read file')
benchmark(count, df=dask_data, benchmarks=dask_benchmarks, name='count')
benchmark(count_index_length, df=dask_data, benchmarks=dask_benchmarks, name='count index length')
benchmark(mean, df=dask_data, benchmarks=dask_benchmarks, name='mean')
benchmark(standard_deviation, df=dask_data, benchmarks=dask_benchmarks, name='standard deviation')
benchmark(mean_of_sum, df=dask_data, benchmarks=dask_benchmarks, name='mean of columns addition')
benchmark(sum_columns, df=dask_data, benchmarks=dask_benchmarks, name='addition of columns')
benchmark(mean_of_product, df=dask_data, benchmarks=dask_benchmarks, name='mean of columns multiplication')
benchmark(product_columns, df=dask_data, benchmarks=dask_benchmarks, name='multiplication of columns')
benchmark(value_counts, df=dask_data, benchmarks=dask_benchmarks, name='value counts')
benchmark(mean_of_complicated_arithmetic_operation, df=dask_data, benchmarks=dask_benchmarks, name='mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, df=dask_data, benchmarks=dask_benchmarks, name='complex arithmetic ops')
benchmark(groupby_statistics, df=dask_data, benchmarks=dask_benchmarks, name='groupby statistics')
benchmark(join_count, dask_data, benchmarks=dask_benchmarks, name='join count', other=other)
benchmark(join_data, dask_data, benchmarks=dask_benchmarks, name='join', other=other)
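# The "filtered" benchmarks run the same tasks on a tip-amount slice of the data.
# The exact predicate is not shown in this section; the range below is an assumption.
expr_filter = (dask_data.tip_amt >= 1) & (dask_data.tip_amt <= 5)
dask_filtered = dask_data[expr_filter]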
benchmark(count, dask_filtered, benchmarks=dask_benchmarks, name='filtered count')
benchmark(count_index_length, dask_filtered, benchmarks=dask_benchmarks, name='filtered count index length')
benchmark(mean, dask_filtered, benchmarks=dask_benchmarks, name='filtered mean')
benchmark(standard_deviation, dask_filtered, benchmarks=dask_benchmarks, name='filtered standard deviation')
benchmark(mean_of_sum, dask_filtered, benchmarks=dask_benchmarks, name='filtered mean of columns addition')
benchmark(sum_columns, df=dask_filtered, benchmarks=dask_benchmarks, name='filtered addition of columns')
benchmark(mean_of_product, dask_filtered, benchmarks=dask_benchmarks, name='filtered mean of columns multiplication')
benchmark(product_columns, df=dask_filtered, benchmarks=dask_benchmarks, name='filtered multiplication of columns')
benchmark(mean_of_complicated_arithmetic_operation, dask_filtered, benchmarks=dask_benchmarks, name='filtered mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, dask_filtered, benchmarks=dask_benchmarks, name='filtered complex arithmetic ops')
benchmark(value_counts, dask_filtered, benchmarks=dask_benchmarks, name='filtered value counts')
benchmark(groupby_statistics, dask_filtered, benchmarks=dask_benchmarks, name='filtered groupby statistics')
other = groupby_statistics(dask_filtered)
other.columns = pd.Index([f'{a}_{b}' for a, b in other.columns])
benchmark(join_count, dask_filtered, benchmarks=dask_benchmarks, name='filtered join count', other=other)
benchmark(join_data, dask_filtered, benchmarks=dask_benchmarks, name='filtered join', other=other)
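# Koalas implementations of the same tasks. The data load and the benchmark
# container below are assumed to mirror the Dask setup above.
koalas_data = ks.read_parquet('/FileStore/ks_taxi_parquet', index_col='index')
koalas_benchmarks = {
    'duration': [],  # in seconds
    'task': [],
}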
def read_file_parquet(df=None):
return ks.read_parquet('/FileStore/ks_taxi_parquet', index_col='index')
def count(df=None):
return len(df)
def count_index_length(df=None):
return len(df.index)
def mean(df):
return df.fare_amt.mean()
def standard_deviation(df):
return df.fare_amt.std()
def mean_of_sum(df):
return (df.fare_amt + df.tip_amt).mean()
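# Koalas evaluates lazily on Spark; tasks that return a full column call
# .to_pandas() to force execution, mirroring .compute() in the Dask versions.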
def sum_columns(df):
x = df.fare_amt + df.tip_amt
x.to_pandas()
return x
def mean_of_product(df):
return (df.fare_amt * df.tip_amt).mean()
def product_columns(df):
x = df.fare_amt * df.tip_amt
x.to_pandas()
return x
def value_counts(df):
val_counts = df.fare_amt.value_counts()
val_counts.to_pandas()
return val_counts
def complicated_arithmetic_operation(df):
theta_1 = df.start_lon
phi_1 = df.start_lat
theta_2 = df.end_lon
phi_2 = df.end_lat
temp = (np.sin((theta_2 - theta_1) / 2 * np.pi / 180) ** 2
+ np.cos(theta_1 * np.pi / 180) * np.cos(theta_2 * np.pi / 180) * np.sin((phi_2 - phi_1) / 2 * np.pi / 180) ** 2)
ret = np.multiply(np.arctan2(np.sqrt(temp), np.sqrt(1-temp)),2)
ret.to_pandas()
return ret
def mean_of_complicated_arithmetic_operation(df):
theta_1 = df.start_lon
phi_1 = df.start_lat
theta_2 = df.end_lon
phi_2 = df.end_lat
temp = (np.sin((theta_2 - theta_1) / 2 * np.pi / 180) ** 2
+ np.cos(theta_1 * np.pi / 180) * np.cos(theta_2 * np.pi / 180) * np.sin((phi_2 - phi_1) / 2 * np.pi / 180) ** 2)
ret = np.multiply(np.arctan2(np.sqrt(temp), np.sqrt(1-temp)),2)
return ret.mean()
def groupby_statistics(df):
gb = df.groupby(by='passenger_count').agg(
{
'fare_amt': ['mean', 'std'],
'tip_amt': ['mean', 'std']
}
)
gb.to_pandas()
return gb
other = ks.DataFrame(groupby_statistics(koalas_data).to_pandas())
other.columns = pd.Index([f'{a}_{b}' for a, b in other.columns])
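# The broadcast hint marks the small aggregated table for a broadcast join,
# which avoids shuffling the large table.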
def join_count(df, other):
return len(df.merge(other.spark.hint("broadcast"), left_index=True, right_index=True))
def join_data(df, other):
ret = df.merge(other.spark.hint("broadcast"), left_index=True, right_index=True)
ret.to_pandas()
return ret
benchmark(read_file_parquet, df=None, benchmarks=koalas_benchmarks, name='read file')
benchmark(count, df=koalas_data, benchmarks=koalas_benchmarks, name='count')
benchmark(count_index_length, df=koalas_data, benchmarks=koalas_benchmarks, name='count index length')
benchmark(mean, df=koalas_data, benchmarks=koalas_benchmarks, name='mean')
benchmark(standard_deviation, df=koalas_data, benchmarks=koalas_benchmarks, name='standard deviation')
benchmark(mean_of_sum, df=koalas_data, benchmarks=koalas_benchmarks, name='mean of columns addition')
benchmark(sum_columns, df=koalas_data, benchmarks=koalas_benchmarks, name='addition of columns')
benchmark(mean_of_product, df=koalas_data, benchmarks=koalas_benchmarks, name='mean of columns multiplication')
benchmark(product_columns, df=koalas_data, benchmarks=koalas_benchmarks, name='multiplication of columns')
benchmark(value_counts, df=koalas_data, benchmarks=koalas_benchmarks, name='value counts')
benchmark(complicated_arithmetic_operation, df=koalas_data, benchmarks=koalas_benchmarks, name='complex arithmetic ops')
benchmark(mean_of_complicated_arithmetic_operation, df=koalas_data, benchmarks=koalas_benchmarks, name='mean of complex arithmetic ops')
benchmark(groupby_statistics, df=koalas_data, benchmarks=koalas_benchmarks, name='groupby statistics')
benchmark(join_count, koalas_data, benchmarks=koalas_benchmarks, name='join count', other=other)
benchmark(join_data, koalas_data, benchmarks=koalas_benchmarks, name='join', other=other)
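# Same assumed tip-amount filter as the Dask run, applied to the Koalas frame.
koalas_filtered = koalas_data[(koalas_data.tip_amt >= 1) & (koalas_data.tip_amt <= 5)]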
benchmark(count, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered count')
benchmark(count_index_length, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered count index length')
benchmark(mean, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered mean')
benchmark(standard_deviation, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered standard deviation')
benchmark(mean_of_sum, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered mean of columns addition')
benchmark(sum_columns, df=koalas_filtered, benchmarks=koalas_benchmarks, name='filtered addition of columns')
benchmark(mean_of_product, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered mean of columns multiplication')
benchmark(product_columns, df=koalas_filtered, benchmarks=koalas_benchmarks, name='filtered multiplication of columns')
benchmark(mean_of_complicated_arithmetic_operation, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered complex arithmetic ops')
benchmark(value_counts, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered value counts')
benchmark(groupby_statistics, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered groupby statistics')
other = ks.DataFrame(groupby_statistics(koalas_filtered).to_pandas())
other.columns = pd.Index([f'{a}_{b}' for a, b in other.columns])
benchmark(join_data, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered join', other=other)
benchmark(join_count, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered join count', other=other)
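With all tasks recorded, the per-framework timings can be pulled into pandas via the get_results helper defined above. The side-by-side layout below is a sketch of our own, not a cell from the original run:
dask_results = get_results(dask_benchmarks).set_index('task')
koalas_results = get_results(koalas_benchmarks).set_index('task')
# One row per task, one duration column per framework.
print(pd.concat([koalas_results.duration.rename('koalas'),
                 dask_results.duration.rename('dask')], axis=1))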
Benchmark: Koalas (PySpark) and Dask - Distributed execution (with caching)
The benchmark was performed against the 2009-2013 Yellow Taxi Trip Records (157 GB) from the NYC Taxi and Limousine Commission (TLC) Trip Record Data. We identified operations common in our pandas workloads, such as basic statistical calculations, joins, filtering, and grouping, and measured them on this dataset.
The operations were measured both with and without a filter applied, and with and without caching, to approximate real-world workloads.
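The cached runs are not shown in this section. A minimal sketch of how caching can be enabled before re-running the tasks, assuming the same dask_data and koalas_data frames, is:
# Dask: pin partitions in distributed memory, then block until they are materialized.
dask_data = dask_data.persist()
_ = len(dask_data)
# Koalas: cache the underlying Spark DataFrame and trigger it.
koalas_data = koalas_data.spark.cache()
_ = len(koalas_data)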