メインコンテンツまでスキップ

Pandasユーザー定義関数

Pandasユーザー定義関数 (UDF) は、ベクトル化UDFとも呼ばれ、 Apache Arrowでデータを転送し、Pandasでデータを操作します。Pandas UDF は 一度に行数の多いPython UDFと比較してパフォーマンスを最大100 倍向上させることができるベクトル化オペレーションです。

背景情報については、ブログ記事を参照してください。 Apache Spark 3.0 の今後のリリースにおける新しい Pandas UDF と Python 型ヒント

キーワード pandas_udf をデコレータとして使用して pandas UDF を定義し、 Python 型ヒントで関数をラップします。 この記事では、さまざまな種類の pandas UDF について説明し、型ヒントと共に pandas UDF を使用する方法を示します。

シリーズ間 UDF

シリーズからシリーズへのpandas UDF を使用して、スカラー演算をベクトル化します。 selectwithColumnなどの API で使用できます。

Python関数は、入力としてPandasシリーズを受け取り、同じ長さのPandasシリーズを返す必要があります。これらの型は、Python 型ヒントを使用して指定します。Spark 実行 データを行のバッチに分割し、各バッチの関数を呼び出してから、結果を連結して Pandas UDF を実行します。

次の例は、2つの列の積を計算するpandas UDFの作成方法を示しています。

Python
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0 1
# 1 4
# 2 9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# | 1|
# | 4|
# | 9|
# +-------------------+

シリーズのイテレータからシリーズUDFのイテレータへ

イテレータ UDF は、次の点を除いてスカラー pandas UDF と同じです。

  • Python 関数

    • 単一の入力バッチの代わりにバッチのイテレータを入力として受け取ります。
    • 単一の出力バッチではなく、出力バッチのイテレータを返します。
  • イテレータの出力全体の長さは、入力全体の長さと同じである必要があります。

  • ラップされたpandas UDF は、1 つの Spark 列を入力として受け取ります。

Python の型ヒントを次のように指定する必要があります。 Iterator[pandas.Series] -> Iterator[pandas.Series].

このpandas UDF は、UDF の実行で何らかの状態を初期化する必要がある場合 (たとえば、機械学習モデル ファイルを読み込んですべての入力バッチに推論を適用するなど) に役立ちます。

次の例は、イテレータをサポートする pandas UDF を作成する方法を示しています。

Python
import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for x in batch_iter:
yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# | 2|
# | 3|
# | 4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y = 1 # value captured by the UDF closure

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
try:
for x in batch_iter:
yield x + y
finally:
pass # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# | 2|
# | 3|
# | 4|
# +---------+

複数のシリーズのイテレータからシリーズUDFのイテレータへ

複数のシリーズのイテレータからシリーズ UDF のイテレータへのイテレータは、同様の特性を持ち、 シリーズの反復子としての制限 UDF の反復子へのシリーズ。 指定された関数はバッチの反復子を受け取り、 バッチのイテレータを出力します。 また、UDF の実行に一部の初期化が必要な場合にも役立ちます 状態。

違いは次のとおりです。

  • 基礎となるPython関数は、pandasシリーズの タプル のイテレータを取ります。
  • ラップされたpandas UDF は、 複数の Spark 列を入力として受け取ります。

型ヒントは Iterator[Tuple[pandas.Series, ...]] -> Iterator[pandas.Series]として指定します。

Python
from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for a, b in iterator:
yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# | 1|
# | 4|
# | 9|
# +-----------------------+

直列からスカラー UDF へ

直列からスカラー Pandas UDF は、 Spark 集計関数に似ています。Series から Pandas UDF への A は、1 つ以上の集計を定義します Pandas Series をスカラー値に設定します。各 Pandas Series は Spark 列を表します。SeriesPandasUDF APIselectを使用して、 、withColumngroupBy.agg PySpark、.sql。ウィンドウ

タイプ ヒントは pandas.Series, ... -> Anyで表します。 戻り値の型は primitive データ型で、返されるスカラーは Python プリミティブ型 (例: int または float または numpy.int64numpy.float64などの NumPy データ型。 Any 理想的には 特定のスカラー型であること。

このタイプの UDF は部分的な集計をサポートし ておらず 、各グループのすべてのデータがメモリにロードされます。

次の例は、このタイプの UDF をコンピュート平均にselectgroupBy、およびwindow演算で使用する方法を示しています。

Python
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# | 4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# | 1| 1.5|
# | 2| 6.0|
# +---+-----------+

w = Window \
.partitionBy('id') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id| v|mean_v|
# +---+----+------+
# | 1| 1.0| 1.5|
# | 1| 2.0| 1.5|
# | 2| 3.0| 6.0|
# | 2| 5.0| 6.0|
# | 2|10.0| 6.0|
# +---+----+------+

詳細な使用方法については、 PySpark.sql.functions.pandas_udf を参照してください。

使い方

Arrowのバッチサイズの設定

注記

この構成は、サーバレス コンピュート、または標準アクセスモードと Databricks Runtime 13.3 LTS ~ 14.2 で構成されたコンピュートには影響を与えません。サーバレス コンピュートでは、プラットフォームが Arrow バッチのサイズ設定を内部的に管理します。

Spark のデータ パーティションは Arrow レコード バッチに変換されます。 は、JVMのメモリ使用量が一時的に高くなる可能性があります。 可能性を避けるために メモリ不足の例外がある場合は、Arrow レコードバッチのサイズを調整できます spark.sql.execution.arrow.maxRecordsPerBatch 設定を次の整数に設定する 各バッチの最大行数を決定します。 デフォルト値 はバッチあたり 10,000 レコードです。 列数が多い場合、 それに応じて値を調整する必要があります。 この制限を使用すると、各データ パーティションは、処理のために 1 つ以上のレコード バッチに分割されます。

タイム ゾーン セマンティクスを使用したタイムスタンプ

Spark は、タイムスタンプを UTC 値として内部的に格納し、タイムスタンプ データを格納します タイムゾーンを指定せずに持ち込まれたものは、マイクロ秒の解像度でUTCまでのローカル時間として変換されます 。

タイムスタンプデータをSparkにエクスポートまたは表示する場合、 セッションのタイムゾーンは、 タイムスタンプ値。 セッションのタイムゾーンは、 spark.sql.session.timeZone 構成で、デフォルトはJVMシステムローカル時間帯です。 Pandasはナノ秒の解像度のdatetime64タイプと、datetime64[ns]、オプションでカラムごとベースでのタイムゾーンを使用しています。

タイムスタンプデータが Spark から Pandas に転送されると、次のようになります。 ナノ秒に変換され、各列は Sparkに変換されます セッションのタイムゾーンは、そのタイムゾーンにローカライズされ、 タイム ゾーンで、値を現地時間として表示します。 これは、タイムスタンプ列で toPandas() または pandas_udf を呼び出す際に発生します。

タイムスタンプ データがpandasから Spark に転送されると、UTC マイクロ秒に変換されます。 これは、pandas データフレーム を使用して createDataFrame を呼び出すとき、またはpandas UDF からタイムスタンプを返すときに発生します。 これらの変換は、Spark が予期される形式のデータを保持するために自動的に行われるため、これらの変換を自分で行う必要はありません。 ナノ秒の値は切り捨てられます。

標準の UDF は、タイムスタンプデータを Python 日時オブジェクトとして読み込みますが、これはpandasのタイムスタンプとは異なります。 最適なパフォーマンスを得るには、pandas UDF でタイムスタンプを操作するときに pandas 時系列機能を使用することをお勧めします。 詳細については、「 時系列/日付機能」を参照してください。

ノートブックの例

次のノートブックは、 Pandas UDF で達成できるパフォーマンスの向上を示しています。

Pandas UDF benchmark ノートブック

ノートブックを新しいタブで開く