import pandas as pd
from scipy import stats
def fair_avg(durations):
    """Average the durations fairly: drop the first (warm-up) run and the single best run, then take the mean of the rest."""
    durations = durations[1:]  # Drop the first run to exclude warm-up effects.
    durations.remove(min(durations))  # Drop the single best run.
    return sum(durations) / len(durations)
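For example, with a hypothetical list of run times (in seconds) where the first run is slow due to warm-up:
runs = [12.0, 5.0, 4.5, 5.2, 5.3]  # hypothetical measurements
fair_avg(runs)  # drops 12.0 (first run) and 4.5 (best run), averages [5.0, 5.2, 5.3] -> ~5.17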
def rename_index(df):
    """Rename the operations in the averaged result dataframe for clarity."""
    df.index = [
        s.replace("filtered ", "")
        .replace("of columns", "of series")
        .replace("addition of series", "series addition")
        .replace("multiplication of series", "series multiplication")
        .replace("arithmetic ops", "arithmetic")
        .replace("count index length", "count index")
        for s in df.index]
    return df
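As a quick sanity check on a hypothetical label (not one of the real result rows), rename_index shortens the raw operation names:
demo = pd.DataFrame({'koalas': [1.0], 'dask': [2.0]}, index=['filtered count index length'])  # hypothetical row
rename_index(demo).index.tolist()  # ['count index']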
def avg_result_df(file_name_prefix):
    """Load the result files with the given prefix and build the averaged result dataframe."""
    dfs = []
    file_infos = dbutils.fs.ls('/FileStore/koalas-benchmark-no-parquet-cache')
    for file_info in file_infos:
        if file_info.name.startswith(file_name_prefix):
            dfs.append(pd.read_parquet('/dbfs/FileStore/koalas-benchmark-no-parquet-cache/%s' % file_info.name))
    print(f'{file_name_prefix} has {len(dfs)} runs')
    avg_df = dfs[0].copy()
    for op in dfs[0].index:
        for lib in ['koalas', 'dask']:
            durations = [df.loc[op, lib] for df in dfs]
            # Use .loc[op, lib] rather than chained .loc[op][lib], which may assign to a copy.
            avg_df.loc[op, lib] = fair_avg(durations)
    return rename_index(avg_df)
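Note that dbutils.fs.ls is only available on Databricks. As a minimal sketch for running the same aggregation elsewhere, assuming the result files were copied to a hypothetical local directory:
import os

def avg_result_df_local(file_name_prefix, result_dir='/tmp/koalas-benchmark'):
    """Same averaging as avg_result_df, but reads result files from a local directory (hypothetical path)."""
    dfs = [pd.read_parquet(os.path.join(result_dir, name))
           for name in sorted(os.listdir(result_dir))
           if name.startswith(file_name_prefix)]
    avg_df = dfs[0].copy()
    for op in dfs[0].index:
        for lib in ['koalas', 'dask']:
            avg_df.loc[op, lib] = fair_avg([df.loc[op, lib] for df in dfs])
    return rename_index(avg_df)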
def annotate(ax):
    """Annotate each bar in the plot with its height (the elapsed time in seconds)."""
    for p in ax.patches:
        ax.annotate("%.2fs" % p.get_height(),
                    (p.get_x() + p.get_width() / 2., p.get_height()),
                    ha='center', va='center', xytext=(0, 10), textcoords='offset points')
def annotate_x_times_faster(ax, x_times_list):
    """Annotate each Koalas bar with how many times faster Koalas is for that operation."""
    num_ops = len(x_times_list)
    for i, p in enumerate(ax.patches):
        # The first half of ax.patches holds the Koalas bars; only those are annotated.
        if i < num_ops:
            ax.annotate("%.1fx" % x_times_list[i],
                        (p.get_x() + p.get_width() / 2., p.get_height()),
                        ha='center', va='center', xytext=(4, 10), textcoords='offset points',
                        fontsize=8, weight='bold', color="#585858")
local_res_df = avg_result_df('single_node_')
local_res_df.columns = ['Koalas (PySpark)', 'Dask']
# The first 15 rows are the standard operations; the remaining rows are the same
# operations applied after filtering.
standard_ops = local_res_df.iloc[:15]
ops_with_filtering = local_res_df.iloc[15:]
local_res_with_caching_df = avg_result_df('cache_single_node_')
local_res_with_caching_df.columns = ['Koalas (PySpark)', 'Dask']
ops_with_filtering_caching = local_res_with_caching_df.iloc[15:]
single_node_ has 13 runs
cache_single_node_ has 23 runs
plot_title = 'Standard operations (local execution)'
ax = standard_ops.sort_index().plot.bar(title=plot_title)
ax.set_ylabel("Elapsed time (sec)")
tmp_df_x_times_faster = standard_ops.sort_index().copy()
tmp_df_x_times_faster['Dask / Koalas'] = tmp_df_x_times_faster.Dask / tmp_df_x_times_faster['Koalas (PySpark)']
tmp_df_x_times_faster['Koalas / Dask'] = tmp_df_x_times_faster['Koalas (PySpark)'] / tmp_df_x_times_faster.Dask
annotate_x_times_faster(ax, x_times_list=tmp_df_x_times_faster['Dask / Koalas'].to_list())
standard_ops.sort_index().plot.bar(logy=True, title='%s - log scaling' % plot_title).set_ylabel("Elapsed time (sec)")
print("Performance diff %% (simple avg): %s" % (sum(standard_ops.Dask / standard_ops['Koalas (PySpark)']) / len(standard_ops)))
print("Performance diff %% (geomean): %s" % stats.gmean(standard_ops.Dask / standard_ops['Koalas (PySpark)']))
arithmetic_ops = standard_ops.filter(items=['complex arithmetic', 'series multiplication', 'series addition'], axis=0)
print("Performance diff (arthemetic) %% (simple avg): %s" % (sum(arithmetic_ops.Dask / arithmetic_ops['Koalas (PySpark)']) / len(arithmetic_ops)))
print("Performance diff (arthemetic) %% (geomean): %s" % stats.gmean(arithmetic_ops.Dask / arithmetic_ops['Koalas (PySpark)']))
basic_stats_ops = standard_ops.filter(items=['count', 'mean', 'standard deviation', 'count index', 'join', 'join count'], axis=0)
print("Performance diff (basic stats) %% (simple avg): %s" % (sum(basic_stats_ops.Dask / basic_stats_ops['Koalas (PySpark)']) / len(basic_stats_ops)))
print("Performance diff (basic stats) %% (geomean): %s" % stats.gmean(basic_stats_ops.Dask / basic_stats_ops['Koalas (PySpark)']))
Performance diff % (simple avg): 3.1379990701042995
Performance diff % (geomean): 1.160976398518503
Performance diff (arthemetic) % (simple avg): 0.9645016710988626
Performance diff (arthemetic) % (geomean): 0.9625863048311519
Performance diff (basic stats) % (simple avg): 6.382956836372958
Performance diff (basic stats) % (geomean): 2.15732786302932
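The geometric mean is the appropriate average for these per-operation ratios because it treats slowdowns and speedups symmetrically; stats.gmean is equivalent to exponentiating the mean of the logs:
import numpy as np

ratios = standard_ops.Dask / standard_ops['Koalas (PySpark)']
np.exp(np.log(ratios).mean())  # same value as stats.gmean(ratios)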
plot_title = 'Operations with filtering (local execution)'
ax = ops_with_filtering.sort_index().plot.bar(title=plot_title)
ax.set_ylabel("Elapsed time (sec)")
tmp_df_x_times_faster = ops_with_filtering.sort_index().copy()
tmp_df_x_times_faster['Dask / Koalas'] = tmp_df_x_times_faster.Dask / tmp_df_x_times_faster['Koalas (PySpark)']
tmp_df_x_times_faster['Koalas / Dask'] = tmp_df_x_times_faster['Koalas (PySpark)'] / tmp_df_x_times_faster.Dask
annotate_x_times_faster(ax, x_times_list=tmp_df_x_times_faster['Dask / Koalas'].to_list())
ops_with_filtering.sort_index().plot.bar(logy=True, title='%s - log scaling' % plot_title).set_ylabel("Elapsed time (sec)")
print("Performance diff %% (simple avg): %s" % (sum(ops_with_filtering.Dask / ops_with_filtering['Koalas (PySpark)']) / len(ops_with_filtering)))
print("Performance diff %% (geomean): %s" % stats.gmean(ops_with_filtering.Dask / ops_with_filtering['Koalas (PySpark)']))
Performance diff % (simple avg): 6.958667208537091
Performance diff % (geomean): 6.367004494100208
plot_title = 'Operations with filtering and caching (local execution)'
ax = ops_with_filtering_caching.sort_index().plot.bar(title=plot_title)
ax.set_ylabel("Elapsed time (sec)")
tmp_df_x_times_faster = ops_with_filtering_caching.sort_index().copy()
tmp_df_x_times_faster['Dask / Koalas'] = tmp_df_x_times_faster.Dask / tmp_df_x_times_faster['Koalas (PySpark)']
tmp_df_x_times_faster['Koalas / Dask'] = tmp_df_x_times_faster['Koalas (PySpark)'] / tmp_df_x_times_faster.Dask
annotate_x_times_faster(ax, x_times_list=tmp_df_x_times_faster['Dask / Koalas'].to_list())
ops_with_filtering_caching.sort_index().plot.bar(logy=True, title='%s - log scaling' % plot_title).set_ylabel("Elapsed time (sec)")
Benchmark: Koalas (PySpark) and Dask - local execution summary
The benchmark was run against the 2009-2013 Yellow Taxi Trip Records (157 GB) from the NYC Taxi and Limousine Commission (TLC) Trip Record Data.
The results below show the performance differences between Koalas and Dask. Because the Koalas APIs are built on top of PySpark, the results of this benchmark apply similarly to PySpark.