メインコンテンツまでスキップ

Pandas 機能 APIs

関数Pandas APIsを使用すると、 インスタンスをPythonPandas に取得して出力するPySparkデータフレーム ネイティブ関数を直接適用できます。Pandasユーザー定義関数と同様に、関数APIsもApache矢印を使用してデータを転送し、Pandasデータを操作します。ただし、Python 型ヒントは、Pandas 関数 APIsでは省略可能です。

pandas function APIsには3つのタイプがあります。

  • グループ化マップ
  • Map
  • 複合グループ化マップ

pandas function APIs は pandas UDF 実行が使用するのと同じ内部ロジックを利用します。 これらは、PyArrow、サポートされている SQL 型、構成などの特性を共有します。

詳細については、ブログ記事 New Pandas UDF and Python Type Hints in the Upcoming Release of Apache Spark 3.0 を参照してください。

グループ化されたマップ

グループ化されたデータを groupBy().applyInPandas() を使用して変換し、"分割、適用、結合" パターンを実装します。 分割-適用-結合は、次の 3 つのステップで構成されます。

  • DataFrame.groupByを使用してデータをグループに分割します。
  • 各グループに関数を適用します。 関数の入力と出力は両方とも pandas.DataFrameです。 入力データには、各グループのすべての行と列が含まれます。
  • 結果を新しい DataFrameに結合します。

groupBy().applyInPandas() を使用するには、以下を定義する必要があります。

  • 各グループの計算を定義する Python 関数
  • 出力のスキーマを定義する StructType オブジェクトまたは文字列 DataFrame

返される pandas.DataFrame の列ラベルは、定義された出力スキーマのフィールド名と一致する (文字列として指定されている場合) か、文字列でない場合はフィールドデータ型 (整数インデックスなど) でフィールドデータ型と一致する必要があります。 パンダを参照してください。データフレーム は、pandas.DataFrameの作成時に列にラベルを付ける方法について説明します。

グループのすべてのデータは、関数が適用される前にメモリに読み込まれます。 これにより、特にグループ サイズが偏っている場合に、メモリ不足の例外が発生する可能性があります。 maxRecordsPerBatch の構成はグループには適用されず、グループ化されたデータが使用可能なメモリに収まるようにする必要があります。

次の例は、 groupby().apply() を使用してグループ内の各値から平均を減算する方法を示しています。

Python
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))

def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+

詳しい使い方は pyspark.sql.GroupedData.applyInPandas を参照してください。

地図

PandasDataFrame.mapInPandas()の反復子を現在のpandas.DataFrame を表す別の の反復子に変換し、結果をpandas.DataFrame PySparkデータフレームPySparkデータフレームとして返すために、 ごとにインスタンスを してマップ操作を実行します。

基になる関数は、 pandas.DataFrameのイテレータを受け取り、出力します。 シリーズからシリーズなどの一部のpandas UDFとは対照的に、任意の長さの出力を返すことができます。

次の例は、 mapInPandas()の使用方法を示しています。

Python
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+

詳しい使い方は pyspark.sql.データフレーム.mapInPandas を参照してください。

コグルーグルされたマップ

Pandasインスタンスでコグループ化されたマップ操作の場合は、DataFrame.groupby().cogroup().applyInPandas()を使用して共通のキーで 2 つのPySpark DataFrameをコグループ化し、次に示すように各コグループに Python 関数を適用します。

  • キーを共有する各 データフレーム のグループが一緒にグループ化されるように、データをシャッフルします。
  • 複合グループに関数を適用します。 関数の入力は 2 つの pandas.DataFrame です (キーを表すオプションのタプル付き)。 関数の出力は pandas.DataFrameです。
  • すべてのグループの pandas.DataFrameを新しいPySpark DataFrameに結合します。

groupBy().cogroup().applyInPandas() を使用するには、以下を定義する必要があります。

  • 各複合グループの計算を定義する Python 関数。
  • 出力 PySpark のスキーマを定義する StructType オブジェクトまたは文字列 DataFrame.

返される pandas.DataFrame の列ラベルは、定義された出力スキーマのフィールド名と一致する (文字列として指定されている場合) か、文字列でない場合はフィールドデータ型 (整数インデックスなど) でフィールドデータ型と一致する必要があります。 パンダを参照してください。データフレーム は、pandas.DataFrameの作成時に列にラベルを付ける方法について説明します。

複合グループのすべてのデータは、関数が適用される前にメモリにロードされます。 これにより、特にグループ サイズが偏っている場合に、メモリ不足の例外が発生する可能性があります。 maxRecordsPerBatch の構成は適用されず、グループ化されたデータが使用可能なメモリに収まるようにする必要があります。

次の例は、 groupby().cogroup().applyInPandas() を使用して 2 つのデータセット間で asof join を実行する方法を示しています。

Python
import pandas as pd

df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))

df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))

def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+

詳しい使い方は PySpark.sqlをpyspark.sql.PandasCogroupedOps.applyInPandas を参照してください。