import numpy as np
import pandas as pd
import pyspark.pandas as ps

# Create a pandas DataFrame
pdf = pd.DataFrame({'A': np.random.rand(5), 'B': np.random.rand(5)})

# Create a pandas-on-Spark DataFrame
psdf = ps.DataFrame({'A': np.random.rand(5), 'B': np.random.rand(5)})

# Create a pandas-on-Spark DataFrame by passing a pandas DataFrame
psdf = ps.DataFrame(pdf)
psdf = ps.from_pandas(pdf)
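A quick round-trip check (a minimal sketch, not part of the original notebook; the variable name pdf_back is illustrative): a pandas-on-Spark DataFrame can be converted back to pandas with to_pandas(), which collects all data to the driver and therefore should only be used on small results.

# Bring the pandas-on-Spark DataFrame back to pandas (collects data to the driver).
pdf_back = psdf.to_pandas()
print(type(pdf_back))   # <class 'pandas.core.frame.DataFrame'>
print(pdf_back.head())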
psser = ps.Series([100, 200, 300, 400, 500], index=[0, 1, 2, 3, 4])

# The commented-out line below would fail because pandas-on-Spark disallows adding columns
# coming from different DataFrames or Series to a pandas-on-Spark DataFrame, since adding
# such columns requires join operations, which are generally expensive.
# This operation can be enabled by setting compute.ops_on_diff_frames to True.
# For more details, see the following blog post:
# https://databricks.com/blog/2020/03/31/10-minutes-from-pandas-to-koalas-on-apache-spark.html
# psdf['C'] = psser
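A minimal sketch of how the assignment above can be made to work, assuming the psdf and psser objects created earlier. The option is enabled only inside a context manager so that the (potentially expensive) join stays an explicit, temporary choice:

from pyspark.pandas import option_context

with option_context("compute.ops_on_diff_frames", True):
    psdf['C'] = psser            # now allowed; aligned by index via a join under the hood
    print(psdf.sort_index())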
# Index.to_list() raises PandasNotImplementedError.
# pandas API on Spark does not support this because it requires collecting all data into the
# client (driver node) side. A simple workaround is to convert to pandas using to_pandas().
# For more details, see the following blog post:
# https://databricks.com/blog/2020/03/31/10-minutes-from-pandas-to-koalas-on-apache-spark.html
# psidx.to_list()
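A minimal sketch of the workaround mentioned above, assuming psidx is the pandas-on-Spark Index referenced in the comment and that it is small enough to fit in driver memory:

# Convert the index to pandas first (this collects the data to the driver),
# then use the regular pandas to_list().
idx_values = psidx.to_pandas().to_list()
print(idx_values)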
i = pd.date_range('2018-04-09', periods=2000, freq='1D1min')
ts = ps.DataFrame({'A': ['timestamp'] * 2000}, index=i)

# DataFrame.between_time() is not yet implemented in pandas API on Spark.
# A simple workaround is to convert to a pandas DataFrame using to_pandas(),
# and then apply the function.
# For more details, see the following blog post:
# https://databricks.com/blog/2020/03/31/10-minutes-from-pandas-to-koalas-on-apache-spark.html
# ts.between_time('0:15', '0:16')
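A minimal sketch of that workaround, assuming ts fits in driver memory once collected; between_time() runs on the pandas side, and the result can be converted back to pandas-on-Spark if needed (the names pdf_between and ts_between are illustrative):

# Collect to pandas, apply between_time() there, then (optionally) convert back.
pdf_between = ts.to_pandas().between_time('0:15', '0:16')
ts_between = ps.from_pandas(pdf_between)
print(ts_between.head())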
from pyspark.pandas import option_context

with option_context(
        "compute.ops_on_diff_frames", True,
        "compute.default_index_type", 'distributed'):
    df = ps.range(10) + ps.range(10)
    df.spark.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [coalesce(__this___index_level_0__#2334L, __that___index_level_0__#2342L) AS __index_level_0__#2347L, (__this_id#2335L + __that_id#2343L) AS id#2377L]
   +- SortMergeJoin [__this___index_level_0__#2334L], [__that___index_level_0__#2342L], FullOuter
      :- Sort [__this___index_level_0__#2334L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(__this___index_level_0__#2334L, 200), ENSURE_REQUIREMENTS, [id=#3243]
      :     +- Project [__index_level_0__#2311L AS __this___index_level_0__#2334L, id#2309L AS __this_id#2335L]
      :        +- Project [monotonically_increasing_id() AS __index_level_0__#2311L, id#2309L]
      :           +- Range (0, 10, step=1, splits=8)
      +- Sort [__that___index_level_0__#2342L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(__that___index_level_0__#2342L, 200), ENSURE_REQUIREMENTS, [id=#3244]
            +- Project [__index_level_0__#2322L AS __that___index_level_0__#2342L, id#2320L AS __that_id#2343L]
               +- Project [monotonically_increasing_id() AS __index_level_0__#2322L, id#2320L]
                  +- Range (0, 10, step=1, splits=8)
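If the same settings are needed throughout a session rather than inside a single block, they can also be set and restored globally; a minimal sketch using the pandas-on-Spark option API (set_option / reset_option), kept separate from the examples above:

import pyspark.pandas as ps

ps.set_option("compute.ops_on_diff_frames", True)         # allow operations across different frames
ps.set_option("compute.default_index_type", "distributed")
# ... run the cells that need these settings ...
ps.reset_option("compute.ops_on_diff_frames")              # restore the defaults afterwards
ps.reset_option("compute.default_index_type")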
with option_context(
        "compute.ops_on_diff_frames", False,
        "compute.default_index_type", 'distributed'):
    df = ps.range(10)
    df = df + df
    df.spark.explain()
== Physical Plan ==
*(1) Project [__index_level_0__#2386L, (id#2384L + id#2384L) AS id#2398L]
+- *(1) Project [monotonically_increasing_id() AS __index_level_0__#2386L, id#2384L]
   +- *(1) Range (0, 10, step=1, splits=8)
with option_context("compute.default_index_type", 'distributed'):
    df = ps.range(10)
    new_df = (df + df).spark.cache()  # `(df + df)` is cached here as `new_df`
    new_df.spark.explain()
== Physical Plan ==
*(1) ColumnarToRow
+- InMemoryTableScan [__index_level_0__#2403L, id#2415L]
      +- InMemoryRelation [__index_level_0__#2403L, id#2415L, __natural_order__#2406L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Project [__index_level_0__#2403L, (id#2401L + id#2401L) AS id#2415L, __natural_order__#2406L]
               +- *(1) Project [__index_level_0__#2403L, id#2401L, monotonically_increasing_id() AS __natural_order__#2406L]
                  +- *(1) Project [monotonically_increasing_id() AS __index_level_0__#2403L, id#2401L]
                     +- *(1) Range (0, 10, step=1, splits=8)
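When the cached DataFrame is no longer needed, the cache can be released explicitly; a minimal sketch using DataFrame.spark.unpersist() on the new_df created above:

# Drop the cached data once it is no longer needed, so executors can reclaim
# the memory and disk used by the cache.
new_df.spark.unpersist()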
with (df + df).spark.cache() as df:
    df.spark.explain()
== Physical Plan ==
*(1) ColumnarToRow
+- InMemoryTableScan [__index_level_0__#2403L, id#2484L]
      +- InMemoryRelation [__index_level_0__#2403L, id#2484L, __natural_order__#2406L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Project [__index_level_0__#2403L, (id#2401L + id#2401L) AS id#2484L, __natural_order__#2406L]
               +- *(1) Project [__index_level_0__#2403L, id#2401L, monotonically_increasing_id() AS __natural_order__#2406L]
                  +- *(1) Project [monotonically_increasing_id() AS __index_level_0__#2403L, id#2401L]
                     +- *(1) Range (0, 10, step=1, splits=8)
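The context manager above is a convenient way to make sure the cache is released when the block ends. If more control over how the data is stored is needed, persist() can be used instead of cache(); a minimal sketch, assuming PySpark's StorageLevel is available and reusing the option_context pattern from earlier:

from pyspark import StorageLevel
from pyspark.pandas import option_context

with option_context("compute.default_index_type", 'distributed'):
    df = ps.range(10)
    # persist() works like cache() but lets you choose the storage level explicitly.
    new_df = (df + df).spark.persist(StorageLevel.MEMORY_ONLY)
    new_df.spark.explain()
    new_df.spark.unpersist()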