Data Preparation (Python)

Benchmark: Koalas (PySpark) and Dask - Data Preparation

The benchmark was performed against the 2009-2013 Yellow Taxi Trip Records (157 GB) from the NYC Taxi and Limousine Commission (TLC) Trip Record Data.

The CSV files were downloaded into the Databricks File System (DBFS) and then converted into Parquet files via Koalas for better efficiency.

Download URL: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page.

Data dictionary: https://www1.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf.

The scenario used in this benchmark was inspired by https://github.com/xdssio/big_data_benchmarks.

Download CSV files to DBFS

%sh mkdir -p /dbfs/FileStore/taxi_csv/
%sh ls /dbfs/FileStore/taxi_csv
url_loc = {} # Map download url to the file location in DBFS
 
for year in range(2009, 2014):
  for m in range(1, 13):
    month = "{:02d}".format(m)
    fname = 'yellow_tripdata_%s-%s.csv' % (year, month)
    url = 'https://s3.amazonaws.com/nyc-tlc/trip+data/%s' % fname
    loc = '/dbfs/FileStore/taxi_csv/%s' % fname
    url_loc[url] = loc
import urllib.request
 
for url, loc in url_loc.items():
  urllib.request.urlretrieve(url, loc)
%sh ls /dbfs/FileStore/taxi_csv
yellow_tripdata_2009-01.csv yellow_tripdata_2009-02.csv yellow_tripdata_2009-03.csv yellow_tripdata_2009-04.csv yellow_tripdata_2009-05.csv yellow_tripdata_2009-06.csv yellow_tripdata_2009-07.csv yellow_tripdata_2009-08.csv yellow_tripdata_2009-09.csv yellow_tripdata_2009-10.csv yellow_tripdata_2009-11.csv yellow_tripdata_2009-12.csv yellow_tripdata_2010-01.csv yellow_tripdata_2010-02.csv yellow_tripdata_2010-03.csv yellow_tripdata_2010-04.csv yellow_tripdata_2010-05.csv yellow_tripdata_2010-06.csv yellow_tripdata_2010-07.csv yellow_tripdata_2010-08.csv yellow_tripdata_2010-09.csv yellow_tripdata_2010-10.csv yellow_tripdata_2010-11.csv yellow_tripdata_2010-12.csv yellow_tripdata_2011-01.csv yellow_tripdata_2011-02.csv yellow_tripdata_2011-03.csv yellow_tripdata_2011-04.csv yellow_tripdata_2011-05.csv yellow_tripdata_2011-06.csv yellow_tripdata_2011-07.csv yellow_tripdata_2011-08.csv yellow_tripdata_2011-09.csv yellow_tripdata_2011-10.csv yellow_tripdata_2011-11.csv yellow_tripdata_2011-12.csv yellow_tripdata_2012-01.csv yellow_tripdata_2012-02.csv yellow_tripdata_2012-03.csv yellow_tripdata_2012-04.csv yellow_tripdata_2012-05.csv yellow_tripdata_2012-06.csv yellow_tripdata_2012-07.csv yellow_tripdata_2012-08.csv yellow_tripdata_2012-09.csv yellow_tripdata_2012-10.csv yellow_tripdata_2012-11.csv yellow_tripdata_2012-12.csv yellow_tripdata_2013-01.csv yellow_tripdata_2013-02.csv yellow_tripdata_2013-03.csv yellow_tripdata_2013-04.csv yellow_tripdata_2013-05.csv yellow_tripdata_2013-06.csv yellow_tripdata_2013-07.csv yellow_tripdata_2013-08.csv yellow_tripdata_2013-09.csv yellow_tripdata_2013-10.csv yellow_tripdata_2013-11.csv yellow_tripdata_2013-12.csv yellow_tripdata_2014-01.csv
total_bytes = 0
for fileInfo in dbutils.fs.ls('FileStore/taxi_csv'):
  total_bytes += fileInfo.size
print('%s GBs data in total' % (total_bytes * 1e-9))
157.279920581 GBs data in total

Convert to Parquet files

Convert the downloaded CSV files into Parquet files via Koalas for better efficiency.

import databricks.koalas as ks
 
ks.set_option('compute.default_index_type', 'distributed-sequence')  # compute the default sequential index in a distributed manner instead of on the driver
WARNING:root:'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to set this environment variable to '1' in both driver and executor sides if you use pyarrow>=2.0.0. Koalas will set it for you but it does not work if there is a Spark context already launched.
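The warning above can be avoided by setting the environment variable before the Spark context is launched. A minimal sketch, assuming pyarrow>=2.0.0; on Databricks the variable typically also needs to be set for the executors, e.g. via the cluster's environment variable configuration or spark.executorEnv.PYARROW_IGNORE_TIMEZONE:

import os
 
# Set PYARROW_IGNORE_TIMEZONE in the driver process. This must happen
# before the Spark context starts, and the executors need the variable
# set separately (e.g. through the cluster configuration).
os.environ['PYARROW_IGNORE_TIMEZONE'] = '1'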
dtype_dict = {
  'Passenger_Count': 'int64', 
  'Start_Lon': 'float64', 
  'Start_Lat': 'float64',
  'End_Lon': 'float64', 
  'End_Lat': 'float64', 
  'Fare_Amt': 'float64', 
  'Tip_Amt': 'float64', 
  'Tolls_Amt': 'float64',
  'Total_Amt': 'float64'
}
ks_df = ks.read_csv('/FileStore/taxi_csv', dtype=dtype_dict)
ks_df.columns = ks_df.columns.str.lower()
ks_df.dtypes
Out[10]:
vendor_name               object
trip_pickup_datetime      object
trip_dropoff_datetime     object
passenger_count            int64
trip_distance             object
start_lon                float64
start_lat                float64
rate_code                 object
store_and_forward         object
end_lon                  float64
end_lat                  float64
payment_type              object
fare_amt                 float64
surcharge                 object
mta_tax                   object
tip_amt                  float64
tolls_amt                float64
total_amt                float64
dtype: object
%sh rm -fr /dbfs/FileStore/ks_taxi_parquet
ks_df.to_parquet('FileStore/ks_taxi_parquet', index_col='index')
total_bytes = 0
for file_info in dbutils.fs.ls('FileStore/ks_taxi_parquet'):
  total_bytes += file_info.size
print('%s GBs data in total' % (total_bytes * 1e-9))
45.570346225 GBs data in total
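
To sanity-check the conversion, the Parquet files can be read back with the index column that was saved above. A minimal sketch, assuming the same cluster:

import databricks.koalas as ks
 
# Read the converted Parquet files back, restoring the saved 'index' column as the index.
check_df = ks.read_parquet('/FileStore/ks_taxi_parquet', index_col='index')
check_df.dtypes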

Note: Filtering Size

The ratio of filtered data to total data (by row count) used in the benchmark:

import databricks.koalas as ks
koalas_data = ks.read_parquet('/FileStore/ks_taxi_parquet')
expr_filter = (koalas_data.tip_amt >= 1) & (koalas_data.tip_amt <= 5)
 
print(f'In the benchmark, filtered data is {len(koalas_data[expr_filter]) / len(koalas_data) * 100}% of total data')
In the benchmark, filtered data is 36.028753408727766% of total data
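
For the Dask side of the benchmark, the same Parquet files can be loaded through the DBFS FUSE mount and filtered in the same way. A minimal sketch, assuming Dask is installed on the cluster and the files are visible at /dbfs/FileStore/ks_taxi_parquet:

import dask.dataframe as dd
 
# Hypothetical Dask equivalent of the Koalas load and filter above.
dask_data = dd.read_parquet('/dbfs/FileStore/ks_taxi_parquet')
expr_filter = (dask_data.tip_amt >= 1) & (dask_data.tip_amt <= 5)
print(f'Filtered data is {len(dask_data[expr_filter]) / len(dask_data) * 100}% of total data')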