import pandas as pd
from scipy import stats
def fair_avg(durations):
    """Average the run durations fairly by dropping the first (warm-up) run
    and then the single fastest remaining run before averaging the rest."""
    durations = durations[1:]             # drop the first run (warm-up)
    durations.remove(min(durations))      # drop the best (fastest) run
    return sum(durations) / len(durations)
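# A quick sanity check of fair_avg (illustrative, not part of the original
# benchmark): the warm-up run 10.0 and the fastest remaining run 3.0 are
# dropped, leaving (5.0 + 4.0) / 2 == 4.5.
assert fair_avg([10.0, 3.0, 5.0, 4.0]) == 4.5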
def rename_index(df):
    """Rename operations in the average result dataframe for clarity."""
    # A list comprehension (rather than a bare map object) is assigned so that
    # pandas receives a concrete collection for the new index.
    df.index = [
        s.replace("filtered ", "")
        .replace("of columns", "of series")
        .replace("addition of series", "series addition")
        .replace("multiplication of series", "series multiplication")
        .replace("arithmetic ops", "arithmetic")
        .replace("count index length", "count index")
        for s in df.index
    ]
    return df
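# Illustration of the chained renames above (derived from the replace rules):
# 'filtered count index length' loses its 'filtered ' prefix and then has
# 'count index length' collapsed to 'count index'.
assert rename_index(pd.DataFrame(index=['filtered count index length'])).index[0] == 'count index'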
def avg_result_df(file_name_prefix):
    """Read the result files with the given prefix and build a dataframe of
    per-operation average durations."""
    dfs = []
    file_infos = dbutils.fs.ls('/FileStore/koalas-benchmark-no-parquet-cache')
    for file_info in file_infos:
        if file_info.name.startswith(file_name_prefix):
            dfs.append(pd.read_parquet(
                '/dbfs/FileStore/koalas-benchmark-no-parquet-cache/%s' % file_info.name))
    print(f'{file_name_prefix} has {len(dfs)} runs')
    avg_df = dfs[0].copy()
    for op in dfs[0].index:
        for lib in ['koalas', 'dask']:
            durations = [df.loc[op, lib] for df in dfs]
            # .loc[op, lib] avoids chained-indexing assignment, which can
            # silently write to a temporary copy instead of avg_df.
            avg_df.loc[op, lib] = fair_avg(durations)
    return rename_index(avg_df)
def annotate(ax):
    """Annotate each bar in the plot with its height (elapsed seconds)."""
    for p in ax.patches:
        ax.annotate("%.2fs" % p.get_height(),
                    (p.get_x() + p.get_width() / 2., p.get_height()),
                    ha='center', va='center',
                    xytext=(0, 10), textcoords='offset points')
def annotate_x_times_faster(ax, x_times_list):
    """Annotate each Koalas bar with how many times faster Koalas is for that operation."""
    num_ops = len(x_times_list)
    for i, p in enumerate(ax.patches):
        # The first half of ax.patches belongs to the Koalas series;
        # only those bars get a speedup annotation.
        if i < num_ops:
            ax.annotate("%.1fx" % x_times_list[i],
                        (p.get_x() + p.get_width() / 2., p.get_height()),
                        ha='center', va='center',
                        xytext=(4, 10), textcoords='offset points',
                        fontsize=8, weight='bold', color="#585858")
distributed_res_df = avg_result_df('distributed_')
distributed_res_df.columns = ['Koalas (PySpark)', 'Dask']
standard_ops = distributed_res_df.iloc[:15]        # first 15 rows: standard operations
ops_with_filtering = distributed_res_df.iloc[15:]  # remaining rows: the same operations on a filtered frame
distributed_res_with_caching_df = avg_result_df('cache_distributed_')
distributed_res_with_caching_df.columns = ['Koalas (PySpark)', 'Dask']
ops_with_filtering_caching = distributed_res_with_caching_df.iloc[15:]  # filtered operations, with caching enabled
distributed_ has 20 runs
cache_distributed_ has 20 runs
plot_title = 'Standard operations (distributed execution)'
ax = standard_ops.sort_index().plot.bar(title=plot_title)
ax.set_ylabel("Elapsed time (sec)")
tmp_df_x_times_faster = standard_ops.sort_index().copy()
tmp_df_x_times_faster['Dask / Koalas'] = tmp_df_x_times_faster.Dask / tmp_df_x_times_faster['Koalas (PySpark)']
tmp_df_x_times_faster['Koalas / Dask'] = tmp_df_x_times_faster['Koalas (PySpark)'] / tmp_df_x_times_faster.Dask
annotate_x_times_faster(ax, x_times_list=tmp_df_x_times_faster['Dask / Koalas'].to_list())
standard_ops.sort_index().plot.bar(logy=True, title='%s - log scaling' % plot_title).set_ylabel("Elapsed time (sec)")
[bar charts: Standard operations (distributed execution), linear and log scale]
print("Performance diff %% (simple avg): %s" % (sum(standard_ops.Dask / standard_ops['Koalas (PySpark)']) / len(standard_ops)))
print("Performance diff %% (geomean): %s" % stats.gmean(standard_ops.Dask / standard_ops['Koalas (PySpark)']))
Performance diff % (simple avg): 5.214644918097563
Performance diff % (geomean): 2.0831778574368713
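# Why report both metrics (illustrative note, not from the original notebook):
# the simple average is inflated by the few operations where Dask is
# dramatically slower, while the geometric mean dampens such outliers.
stats.gmean([1.1, 1.2, 30.0])  # ~3.41, vs. a simple average of ~10.77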
plot_title = 'Operations with filtering (distributed execution)'
ax = ops_with_filtering.sort_index().plot.bar(title=plot_title)
ax.set_ylabel("Elapsed time (sec)")
tmp_df_x_times_faster = ops_with_filtering.sort_index().copy()
tmp_df_x_times_faster['Dask / Koalas'] = tmp_df_x_times_faster.Dask / tmp_df_x_times_faster['Koalas (PySpark)']
tmp_df_x_times_faster['Koalas / Dask'] = tmp_df_x_times_faster['Koalas (PySpark)'] / tmp_df_x_times_faster.Dask
annotate_x_times_faster(ax, x_times_list=tmp_df_x_times_faster['Dask / Koalas'].to_list())
ops_with_filtering.sort_index().plot.bar(logy=True, title='%s - log scaling' % plot_title).set_ylabel("Elapsed time (sec)")
[bar charts: Operations with filtering (distributed execution), linear and log scale]
print("Performance diff %% (simple avg): %s" % (sum(ops_with_filtering.Dask / ops_with_filtering['Koalas (PySpark)']) / len(ops_with_filtering)))
print("Performance diff %% (geomean): %s" % stats.gmean(ops_with_filtering.Dask / ops_with_filtering['Koalas (PySpark)']))
Performance diff % (simple avg): 10.264825867300187
Performance diff % (geomean): 9.21307241418659
plot_title = 'Operations with filtering and caching (distributed execution)'
ax = ops_with_filtering_caching.sort_index().plot.bar(title=plot_title)
ax.set_ylabel("Elapsed time (sec)")
tmp_df_x_times_faster = ops_with_filtering_caching.sort_index().copy()
tmp_df_x_times_faster['Dask / Koalas'] = tmp_df_x_times_faster.Dask / tmp_df_x_times_faster['Koalas (PySpark)']
tmp_df_x_times_faster['Koalas / Dask'] = tmp_df_x_times_faster['Koalas (PySpark)'] / tmp_df_x_times_faster.Dask
annotate_x_times_faster(ax, x_times_list=tmp_df_x_times_faster['Dask / Koalas'].to_list())
ops_with_filtering_caching.sort_index().plot.bar(logy=True, title='%s - log scaling' % plot_title).set_ylabel("Elapsed time (sec)")
[bar charts: Operations with filtering and caching (distributed execution), linear and log scale]
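# The three plotting cells above repeat the same pattern; a possible helper
# that consolidates them (a sketch, not part of the original notebook):
def plot_with_speedups(res_df, plot_title):
    """Plot grouped bars, annotate the per-operation Koalas speedup, and add
    a log-scale variant of the same chart."""
    df = res_df.sort_index()
    ax = df.plot.bar(title=plot_title)
    ax.set_ylabel("Elapsed time (sec)")
    annotate_x_times_faster(ax, x_times_list=(df.Dask / df['Koalas (PySpark)']).to_list())
    log_ax = df.plot.bar(logy=True, title='%s - log scaling' % plot_title)
    log_ax.set_ylabel("Elapsed time (sec)")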
Benchmark: Koalas (PySpark) and Dask - Distributed execution summary
The benchmark was run against the 2009-2013 Yellow Taxi Trip Records (157 GB) from the NYC Taxi and Limousine Commission (TLC) Trip Record Data.
These results quantify the performance differences between Koalas and Dask. Because the Koalas API is implemented on top of PySpark, the results apply similarly to PySpark.
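# A minimal sketch of why Koalas results carry over to PySpark (illustrative;
# the path and column name below are hypothetical, not from the benchmark):
import databricks.koalas as ks
from pyspark.sql import functions as F

kdf = ks.read_parquet('/FileStore/taxi')     # Koalas dataframe API
koalas_mean = kdf['fare_amount'].mean()      # executed as a Spark job

sdf = spark.read.parquet('/FileStore/taxi')  # `spark` is the notebook's SparkSession
pyspark_mean = sdf.select(F.mean('fare_amount')).first()[0]  # essentially the same Spark query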