pandas to Koalas in 10 minutes

Migration from pandas to Koalas

Object creation

import numpy as np
import pandas as pd
import databricks.koalas as ks
# Create a pandas Series
pser = pd.Series([1, 3, 5, np.nan, 6, 8]) 
# Create a Koalas Series
kser = ks.Series([1, 3, 5, np.nan, 6, 8])
# Create a Koalas Series by passing a pandas Series
kser = ks.Series(pser)
kser = ks.from_pandas(pser)
pser
Out[3]:
0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
dtype: float64
kser
Out[4]:
1    3.0
3    NaN
5    8.0
0    1.0
2    5.0
4    6.0
Name: 0, dtype: float64
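# A Koalas Series can be displayed out of index order because the data is
# computed in distributed partitions; sort_index() restores the order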
kser.sort_index()
Out[5]:
0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
Name: 0, dtype: float64
# Create a pandas DataFrame
pdf = pd.DataFrame({'A': np.random.rand(5),
                    'B': np.random.rand(5)})
# Create a Koalas DataFrame
kdf = ks.DataFrame({'A': np.random.rand(5),
                    'B': np.random.rand(5)})
# Create a Koalas DataFrame by passing a pandas DataFrame
kdf = ks.DataFrame(pdf)
kdf = ks.from_pandas(pdf)
pdf
Out[7]:
kdf.sort_index()
Out[8]:
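# A Koalas DataFrame can also be converted back to a pandas DataFrame with
# to_pandas(). Note that this collects all rows to the client (driver) side,
# so it is only advisable when the data fits in driver memory.
kdf.to_pandas()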

Viewing data

kdf.head(2)
Out[9]:
kdf.describe()
Out[10]:
kdf.sort_values(by='B')
Out[11]:
kdf.transpose()
Out[12]:
from databricks.koalas.config import set_option, get_option
get_option('compute.max_rows')
Out[13]: 1000
set_option('compute.max_rows', 2000)
get_option('compute.max_rows')
Out[14]: 2000
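# A modified option can be restored to its default with reset_option
from databricks.koalas.config import reset_option
reset_option('compute.max_rows')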

Selection

kdf['A']  # or kdf.A
Out[15]:
1    0.720926
3    0.999945
0    0.579216
2    0.438917
4    0.065234
Name: A, dtype: float64
kdf[['A', 'B']]
Out[16]:
kdf.loc[1:2]
Out[17]:
kdf.iloc[:3, 1:2]
Out[18]:
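# Boolean indexing also works as in pandas, e.g. keeping rows where A > 0.5
kdf[kdf['A'] > 0.5]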
kser = ks.Series([100, 200, 300, 400, 500], index=[0, 1, 2, 3, 4])
# The commented-out line below will fail because Koalas disallows adding columns
# that come from a different DataFrame or Series, since doing so requires a
# join operation, which is generally expensive.
# The operation can be enabled by setting compute.ops_on_diff_frames to True.
# For more details, see the following blog post:
# https://databricks.com/blog/2020/03/31/10-minutes-from-pandas-to-koalas-on-apache-spark.html
# kdf['C'] = kser
# These imports are needed for managing options
from databricks.koalas.config import set_option, reset_option
set_option("compute.ops_on_diff_frames", True)
kdf['C'] = kser
# Reset to the default to avoid potentially expensive operations in the future
reset_option("compute.ops_on_diff_frames")
kdf
Out[20]:
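# Alternatively, option_context scopes the option to a single block and
# restores the previous value on exit ('D' is an arbitrary new column name)
from databricks.koalas import option_context
with option_context("compute.ops_on_diff_frames", True):
    kdf['D'] = kser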

Applying a Python function to a Koalas object

kdf.apply(np.cumsum)
Out[21]:
kdf.apply(np.cumsum, axis=1)
Out[22]:
kdf.apply(lambda x: x ** 2)
Out[23]:
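# Annotating the return type, as in the function below, tells Koalas the
# output schema up front, so it does not need to infer it by sampling the data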
def square(x) -> ks.Series[np.float64]:
    return x ** 2
kdf.apply(square)
Out[25]:
# Works properly because the data size is <= compute.shortcut_limit (1000)
ks.DataFrame({'A': range(1000)}).apply(lambda col: col.max())
Out[26]:
A    999
Name: 0, dtype: int64
# Does not work properly because the data size is > compute.shortcut_limit (1000):
# the function is applied to each batch of data, so one result is returned per batch
ks.DataFrame({'A': range(1001)}).apply(lambda col: col.max())
Out[27]:
A     251
A     629
A     377
A     881
A     503
A     755
A    1000
A     125
Name: 0, dtype: int64
ks.set_option('compute.shortcut_limit', 1001)
ks.DataFrame({'A': range(1001)}).apply(lambda col: col.max())
Out[28]:
A    1000
Name: 0, dtype: int64
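# Reset the option to its default to avoid surprises in later computations
ks.reset_option('compute.shortcut_limit')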

Grouping Data

kdf.groupby('A').sum()
Out[29]:
kdf.groupby(['A', 'B']).sum()
Out[30]:
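# As elsewhere in Koalas, the row order of a grouped result is not guaranteed;
# sort_index() gives a deterministic display
kdf.groupby('A').sum().sort_index()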

Plotting

# This is needed for visualizing plots in the notebook
%matplotlib inline
speed = [0.1, 17.5, 40, 48, 52, 69, 88]
lifespan = [2, 8, 70, 1.5, 25, 12, 28]
index = ['snail', 'pig', 'elephant',
         'rabbit', 'giraffe', 'coyote', 'horse']
kdf = ks.DataFrame({'speed': speed,
                    'lifespan': lifespan}, index=index)
kdf.plot.bar()
kdf.plot.barh()
kdf = ks.DataFrame({'mass': [0.330, 4.87, 5.97],
                    'radius': [2439.7, 6051.8, 6378.1]},
                   index=['Mercury', 'Venus', 'Earth'])
kdf.plot.pie(y='mass')
kdf = ks.DataFrame({
    'sales': [3, 2, 3, 9, 10, 6, 3],
    'signups': [5, 5, 6, 12, 14, 13, 9],
    'visits': [20, 42, 28, 62, 81, 50, 90],
}, index=pd.date_range(start='2019/08/15', end='2020/03/09',
                       freq='M'))
kdf.plot.area()
kdf = ks.DataFrame({'pig': [20, 18, 489, 675, 1776],
                    'horse': [4, 25, 281, 600, 1900]},
                   index=[1990, 1997, 2003, 2009, 2014])
kdf.plot.line()
# Build the data in pandas first, then convert it to a Koalas DataFrame
pdf = pd.DataFrame(
    np.random.randint(1, 7, 6000),
    columns=['one'])
pdf['two'] = pdf['one'] + np.random.randint(1, 7, 6000)
kdf = ks.from_pandas(pdf)
kdf.plot.hist(bins=12, alpha=0.5)
kdf = ks.DataFrame([[5.1, 3.5, 0], [4.9, 3.0, 0], [7.0, 3.2, 1],
                    [6.4, 3.2, 1], [5.9, 3.0, 2]],
                   columns=['length', 'width', 'species'])
kdf.plot.scatter(x='length',
                 y='width',
                 c='species',
                 colormap='viridis')

Missing Functionalities and Workarounds in Koalas

Directly use pandas APIs through type conversion

kidx = kdf.index
# Index.to_list() raises PandasNotImplementedError.
# Koalas does not support it because it would require collecting all data to
# the client (driver node) side. A simple workaround is to convert to pandas
# first using to_pandas().
# For more details, see the following blog post:
# https://databricks.com/blog/2020/03/31/10-minutes-from-pandas-to-koalas-on-apache-spark.html
# kidx.to_list()
kidx.to_pandas().to_list()
Out[41]: [0, 1, 2, 3, 4]

Native Support for pandas Objects

kdf = ks.DataFrame({'A': 1.,
                    'B': pd.Timestamp('20130102'),
                    'C': pd.Series(1, index=list(range(4)), dtype='float32'),
                    'D': np.array([3] * 4, dtype='int32'),
                    'F': 'foo'})
kdf
Out[43]:
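# The resulting Koalas DataFrame keeps the pandas-inferred dtypes
kdf.dtypes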

Distributed execution for pandas functions

i = pd.date_range('2018-04-09', periods=2000, freq='1D1min')
# One value per timestamp in the 2000-entry index
ts = ks.DataFrame({'A': np.arange(2000)}, index=i)

# DataFrame.between_time() is not yet implemented in Koalas.
# A simple workaround is to convert to a pandas DataFrame using to_pandas()
# and then apply the function.
# For more details, see the following blog post:
# https://databricks.com/blog/2020/03/31/10-minutes-from-pandas-to-koalas-on-apache-spark.html
# ts.between_time('0:15', '0:16')
ts.to_pandas().between_time('0:15', '0:16')
Out[45]:
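# Alternatively, map_in_pandas() applies the pandas function to each internal
# pandas batch, so the computation runs distributedly instead of on the driver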
ts.map_in_pandas(func=lambda pdf: pdf.between_time('0:15', '0:16'))
Out[46]:

Using SQL in Koalas

kdf = ks.DataFrame({'year': [1990, 1997, 2003, 2009, 2014],
                    'pig': [20, 18, 489, 675, 1776],
                    'horse': [4, 25, 281, 600, 1900]})
ks.sql("SELECT * FROM {kdf} WHERE pig > 100")
Out[48]:
pdf = pd.DataFrame({'year': [1990, 1997, 2003, 2009, 2014],
                    'sheep': [22, 50, 121, 445, 791],
                    'chicken': [250, 326, 589, 1241, 2118]})
ks.sql('''
    SELECT ks.pig, pd.chicken
    FROM {kdf} ks INNER JOIN {pdf} pd
    ON ks.year = pd.year
    ORDER BY ks.pig, pd.chicken''')
Out[50]:

Working with PySpark

Conversion from and to PySpark DataFrame

kdf = ks.DataFrame({'A': [1, 2, 3, 4, 5], 'B': [10, 20, 30, 40, 50]})
sdf = kdf.to_spark()
type(sdf)
Out[51]: pyspark.sql.dataframe.DataFrame
sdf.show()
+---+---+
|  A|  B|
+---+---+
|  1| 10|
|  2| 20|
|  3| 30|
|  4| 40|
|  5| 50|
+---+---+
from databricks.koalas import option_context
with option_context(
        "compute.default_index_type", "distributed-sequence"):
    kdf = sdf.to_koalas()
type(kdf)
Out[53]: databricks.koalas.frame.DataFrame
kdf
Out[54]:
sdf.to_koalas(index_col='A')
Out[55]:
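# to_spark() drops the Koalas index by default, which is why the Spark
# DataFrame shown earlier has no index column. The index can instead be
# preserved as a regular column ('index' is an arbitrarily chosen name).
kdf.to_spark(index_col='index').show()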

Checking Spark execution plans

from databricks.koalas import option_context

with option_context(
        "compute.ops_on_diff_frames", True,
        "compute.default_index_type", 'distributed'):
    df = ks.range(10) + ks.range(10)
    df.explain()
== Physical Plan ==
*(3) Project [CASE WHEN isnotnull(__index_level_0__#2408L) THEN __index_level_0__#2408L ELSE __index_level_0__#2418L END AS __index_level_0__#2426L, (id#2406L + id#2416L) AS id#2483L]
+- SortMergeJoin [__index_level_0__#2408L], [__index_level_0__#2418L], FullOuter
   :- Sort [__index_level_0__#2408L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(__index_level_0__#2408L, 200), [id=#3015]
   :     +- *(1) Project [monotonically_increasing_id() AS __index_level_0__#2408L, id#2406L]
   :        +- *(1) Range (0, 10, step=1, splits=8)
   +- Sort [__index_level_0__#2418L ASC NULLS FIRST], false, 0
      +- ReusedExchange [__index_level_0__#2418L, id#2416L], Exchange hashpartitioning(__index_level_0__#2408L, 200), [id=#3015]
with option_context(
        "compute.ops_on_diff_frames", False,
        "compute.default_index_type", 'distributed'):
    df = ks.range(10)
    df = df + df
    df.explain()
== Physical Plan ==
*(1) Project [__index_level_0__#2497L, (id#2495L + id#2495L) AS id#2506L]
+- *(1) Project [monotonically_increasing_id() AS __index_level_0__#2497L, id#2495L]
   +- *(1) Range (0, 10, step=1, splits=8)

Caching DataFrames

with option_context("compute.default_index_type", 'distributed'):
    df = ks.range(10)
    new_df = (df + df).cache()  # `(df + df)` is cached here as `new_df`
    new_df.explain()
== Physical Plan ==
*(1) InMemoryTableScan [__index_level_0__#2516L, id#2525L]
   +- InMemoryRelation [__index_level_0__#2516L, id#2525L, __natural_order__#2519L], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Project [__index_level_0__#2516L, (id#2514L + id#2514L) AS id#2525L, __natural_order__#2519L]
            +- *(1) Project [__index_level_0__#2516L, id#2514L, monotonically_increasing_id() AS __natural_order__#2519L]
               +- *(1) Project [monotonically_increasing_id() AS __index_level_0__#2516L, id#2514L]
                  +- *(1) Range (0, 10, step=1, splits=8)
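# The cached data occupies Spark's storage memory until it is released;
# unpersist() frees it explicitly
new_df.unpersist()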