メインコンテンツまでスキップ

Databricks Connect for Python のユーザー定義関数

注記

この記事では、Databricks Runtime 13.1 以降の Databricks Connect について説明します。

Databricks Connect for Python では 、ユーザー定義関数 (UDF) がサポートされています。UDF を含む DataFrame 操作が実行されると、UDF は Databricks Connect によってシリアル化され、要求の一部としてサーバーに送信されます。

Databricks Connect for Scalaの UDF に関する情報については、「Databricks Connect for のユーザー定義関数」Scalaを参照してください。

注記

ユーザー定義関数はシリアル化および逆シリアル化されるため、クライアントのPython バージョンは、Python Databricksコンピュートの バージョンと一致する必要があります。サポートされているバージョンについては、 バージョンサポートマトリックスを参照してください。

UDF の定義

Databricks Connect for Python で UDF を作成するには、次のサポートされている関数のいずれかを使用します。

たとえば、次の Python は、列の値を 2 乗する単純な UDF を設定します。

Python
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession

@udf(returnType=IntegerType())
def double(x):
return x * x

spark = DatabricksSession.builder.getOrCreate()

df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))

df.show()

依存関係を持つ UDFs

備考

プレビュー

この機能はパブリック プレビュー Databricks ConnectPython段階であり、 16.4 以降ではDatabricks Runtime が必要であり、16.4 以降 で実行されているクラスターが必要です。この機能を使用するには、ワークスペースの Unity Catalog でプレビューの拡張 Python UDF を有効にします。

Databricks Connect では、UDF に必要な Python 依存関係の指定がサポートされています。これらの依存関係はDatabricksUDFのPython環境の一部としてコンピュートにインストールされます。

この機能により、ユーザーは、基本環境で提供されるパッケージに加えて、UDF が必要とする依存関係を指定できます。また、 基本環境で提供されているものとは異なるバージョンのパッケージをインストールするためにも使用できます。

依存関係は、次のソースからインストールできます。

  • PyPI パッケージ

    • PyPI パッケージは、 PEP 508 に従って指定できます (たとえば、 dicepyjokes<1simplejson==3.19.*
  • Unity Catalog ボリュームに格納されたファイル

    • wheel パッケージ (.whl) と gzip 圧縮された tar ファイル (.tar.gz) の両方がサポートされています。ユーザーには READ_FILE re:[UC] ボリューム内のファイルに対するアクセス許可が付与されている必要があります。
    • Unity Catalog ボリュームからパッケージをインストールする場合、UDF を呼び出すには、ソース ボリュームに対する READ VOLUME アクセス許可が必要です。この権限をすべてのアカウントユーザーに付与すると、新しいユーザーに対して自動的にこれが有効になります。
    • Unity Catalog ボリューム ファイルは、dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whldbfs:/Volumes/users/someone@example.com/tars/my_private_deps.tar.gzなど、dbfs:<path>として指定する必要があります。

UDF にカスタム依存関係を含めるには、 withDependenciesを使用して環境で指定し、その環境を使用して Spark セッションを作成します。依存関係は Databricks コンピュートにインストールされ、この Spark セッションを使用するすべての UDF で使用できます。

次のコードは、PyPI パッケージ dice を依存関係として宣言しています。

Python
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

または、ボリューム内のホイールの依存関係を指定するには、次のようにします。

Python
from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Databricks ノートブックとジョブでの動作

ノートブックとジョブでは、UDF 依存関係を REPL に直接インストールする必要があります。Databricks Connect は、指定されたすべての依存関係が既にインストールされていることを確認して REPL Python 環境を検証し、インストールされていない場合は例外をスローします。

ノートブック環境の検証は、PyPI と Unity Catalog の両方のボリューム依存関係に対して行われます。ボリュームの依存関係は、wheel ファイルの場合は PEP-427 以降、ソース配布ファイルの場合は PEP-241 以降の標準の Python パッケージ仕様に従ってパッケージ化する必要があります。Pythonパッケージング標準の詳細については、PyPA のドキュメントを参照してください。

制限

  • Python wheel やソース配布物などのファイルをローカル開発マシンで直接依存関係として指定することはできません。まず、Unity Catalog ボリュームにアップロードする必要があります。
  • UDF 依存関係は、ウィンドウ関数上の Pandas 集計 UDF ではサポートされていません。

次の例では、環境内の PyPI と volumes の依存関係を定義し、その環境でのセッションを作成してから、それらの依存関係を使用する UDF を定義して呼び出します。

Python
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd

pypi_deps = ["pyjokes>=0.8,<1"]

volumes_deps = [
# Example library from: https://pypi.org/project/dice/#files
"dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0-py3-none-any.whl"
# Example library from: https://pypi.org/project/simplejson/#files
"dbfs:/Volumes/main/someone@example.com/test/simplejson-3.19.3.tar.gz",
]

env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

# UDFs
@udf(returnType=StringType())
def get_joke():
from pyjokes import get_joke
return get_joke()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
import simplejson
return simplejson.loads(simplejson.dumps(x * 2))


@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
import dice
return a * b + dice.roll(f"1d10")[0]


df = spark.range(1, 10)
df = df.withColumns({
"joke": get_joke(),
"doubled": double_and_json_parse(col("id")),
"mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()

次の例では、環境内の依存関係を定義し、その環境とのセッションを作成してから、バッチストリームでその依存関係を使用する関数を定義して呼び出します。

Python
import time
from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withDependencies("simplejson==3.19.3")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

def feb_func(batch_df, batch_id):
from simplejson import loads, dumps
assert loads(dumps(batch_id)) == batch_id
batch_df.collect()

df = spark.readStream.format("rate").load()
q = df.writeStream.foreachBatch(feb_func).start()
time.sleep(3)
q.stop()

Python ベース環境

UDF は、クライアントではなく、 Databricks コンピュートで実行されます。 UDF が実行される基本 Python 環境は、選択した Databricks コンピュートによって異なります。

基本Python PythonDatabricks Runtime環境は、クラスターで実行されている バージョンの 環境です。Pythonこの基本環境の バージョンとパッケージのリストは、Databricks Runtime リリースノート の 「システム環境 」セクションと「 インストール済みPython ライブラリ 」セクションにあります。