pandas to pandas API on Spark in 10 minutes

Migration from pandas to pandas API on Spark

Object creation

import numpy as np
import pandas as pd
import pyspark.pandas as ps
# Create a pandas Series
pser = pd.Series([1, 3, 5, np.nan, 6, 8]) 
# Create a pandas-on-Spark Series
psser = ps.Series([1, 3, 5, np.nan, 6, 8])
# Create a pandas-on-Spark Series by passing a pandas Series
psser = ps.Series(pser)
psser = ps.from_pandas(pser)
pser
Out[3]:
0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
dtype: float64
psser
Out[4]:
0    1.0
2    5.0
3    NaN
5    8.0
1    3.0
4    6.0
dtype: float64
psser.sort_index()
Out[5]:
0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
dtype: float64
# Create a pandas DataFrame
pdf = pd.DataFrame({'A': np.random.rand(5),
                    'B': np.random.rand(5)})
# Create a pandas-on-Spark DataFrame
psdf = ps.DataFrame({'A': np.random.rand(5),
                     'B': np.random.rand(5)})
# Create a pandas-on-Spark DataFrame by passing a pandas DataFrame
psdf = ps.DataFrame(pdf)
psdf = ps.from_pandas(pdf)
pdf
Out[7]:
psdf.sort_index()
Out[8]:
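
A pandas-on-Spark object can also be converted back to plain pandas with to_pandas(); a minimal sketch (the name pdf_back is illustrative):

# Convert the pandas-on-Spark DataFrame back to pandas.
# This collects all data to the driver, so only do it for data that fits in driver memory.
pdf_back = psdf.to_pandas()
type(pdf_back)  # pandas.core.frame.DataFrame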

Viewing data

psdf.head(2)
Out[9]:
psdf.describe()
Out[10]:
psdf.sort_values(by='B')
Out[11]:
psdf.transpose()
Out[12]:
ps.get_option('compute.max_rows')
Out[13]: 1000
ps.set_option('compute.max_rows', 2000)
ps.get_option('compute.max_rows')
Out[14]: 2000
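
After experimenting, the option can be restored with reset_option; a minimal sketch:

# Reset compute.max_rows back to its default value (1000)
ps.reset_option('compute.max_rows')
ps.get_option('compute.max_rows')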

Selection

psdf['A']  # or psdf.A
Out[15]:
0    0.124840
1    0.295045
4    0.242770
2    0.277994
3    0.534681
Name: A, dtype: float64
psdf[['A', 'B']]
Out[16]:
psdf.loc[1:2]
Out[17]:
psdf.iloc[:3, 1:2]
Out[18]:
psser = ps.Series([100, 200, 300, 400, 500], index=[0, 1, 2, 3, 4])
# The commented line below will fail: pandas-on-Spark disallows adding columns that come from
# a different DataFrame or Series, because doing so requires a join operation,
# which is generally expensive.
# This operation can be enabled by setting compute.ops_on_diff_frames to True.
# For more details, see the following blog post.
# https://databricks.com/blog/2020/03/31/10-minutes-from-pandas-to-koalas-on-apache-spark.html
# psdf['C'] = psser
# These imports are needed for managing options
from pyspark.pandas.config import set_option, reset_option
set_option("compute.ops_on_diff_frames", True)
psdf['C'] = psser
# Reset to the default to avoid potentially expensive operations in the future
reset_option("compute.ops_on_diff_frames")
psdf
Out[20]:
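
Boolean indexing with a condition derived from the same DataFrame does not need compute.ops_on_diff_frames, since both sides share the same frame; a small sketch using the psdf built above:

# Filter rows with a boolean condition computed from the same DataFrame
psdf[psdf['A'] > 0.3]
# Label-based selection of rows and a subset of columns, as in pandas
psdf.loc[psdf['A'] > 0.3, ['A', 'B']]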

Applying a Python function to a pandas-on-Spark object

psdf.apply(np.cumsum)
Out[21]:
psdf.apply(np.cumsum, axis=1)
Out[22]:
psdf.apply(lambda x: x ** 2)
Out[23]:
def square(x) -> ps.Series[np.float64]:
    return x ** 2
psdf.apply(square)
Out[25]:
# Works properly because the data size is <= compute.shortcut_limit (1000)
ps.DataFrame({'A': range(1000)}).apply(lambda col: col.max())
Out[26]:
A    999
dtype: int64
# Does not work properly because the data size is > compute.shortcut_limit (1000):
# the function is applied to each batch, so a per-batch maximum is returned instead of a single value
ps.DataFrame({'A': range(1001)}).apply(lambda col: col.max())
Out[27]:
A     249
A     499
A     749
A    1000
A     124
A     374
A     624
A     874
dtype: int64
ps.set_option('compute.shortcut_limit', 1001)
ps.DataFrame({'A': range(1001)}).apply(lambda col: col.max())
Out[28]:
A    1000
dtype: int64
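
Once done experimenting, the limit can be restored to its default; a minimal sketch, assuming the default value of 1000:

# Reset compute.shortcut_limit back to its default.
# Note that providing a return type hint (as in `square` above) lets pandas-on-Spark
# skip the sampling-based schema inference that this limit controls.
ps.reset_option('compute.shortcut_limit')
ps.get_option('compute.shortcut_limit')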

Grouping Data

psdf.groupby('A').sum()
Out[29]:
psdf.groupby(['A', 'B']).sum()
Out[30]:
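
Because rows are distributed, grouped results come back in no particular order; a small sketch for deterministic output:

# Sort the grouped result explicitly for stable ordering
psdf.groupby('A').sum().sort_index()
# as_index=False keeps the grouping column as a regular column, as in pandas
psdf.groupby('A', as_index=False).sum()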

Plotting

# This is needed for visualizing plots in the notebook
%matplotlib inline
speed = [0.1, 17.5, 40, 48, 52, 69, 88]
lifespan = [2, 8, 70, 1.5, 25, 12, 28]
index = ['snail', 'pig', 'elephant',
         'rabbit', 'giraffe', 'coyote', 'horse']
psdf = ps.DataFrame({'speed': speed,
                     'lifespan': lifespan}, index=index)
psdf.plot.bar()
Out[32]:
psdf.plot.barh()
Out[33]:
psdf = ps.DataFrame({'mass': [0.330, 4.87, 5.97],
                     'radius': [2439.7, 6051.8, 6378.1]},
                    index=['Mercury', 'Venus', 'Earth'])
psdf.plot.pie(y='mass')
Out[34]:
psdf = ps.DataFrame({
    'sales': [3, 2, 3, 9, 10, 6, 3],
    'signups': [5, 5, 6, 12, 14, 13, 9],
    'visits': [20, 42, 28, 62, 81, 50, 90],
}, index=pd.date_range(start='2019/08/15', end='2020/03/09',
                       freq='M'))
psdf.plot.area()
Out[35]:
psdf = ps.DataFrame({'pig': [20, 18, 489, 675, 1776],
                     'horse': [4, 25, 281, 600, 1900]},
                    index=[1990, 1997, 2003, 2009, 2014])
psdf.plot.line()
Out[36]:
pdf = pd.DataFrame(
    np.random.randint(1, 7, 6000),
    columns=['one'])
pdf['two'] = pdf['one'] + np.random.randint(1, 7, 6000)
psdf = ps.from_pandas(pdf)
psdf.plot.hist(bins=12, alpha=0.5)
Out[37]:
psdf = ps.DataFrame([[5.1, 3.5, 0], [4.9, 3.0, 0], [7.0, 3.2, 1],
                    [6.4, 3.2, 1], [5.9, 3.0, 2]],
                   columns=['length', 'width', 'species'])
psdf.plot.scatter(x='length',
                  y='width',
                  c='species')
Out[38]:
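
The plotting backend is configurable through the same options mechanism; a hedged sketch (it assumes the matplotlib backend is available in your environment):

# Switch the plotting backend; recent pandas-on-Spark releases default to plotly
ps.set_option('plotting.backend', 'matplotlib')
psdf.plot.scatter(x='length', y='width', c='species')
ps.reset_option('plotting.backend')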

Missing Functionalities and Workarounds in pandas API on Spark

Directly use pandas APIs through type conversion

psidx = psdf.index
# Index.to_list() raises PandasNotImplementedError.
# pandas API on Spark does not support this because it requires collecting all data into the client
# (driver node) side. A simple workaround is to convert to pandas using to_pandas().
# For more details, see the following blog post.
# https://databricks.com/blog/2020/03/31/10-minutes-from-pandas-to-koalas-on-apache-spark.html
# psidx.to_list()
psidx.to_pandas().to_list()
Out[41]: [0, 1, 2, 3, 4]
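
Since to_pandas() collects everything to the driver, it can help to check the size first; a minimal sketch (the threshold is an arbitrary illustration, not a recommended value):

# Guard the collection: only convert when the data is small enough for the driver
if len(psdf) <= 1_000_000:
    idx_list = psidx.to_pandas().to_list()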

Native Support for pandas Objects

psdf = ps.DataFrame({'A': 1.,
                     'B': pd.Timestamp('20130102'),
                     'C': pd.Series(1, index=list(range(4)), dtype='float32'),
                     'D': np.array([3] * 4, dtype='int32'),
                     'F': 'foo'})
psdf
Out[43]:
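
The columns of the resulting DataFrame keep concrete dtypes, which can be inspected as in pandas:

# Check the per-column dtypes of the pandas-on-Spark DataFrame
psdf.dtypes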

Distributed execution for pandas functions

i = pd.date_range('2018-04-09', periods=2000, freq='1D1min')
ts = ps.DataFrame({'A': ['timestamp']}, index=i)

# DataFrame.between_time() is not yet implemented in pandas API on Spark.
# A simple workaround is to convert to a pandas DataFrame using to_pandas(),
# and then apply the function.
# For more details, see the following blog post.
# https://databricks.com/blog/2020/03/31/10-minutes-from-pandas-to-koalas-on-apache-spark.html
# ts.between_time('0:15', '0:16')
ts.to_pandas().between_time('0:15', '0:16')
Out[45]:
ts.pandas_on_spark.apply_batch(func=lambda pdf: pdf.between_time('0:15', '0:16'))
Out[46]:

Using SQL in pandas API on Spark

psdf = ps.DataFrame({'year': [1990, 1997, 2003, 2009, 2014],
                     'pig': [20, 18, 489, 675, 1776],
                     'horse': [4, 25, 281, 600, 1900]})
ps.sql("SELECT * FROM {psdf} WHERE pig > 100", psdf = psdf)
Out[48]:
pdf = pd.DataFrame({'year': [1990, 1997, 2003, 2009, 2014],
                    'sheep': [22, 50, 121, 445, 791],
                    'chicken': [250, 326, 589, 1241, 2118]})
ps.sql('''
    SELECT ps.pig, pd.chicken
    FROM {psdf} ps INNER JOIN {pdf} pd
    ON ps.year = pd.year
    ORDER BY ps.pig, pd.chicken''', psdf=psdf, pdf=pdf)
Out[50]:
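
ps.sql also accepts an index_col argument so the result keeps a chosen column as its index instead of a default sequential one; a brief sketch using the same psdf as above:

# Keep 'year' as the index of the resulting pandas-on-Spark DataFrame
ps.sql("SELECT * FROM {psdf} WHERE pig > 100", index_col='year', psdf=psdf)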

Working with PySpark

Conversion from and to PySpark DataFrame

psdf = ps.DataFrame({'A': [1, 2, 3, 4, 5], 'B': [10, 20, 30, 40, 50]})
sdf = psdf.to_spark()
type(sdf)
Out[51]: pyspark.sql.dataframe.DataFrame
sdf.show()
+---+---+
|  A|  B|
+---+---+
|  1| 10|
|  2| 20|
|  3| 30|
|  4| 40|
|  5| 50|
+---+---+
from pyspark.pandas import option_context
with option_context(
        "compute.default_index_type", "distributed-sequence"):
    psdf = sdf.to_pandas_on_spark()
type(psdf)
Out[53]: pyspark.pandas.frame.DataFrame
psdf
Out[54]:
sdf.to_pandas_on_spark(index_col='A')
Out[55]:
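
Conversely, to_spark() drops the pandas-on-Spark index by default; passing index_col keeps it as a column in the PySpark DataFrame. A small sketch, with 'index' as an illustrative column name:

# Preserve the pandas-on-Spark index as a regular column in the PySpark DataFrame
sdf_with_index = psdf.to_spark(index_col='index')
sdf_with_index.show()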

Checking Spark execution plans

from pyspark.pandas import option_context

with option_context(
        "compute.ops_on_diff_frames", True,
        "compute.default_index_type", 'distributed'):
    df = ps.range(10) + ps.range(10)
    df.spark.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [coalesce(__this___index_level_0__#2334L, __that___index_level_0__#2342L) AS __index_level_0__#2347L, (__this_id#2335L + __that_id#2343L) AS id#2377L]
   +- SortMergeJoin [__this___index_level_0__#2334L], [__that___index_level_0__#2342L], FullOuter
      :- Sort [__this___index_level_0__#2334L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(__this___index_level_0__#2334L, 200), ENSURE_REQUIREMENTS, [id=#3243]
      :     +- Project [__index_level_0__#2311L AS __this___index_level_0__#2334L, id#2309L AS __this_id#2335L]
      :        +- Project [monotonically_increasing_id() AS __index_level_0__#2311L, id#2309L]
      :           +- Range (0, 10, step=1, splits=8)
      +- Sort [__that___index_level_0__#2342L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(__that___index_level_0__#2342L, 200), ENSURE_REQUIREMENTS, [id=#3244]
            +- Project [__index_level_0__#2322L AS __that___index_level_0__#2342L, id#2320L AS __that_id#2343L]
               +- Project [monotonically_increasing_id() AS __index_level_0__#2322L, id#2320L]
                  +- Range (0, 10, step=1, splits=8)
with option_context(
        "compute.ops_on_diff_frames", False,
        "compute.default_index_type", 'distributed'):
    df = ps.range(10)
    df = df + df
    df.spark.explain()
== Physical Plan ==
*(1) Project [__index_level_0__#2386L, (id#2384L + id#2384L) AS id#2398L]
+- *(1) Project [monotonically_increasing_id() AS __index_level_0__#2386L, id#2384L]
   +- *(1) Range (0, 10, step=1, splits=8)
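
spark.explain also accepts the same arguments as PySpark's DataFrame.explain, for example extended=True to show the logical plans as well; a minimal sketch:

with option_context(
        "compute.ops_on_diff_frames", False,
        "compute.default_index_type", 'distributed'):
    df = ps.range(10)
    df = df + df
    # Show parsed, analyzed, optimized and physical plans instead of only the physical plan
    df.spark.explain(extended=True)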

Caching DataFrames

with option_context("compute.default_index_type", 'distributed'):
    df = ps.range(10)
    new_df = (df + df).spark.cache()  # `(df + df)` is cached here as `new_df`
    new_df.spark.explain()
== Physical Plan ==
*(1) ColumnarToRow
+- InMemoryTableScan [__index_level_0__#2403L, id#2415L]
      +- InMemoryRelation [__index_level_0__#2403L, id#2415L, __natural_order__#2406L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Project [__index_level_0__#2403L, (id#2401L + id#2401L) AS id#2415L, __natural_order__#2406L]
               +- *(1) Project [__index_level_0__#2403L, id#2401L, monotonically_increasing_id() AS __natural_order__#2406L]
                  +- *(1) Project [monotonically_increasing_id() AS __index_level_0__#2403L, id#2401L]
                     +- *(1) Range (0, 10, step=1, splits=8)
new_df.spark.unpersist()
with (df + df).spark.cache() as df:
    df.spark.explain()
== Physical Plan ==
*(1) ColumnarToRow
+- InMemoryTableScan [__index_level_0__#2403L, id#2484L]
      +- InMemoryRelation [__index_level_0__#2403L, id#2484L, __natural_order__#2406L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Project [__index_level_0__#2403L, (id#2401L + id#2401L) AS id#2484L, __natural_order__#2406L]
               +- *(1) Project [__index_level_0__#2403L, id#2401L, monotonically_increasing_id() AS __natural_order__#2406L]
                  +- *(1) Project [monotonically_increasing_id() AS __index_level_0__#2403L, id#2401L]
                     +- *(1) Range (0, 10, step=1, splits=8)
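
cache() uses the default storage level; spark.persist() accepts an explicit StorageLevel, and the level can be read back from the cached frame. A sketch, assuming the storage_level accessor behaves as in the cached DataFrame API:

from pyspark import StorageLevel

with option_context("compute.default_index_type", 'distributed'):
    df = ps.range(10)
    # Persist with an explicit storage level instead of the default memory-and-disk cache
    new_df = (df + df).spark.persist(StorageLevel.MEMORY_ONLY)
    print(new_df.spark.storage_level)
    new_df.spark.unpersist()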