Pandasユーザー定義関数
Pandasユーザー定義関数 (UDF) は、ベクトル化UDFとも呼ばれ、 Apache Arrowでデータを転送し、Pandasでデータを操作します。Pandas UDF は 一度に行数の多いPython UDFと比較してパフォーマンスを最大100 倍向上させることができるベクトル化オペレーションです。
背景情報については、ブログ記事を参照してください。 Apache Spark 3.0 の今後のリリースにおける新しい Pandas UDF と Python 型ヒント。
キーワード pandas_udf をデコレータとして使用して pandas UDF を定義し、 Python 型ヒントで関数をラップします。
この記事では、さまざまな種類の pandas UDF について説明し、型ヒントと共に pandas UDF を使用する方法を示します。
シリーズ間 UDF
シリーズからシリーズへのpandas UDF を使用して、スカラー演算をベクトル化します。
select や withColumnなどの API で使用できます。
Python関数は、入力としてPandasシリーズを受け取り、同じ長さのPandasシリーズを返す必要があります。これらの型は、Python 型ヒントを使用して指定します。Spark 実行 データを行のバッチに分割し、各バッチの関数を呼び出してから、結果を連結して Pandas UDF を実行します。
次の例は、2つの列の積を計算するpandas UDFの作成方法を示しています。
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
return a * b
multiply = pandas_udf(multiply_func, returnType=LongType())
# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0 1
# 1 4
# 2 9
# dtype: int64
# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# | 1|
# | 4|
# | 9|
# +-------------------+
シリーズのイテレータからシリーズUDFのイテレータへ
イテレータ UDF は、次の点を除いてスカラー pandas UDF と同じです。
-
Python 関数
- 単一の入力バッチの代わりにバッチのイテレータを入力として受け取ります。
- 単一の出力バッチではなく、出力バッチのイテレータを返します。
-
イテレータの出力全体の長さは、入力全体の長さと同じである必要があります。
-
ラップされたpandas UDF は、1 つの Spark 列を入力として受け取ります。
Python の型ヒントを次のように指定する必要があります。
Iterator[pandas.Series] -> Iterator[pandas.Series].
このpandas UDF は、UDF の実行で何らかの状態を初期化する必要がある場合 (たとえば、機械学習モデル ファイルを読み込んですべての入力バッチに推論を適用するなど) に役立ちます。
次の例は、イテレータをサポートする pandas UDF を作成する方法を示しています。
import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for x in batch_iter:
yield x + 1
df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# | 2|
# | 3|
# | 4|
# +-----------+
# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y = 1 # value captured by the UDF closure
@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
try:
for x in batch_iter:
yield x + y
finally:
pass # release resources here, if any
df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# | 2|
# | 3|
# | 4|
# +---------+
複数のシリーズのイテレータからシリーズUDFのイテレータへ
複数のシリーズのイテレータからシリーズ UDF のイテレータへのイテレータは、同様の特性を持ち、 シリーズの反復子としての制限 UDF の反復子へのシリーズ。 指定された関数はバッチの反復子を受け取り、 バッチのイテレータを出力します。 また、UDF の実行に一部の初期化が必要な場合にも役立ちます 状態。
違いは次のとおりです。
- 基礎となるPython関数は、pandasシリーズの タプル のイテレータを取ります。
- ラップされたpandas UDF は、 複数の Spark 列を入力として受け取ります。
型ヒントは Iterator[Tuple[pandas.Series, ...]] -> Iterator[pandas.Series]として指定します。
from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
@pandas_udf("long")
def multiply_two_cols(
iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for a, b in iterator:
yield a * b
df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# | 1|
# | 4|
# | 9|
# +-----------------------+
直列からスカラー UDF へ
直列からスカラー Pandas UDF は、 Spark 集計関数に似ています。Series から Pandas UDF への A は、1 つ以上の集計を定義します
Pandas Series をスカラー値に設定します。各 Pandas Series は Spark 列を表します。SeriesPandasUDF APIselectを使用して、 、withColumn 、groupBy.agg PySpark、.sql。ウィンドウ 。
タイプ ヒントは pandas.Series, ... -> Anyで表します。 戻り値の型は
primitive データ型で、返されるスカラーは Python プリミティブ型 (例:
int または float または numpy.int64 や numpy.float64などの NumPy データ型。 Any 理想的には
特定のスカラー型であること。
このタイプの UDF は部分的な集計をサポートし ておらず 、各グループのすべてのデータがメモリにロードされます。
次の例は、このタイプの UDF をコンピュート平均にselect、groupBy、およびwindow演算で使用する方法を示しています。
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()
df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# | 4.2|
# +-----------+
df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# | 1| 1.5|
# | 2| 6.0|
# +---+-----------+
w = Window \
.partitionBy('id') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id| v|mean_v|
# +---+----+------+
# | 1| 1.0| 1.5|
# | 1| 2.0| 1.5|
# | 2| 3.0| 6.0|
# | 2| 5.0| 6.0|
# | 2|10.0| 6.0|
# +---+----+------+
詳細な使用方法については、 PySpark.sql.functions.pandas_udf を参照してください。
使い方
Arrowのバッチサイズの設定
この構成は、サーバレス コンピュート、または標準アクセスモードと Databricks Runtime 13.3 LTS ~ 14.2 で構成されたコンピュートには影響を与えません。サーバレス コンピュートでは、プラットフォームが Arrow バッチのサイズ設定を内部的に管理します。
Spark のデータ パーティションは Arrow レコード バッチに変換されます。
は、JVMのメモリ使用量が一時的に高くなる可能性があります。 可能性を避けるために
メモリ不足の例外がある場合は、Arrow レコードバッチのサイズを調整できます
spark.sql.execution.arrow.maxRecordsPerBatch 設定を次の整数に設定する
各バッチの最大行数を決定します。 デフォルト値
はバッチあたり 10,000 レコードです。 列数が多い場合、
それに応じて値を調整する必要があります。 この制限を使用すると、各データ
パーティションは、処理のために 1 つ以上のレコード バッチに分割されます。
タイム ゾーン セマンティクスを使用したタイムスタンプ
Spark は、タイムスタンプを UTC 値として内部的に格納し、タイムスタンプ データを格納します タイムゾーンを指定せずに持ち込まれたものは、マイクロ秒の解像度でUTCまでのローカル時間として変換されます 。
タイムスタンプデータをSparkにエクスポートまたは表示する場合、
セッションのタイムゾーンは、
タイムスタンプ値。 セッションのタイムゾーンは、
spark.sql.session.timeZone 構成で、デフォルトはJVMシステムローカル時間帯です。 Pandasはナノ秒の解像度のdatetime64タイプと、datetime64[ns]、オプションでカラムごとベースでのタイムゾーンを使用しています。
タイムスタンプデータが Spark から Pandas に転送されると、次のようになります。
ナノ秒に変換され、各列は Sparkに変換されます
セッションのタイムゾーンは、そのタイムゾーンにローカライズされ、
タイム ゾーンで、値を現地時間として表示します。 これは、タイムスタンプ列で toPandas() または pandas_udf を呼び出す際に発生します。
タイムスタンプ データがpandasから Spark に転送されると、UTC マイクロ秒に変換されます。 これは、pandas データフレーム を使用して createDataFrame を呼び出すとき、またはpandas UDF からタイムスタンプを返すときに発生します。 これらの変換は、Spark が予期される形式のデータを保持するために自動的に行われるため、これらの変換を自分で行う必要はありません。 ナノ秒の値は切り捨てられます。
標準の UDF は、タイムスタンプデータを Python 日時オブジェクトとして読み込みますが、これはpandasのタイムスタンプとは異なります。 最適なパフォーマンスを得るには、pandas UDF でタイムスタンプを操作するときに pandas 時系列機能を使用することをお勧めします。 詳細については、「 時系列/日付機能」を参照してください。
ノートブックの例
次のノートブックは、 Pandas UDF で達成できるパフォーマンスの向上を示しています。