pandas function APIs
pandas function APIs を使用するとパンダインスタンスを取得して出力するPythonネイティブ関数をPySpark DataFrameに直接適用できます。 pandasのユーザー定義関数と同様に、関数 APIs Apache Arrow を使用してデータを転送し、パンダを使用してデータを操作することもできます。ただし、Python 型ヒントは pandas 関数 APIsでは省略可能です。
pandas function APIsには3つのタイプがあります。
グループ化されたマップ
地図
共グループ化マップ
パンダ関数は APIs パンダUDF実行が使用するのと同じ内部ロジックを利用します。 これらは、PyArrow、サポートされている SQL 型、構成などの特性を共有します。
詳細については、ブログ記事 Apache Spark 3.0 の今後のリリースの新しい Pandas UDF と Python 型ヒントを参照してください。
グループ化されたマップ
グループ化されたデータは、 groupBy().applyInPandas()
を使用して変換し、"分割-適用-結合" パターンを実装します。 分割-適用-結合は、次の 3 つのステップで構成されます。
DataFrame.groupBy
を使用してデータをグループに分割します。各グループに関数を適用します。 関数の入力と出力はどちらも
pandas.DataFrame
です。 入力データには、各グループのすべての行と列が含まれます。結果を新しい
DataFrame
に結合します。
groupBy().applyInPandas()
を使用するには、以下を定義する必要があります。
各グループの計算を定義する Python 関数
出力のスキーマを定義する
StructType
オブジェクトまたは文字列DataFrame
返される pandas.DataFrame
の列ラベルは、文字列として指定されている場合は定義された出力スキーマのフィールド名と一致するか、文字列でない場合は位置によってフィールドのデータ型と一致する必要があります (整数インデックスなど)。 pandas.DataFrame
の作成時に列にラベルを付ける方法に関しては。pandas データフレームを参照してください。
グループのすべてのデータは、関数が適用される前にメモリに読み込まれます。 これにより、特にグループ サイズが歪んでいる場合に、メモリ不足の例外が発生する可能性があります。 maxRecordsPerBatch の構成はグループには適用されず、グループ化されたデータが使用可能なメモリに収まるようにする必要があります。
次の例は、 groupby().apply()
を使用してグループ内の各値から平均を減算する方法を示しています。
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
地図
pandas.DataFrame
の反復子を現在の PySpark DataFrame を表す別の pandas.DataFrame
反復子に変換し、結果を PySpark DataFrameとして返すために、 DataFrame.mapInPandas()
で pandas インスタンスのマップ操作を実行します。
基になる関数は、 pandas.DataFrame
のイテレータを受け取り、出力します。 シリーズからシリーズなどの一部のpandas UDFとは対照的に、任意の長さの出力を返すことができます。
次に、 mapInPandas()
の使用例を示します。
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
共グループ化マップ
pandas インスタンスでグループ化されたマップ操作の場合は、 DataFrame.groupby().cogroup().applyInPandas()
を使用して 2 つの PySpark DataFrame
を共通キーで再グループ化し、次に示すように各共同グループに Python 関数を適用します。
キーを共有する各 DataFrame のグループがグループ化されるようにデータをシャッフルします。
各コグループに関数を適用します。 関数の入力は 2 つの
pandas.DataFrame
です (キーを表すオプションのタプル付き)。 関数の出力はpandas.DataFrame
です。すべてのグループの
pandas.DataFrame
を新しい PySparkDataFrame
に結合します。
groupBy().cogroup().applyInPandas()
を使用するには、以下を定義する必要があります。
各コグループの計算を定義する Python 関数。
出力 PySpark
DataFrame
のスキーマを定義するStructType
オブジェクトまたは文字列。
返される pandas.DataFrame
の列ラベルは、文字列として指定されている場合は定義された出力スキーマのフィールド名と一致するか、文字列でない場合は位置によってフィールドのデータ型と一致する必要があります (整数インデックスなど)。 pandas.DataFrame
の作成時に列にラベルを付ける方法に関しては。pandas データフレームを参照してください。
コグループのすべてのデータは、関数が適用される前にメモリにロードされます。 これにより、特にグループ サイズが歪んでいる場合に、メモリ不足の例外が発生する可能性があります。 maxRecordsPerBatch の構成は適用されず、グループ化されたデータが使用可能なメモリに収まるようにする必要があります。
次の例は、 groupby().cogroup().applyInPandas()
を使用して 2 つのデータセット間で asof join
を実行する方法を示しています。
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
詳しい使い方は PySpark.sqlを参照してください。PandasCogroupedOps.applyInPandas.