メインコンテンツまでスキップ

ユーザー定義関数Pandas

Pandasユーザー定義関数 (UDF) は、ベクトル化UDFとも呼ばれ、 Apache矢印] でデータを転送し、Pandasでデータを操作します。PandasUDF は 一度に行数の多いPython UDF と比較してパフォーマンスを最大 100 倍向上させることができるベクトル化操作 。

背景情報については、ブログ記事を参照してください。 Apache Spark 3.0 の今後のリリースにおける新しい Pandas UDF と Python 型ヒント

キーワード pandas_udf をデコレータとして使用して pandas UDF を定義し、 Python 型ヒントで関数をラップします。 この記事では、さまざまな種類の pandas UDF について説明し、型ヒントと共に pandas UDF を使用する方法を示します。

シリーズ間 UDF

シリーズからシリーズへのpandas UDF を使用して、スカラー演算をベクトル化します。 selectwithColumnなどの APIs で使用できます。

Python 関数は、pandas シリーズを入力として受け取り、同じ長さのpandas シリーズを返す必要があり、Python 型ヒントでこれらを指定する必要があります。 Spark は、列をバッチに分割し、各バッチの関数をデータのサブセットとして呼び出し、結果を連結することで pandas UDF を実行します。

次の例は、2つの列の積を計算するpandas UDFの作成方法を示しています。

Python
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0 1
# 1 4
# 2 9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# | 1|
# | 4|
# | 9|
# +-------------------+

シリーズのイテレータからシリーズUDFのイテレータへ

イテレータ UDF は、次の点を除いてスカラー pandas UDF と同じです。

  • Python 関数

    • 単一の入力バッチの代わりにバッチのイテレータを入力として受け取ります。
    • 単一の出力バッチではなく、出力バッチのイテレータを返します。
  • イテレータの出力全体の長さは、入力全体の長さと同じである必要があります。

  • ラップされたpandas UDF は、1 つの Spark 列を入力として受け取ります。

Python の型ヒントを次のように指定する必要があります。 Iterator[pandas.Series] -> Iterator[pandas.Series].

このpandas UDF は、UDF の実行で何らかの状態を初期化する必要がある場合 (たとえば、機械学習モデル ファイルを読み込んですべての入力バッチに推論を適用するなど) に役立ちます。

次の例は、イテレータをサポートする pandas UDF を作成する方法を示しています。

Python
import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for x in batch_iter:
yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# | 2|
# | 3|
# | 4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
y = y_bc.value # initialize states
try:
for x in batch_iter:
yield x + y
finally:
pass # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# | 2|
# | 3|
# | 4|
# +---------+

複数のシリーズのイテレータからシリーズUDFのイテレータへ

複数のシリーズのイテレータからシリーズ UDF のイテレータへのイテレータは、同様の特性を持ち、 シリーズの反復子としての制限 UDF の反復子へのシリーズ。 指定された関数はバッチの反復子を受け取り、 バッチのイテレータを出力します。 また、UDF の実行に一部の初期化が必要な場合にも役立ちます 状態。

違いは次のとおりです。

  • 基礎となるPython関数は、pandasシリーズの タプル のイテレータを取ります。
  • ラップされたpandas UDF は、 複数の Spark 列を入力として受け取ります。

型ヒントは Iterator[Tuple[pandas.Series, ...]] -> Iterator[pandas.Series]として指定します。

Python
from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for a, b in iterator:
yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# | 1|
# | 4|
# | 9|
# +-----------------------+

直列からスカラー UDF へ

直列からスカラー Pandas UDF は、 Spark 集計関数に似ています。Series から Pandas UDF への A は、1 つ以上の集計を定義します Pandas Series をスカラー値に設定します。各 Pandas Series は Spark 列を表します。SeriesPandasUDF APIsselectを使用して、 、withColumngroupBy.agg PySpark、.sql。ウィンドウ

タイプ ヒントは pandas.Series, ... -> Anyで表します。 戻り値の型は primitive データ型で、返されるスカラーは Python プリミティブ型 (例: int または float または numpy.int64numpy.float64などの NumPy データ型。 Any 理想的には 特定のスカラー型であること。

このタイプの UDF は部分的な集計をサポートし ておらず 、各グループのすべてのデータがメモリにロードされます。

次の例は、このタイプの UDF をコンピュート平均にselectgroupBy、およびwindow演算で使用する方法を示しています。

Python
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# | 4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# | 1| 1.5|
# | 2| 6.0|
# +---+-----------+

w = Window \
.partitionBy('id') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id| v|mean_v|
# +---+----+------+
# | 1| 1.0| 1.5|
# | 1| 2.0| 1.5|
# | 2| 3.0| 6.0|
# | 2| 5.0| 6.0|
# | 2|10.0| 6.0|
# +---+----+------+

詳細な使用方法については、 PySpark.sql.functions.pandas_udf を参照してください。

使い

矢印のバッチサイズの設定

注記

この設定は、標準アクセス モードと Databricks Runtime 13.3 LTS から 14.2 で設定されたコンピュートには影響しません。

Spark のデータ パーティションは Arrow レコード バッチに変換されます。 は、JVMのメモリ使用量が一時的に高くなる可能性があります。 可能性を避けるために メモリ不足の例外がある場合は、Arrow レコードバッチのサイズを調整できます spark.sql.execution.arrow.maxRecordsPerBatch 設定を次の整数に設定する 各バッチの最大行数を決定します。 デフォルト値 はバッチあたり 10,000 レコードです。 列数が多い場合、 それに応じて値を調整する必要があります。 この制限を使用すると、各データ パーティションは、処理のために 1 つ以上のレコード バッチに分割されます。

タイム ゾーン セマンティクスを使用したタイムスタンプ

Spark は、タイムスタンプを UTC 値として内部的に格納し、タイムスタンプ データを格納します タイムゾーンを指定せずに持ち込まれたものは、ローカルとして変換されます マイクロ秒の解像度でUTCまでの時間。

タイムスタンプデータをSparkにエクスポートまたは表示する場合、 セッションのタイムゾーンは、 タイムスタンプ値。 セッションのタイムゾーンは、 spark.sql.session.timeZone 構成で、デフォルトはJVMシステムローカルです 時間帯。 Pandas はナノ秒のdatetime64タイプを使用しています 解像度、datetime64[ns]、列ごとにオプションのタイムゾーンを使用 基礎。

タイムスタンプデータが Spark から Pandas に転送されると、次のようになります ナノ秒に変換され、各列は Sparkに変換されます セッションのタイムゾーンは、そのタイムゾーンにローカライズされ、 タイム ゾーンで、値を現地時間として表示します。 これは、次の場合に発生します タイムスタンプ列を使用して toPandas() または pandas_udf を呼び出す。

タイムスタンプ データがpandasから Spark に転送されると、UTC マイクロ秒に変換されます。 これは、pandas データフレーム を使用して createDataFrame を呼び出すとき、またはpandas UDF からタイムスタンプを返すときに発生します。 これらの変換は、Spark が予期される形式のデータを保持するために自動的に行われるため、これらの変換を自分で行う必要はありません。 ナノ秒の値は切り捨てられます。

標準の UDF は、タイムスタンプデータを Python 日時オブジェクトとして読み込みますが、これはpandasのタイムスタンプとは異なります。 最適なパフォーマンスを得るには、pandas UDF でタイムスタンプを操作するときに pandas 時系列機能を使用することをお勧めします。 詳細については、「 時系列/日付機能」を参照してください。

ノートブックの例

次のノートブックは、 Pandas UDF で達成できるパフォーマンスの向上を示しています。

Pandas UDF benchmark ノートブック

Open notebook in new tab