メインコンテンツまでスキップ

パンダUDF

Pandasユーザー定義関数を作成します。

Pandas UDF は、データを転送するために Arrow を使用し、データを操作するためにPandasを使用してSparkによって実行されるユーザー定義関数であり、 Pandas操作を可能にします。 Pandas UDF は、 pandas_udfをデコレータとして、または関数をラップするために使用して定義され、追加の構成は必要ありません。Pandas UDF は、一般的に通常の PySpark 関数 API として動作します。

構文

Python
import pyspark.sql.functions as sf

# As a decorator
@sf.pandas_udf(returnType=<returnType>, functionType=<functionType>)
def function_name(col):
# function body
pass

# As a function wrapper
sf.pandas_udf(f=<function>, returnType=<returnType>, functionType=<functionType>)

パラメーター

パラメーター

Type

説明

f

function

オプション。ユーザー定義関数。スタンドアロン関数として使用される場合の Python 関数。

returnType

pyspark.sql.types.DataType または str

オプション。ユーザー定義関数の戻り値の型。値は、DataType オブジェクトまたは DDL 形式の型文字列のいずれかになります。

functionType

int

オプション。PandasUDFType の列挙値。デフォルト: SCALAR。この問題は互換性のために存在します。 Python 型ヒントの使用が推奨されます。

例 1 : シリーズからシリーズへ - 文字列を大文字に変換します。

Python
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("string")
def to_upper(s: pd.Series) -> pd.Series:
return s.str.upper()

df = spark.createDataFrame([("John Doe",)], ("name",))
df.select(to_upper("name")).show()
Output
+--------------+
|to_upper(name)|
+--------------+
| JOHN DOE|
+--------------+

例 2 : キーワード引数を使用して Series を Series に変換します。

Python
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as sf

@pandas_udf(returnType=IntegerType())
def calc(a: pd.Series, b: pd.Series) -> pd.Series:
return a + 10 * b

spark.range(2).select(calc(b=sf.col("id") * 10, a=sf.col("id"))).show()
Output
+-----------------------------+
|calc(b => (id * 10), a => id)|
+-----------------------------+
| 0|
| 101|
+-----------------------------+

例 3 : シリーズの反復子からシリーズの反復子へ。

Python
import pandas as pd
from typing import Iterator
from pyspark.sql.functions import pandas_udf

@pandas_udf("long")
def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
for s in iterator:
yield s + 1

df = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["v"]))
df.select(plus_one(df.v)).show()
Output
+-----------+
|plus_one(v)|
+-----------+
| 2|
| 3|
| 4|
+-----------+

例 4 : シリーズからスカラーへ - グループ化された集計。

Python
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()

df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
df.groupby("id").agg(mean_udf(df['v'])).show()
Output
+---+-----------+
| id|mean_udf(v)|
+---+-----------+
| 1| 1.5|
| 2| 6.0|
+---+-----------+

例 5 : ウィンドウ関数を使用してシリーズをスカラーに変換します。

Python
import pandas as pd
from pyspark.sql import Window
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()

df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
w = Window.partitionBy('id').orderBy('v').rowsBetween(-1, 0)
df.withColumn('mean_v', mean_udf("v").over(w)).show()
Output
+---+----+------+
| id| v|mean_v|
+---+----+------+
| 1| 1.0| 1.0|
| 1| 2.0| 1.5|
| 2| 3.0| 3.0|
| 2| 5.0| 4.0|
| 2|10.0| 7.5|
+---+----+------+

例 6 : シリーズからスカラーへの反復子 - メモリ効率の高いグループ化された集計。

Python
import pandas as pd
from typing import Iterator
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def pandas_mean_iter(it: Iterator[pd.Series]) -> float:
sum_val = 0.0
cnt = 0
for v in it:
sum_val += v.sum()
cnt += len(v)
return sum_val / cnt

df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
df.groupby("id").agg(pandas_mean_iter(df['v'])).show()
Output
+---+-------------------+
| id|pandas_mean_iter(v)|
+---+-------------------+
| 1| 1.5|
| 2| 6.0|
+---+-------------------+