Databricks Connect for Python のユーザー定義関数
この記事では、Databricks Runtime 13.3 以降の Databricks Connect について説明します。
Databricks Connect for Python では 、ユーザー定義関数 (UDF) がサポートされています。UDF を含む DataFrame 操作が実行されると、UDF は Databricks Connect によってシリアル化され、要求の一部としてサーバーに送信されます。
Databricks Connect for Scalaの UDF に関する情報については、「Databricks Connect for のユーザー定義関数」Scalaを参照してください。
ユーザー定義関数はシリアル化および逆シリアル化されるため、クライアントのPython バージョンは、Python Databricksコンピュートの バージョンと一致する必要があります。サポートされているバージョンについては、 バージョンサポートマトリックスを参照してください。
UDF の定義
Databricks Connect for Python で UDF を作成するには、次のサポートされている関数のいずれかを使用します。
-
PySpark ユーザー定義関数
-
PySpark ストリーミング関数
たとえば、次の Python は、列の値を 2 乗する単純な UDF を設定します。
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession
@udf(returnType=IntegerType())
def double(x):
return x * x
spark = DatabricksSession.builder.getOrCreate()
df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))
df.show()
UDFの依存関係を管理する
プレビュー
この機能はパブリック プレビュー Databricks ConnectPython段階であり、 16.4 以降ではDatabricks Runtime が必要であり、16.4 以降 で実行されているクラスターが必要です。この機能を使用するには、ワークスペースの Unity Catalog でプレビューの拡張 Python UDF を有効にします。
Databricks Connect では、UDF に必要な Python 依存関係の指定がサポートされています。これらの依存関係はDatabricksUDFのPython環境の一部としてコンピュートにインストールされます。
この機能により、ユーザーは、基本環境で提供されるパッケージに加えて、UDF が必要とする依存関係を指定できます。また、 基本環境で提供されているものとは異なるバージョンのパッケージをインストールするためにも使用できます。
依存関係は、次のソースからインストールできます。
-
PyPI パッケージ
- PyPI パッケージは、 PEP 508 に従って指定できます (たとえば、
dice、pyjokes<1、simplejson==3.19.*。
- PyPI パッケージは、 PEP 508 に従って指定できます (たとえば、
-
Unity Catalog ボリュームに格納されたファイル
- wheel パッケージ (
.whl) と gzip 圧縮された tar ファイル (.tar.gz) の両方がサポートされています。ユーザーにはREAD_FILEre:[UC] ボリューム内のファイルに対するアクセス許可が付与されている必要があります。 - Unity Catalog ボリュームからパッケージをインストールする場合、UDF を呼び出すには、ソース ボリュームに対する
READ VOLUMEアクセス許可が必要です。この権限をすべてのアカウントユーザーに付与すると、新しいユーザーに対して自動的にこれが有効になります。 - Unity Catalog ボリューム ファイルは、
dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whlやdbfs:/Volumes/users/someone@example.com/tars/my_private_deps.tar.gzなど、dbfs:<path>として指定する必要があります。
- wheel パッケージ (
UDF にカスタム依存関係を含めるには、 withDependenciesを使用して環境で指定し、その環境を使用して Spark セッションを作成します。依存関係は Databricks コンピュートにインストールされ、この Spark セッションを使用するすべての UDF で使用できます。
次のコードは、PyPI パッケージ dice を依存関係として宣言しています。
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
または、ボリューム内のホイールの依存関係を指定するには、次のようにします。
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Databricks ノートブックとジョブでの動作
ノートブックとジョブでは、UDF 依存関係を REPL に直接インストールする必要があります。Databricks Connect は、指定されたすべての依存関係が既にインストールされていることを確認して REPL Python 環境を検証し、インストールされていない場合は例外をスローします。
ノートブック環境の検証は、PyPI と Unity Catalog の両方のボリューム依存関係に対して行われます。ボリュームの依存関係は、wheel ファイルの場合は PEP-427 以降、ソース配布ファイルの場合は PEP-241 以降の標準の Python パッケージ仕様に従ってパッケージ化する必要があります。Pythonパッケージング標準の詳細については、PyPA のドキュメントを参照してください。
制限
- Python wheel やソース配布物などのファイルをローカル開発マシンで直接依存関係として指定することはできません。まず、Unity Catalog ボリュームにアップロードする必要があります。
pyspark.sql.streaming.DataStreamWriter.foreachUDF 依存関係のサポートには、Databricks Connect for Python 18.0 以上と、Databricks Runtime 18.0 以上を実行しているクラスターが必要です。pyspark.sql.streaming.DataStreamWriter.foreachBatchUDF 依存関係のサポートには、Databricks Connect for Python 18.0 以上と、Databricks Runtime 18.0 以上を実行しているクラスターが必要です。この機能はサーバーレスではサポートされません。- UDF 依存関係は、ウィンドウ関数上の Pandas 集計 UDF ではサポートされていません。
例
次の例では、環境内の PyPI と volumes の依存関係を定義し、その環境でのセッションを作成してから、それらの依存関係を使用する UDF を定義して呼び出します。
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd
pypi_deps = ["pyjokes>=0.8,<1"]
volumes_deps = [
# Example library from: https://pypi.org/project/dice/#files
"dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0-py3-none-any.whl"
# Example library from: https://pypi.org/project/simplejson/#files
"dbfs:/Volumes/main/someone@example.com/test/simplejson-3.19.3.tar.gz",
]
env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
# UDFs
@udf(returnType=StringType())
def get_joke():
from pyjokes import get_joke
return get_joke()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
import simplejson
return simplejson.loads(simplejson.dumps(x * 2))
@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
import dice
return a * b + dice.roll(f"1d10")[0]
df = spark.range(1, 10)
df = df.withColumns({
"joke": get_joke(),
"doubled": double_and_json_parse(col("id")),
"mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()
UDF依存関係の自動管理
プレビュー
この機能はパブリック プレビュー段階であり、Databricks Connect for Python 18.1 以上、ローカル マシン上の Python 3.12、および Databricks Runtime 18.1 以上を実行しているクラスターが必要です。この機能を使用するには、ワークスペースの Unity Catalog でプレビューの Enhanced Python UDF を有効にします。
Databricks Connect withAutoDependencies() API を使用すると、UDF のインポート ステートメントで使用されるローカル モジュールとパブリック PyPI 依存関係を自動的に検出してアップロードできます。依存関係を手動で指定する必要がなくなります。
次のコードは、自動依存関係管理を有効にします。
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
withAutoDependencies()メソッドは次の点を受け入れます:
upload_local:Trueに設定すると、UDF にインポートされたローカル モジュールが自動的に検出され、パッケージ化されて、UDF サンドボックスにアップロードされます。use_index:Trueに設定すると、UDF で使用されるパブリックPyPI依存関係が自動的に検出され、 Databricksコンピュートにインストールされます。 検出プロセスでは、ローカル マシンにインストールされているパッケージを使用してバージョンを判別し、ローカル環境とリモート実行環境間の一貫性を確保します。
制限
- 動的インポート (例:
importlib.import_module("foo")) はサポートされていません。 - 名前空間パッケージ (たとえば、
azure.eventhubおよびgoogle.cloud.aiplatform) はサポートされていません。 - 直接 URL 参照を使用してインストールされた依存関係はサポートされていません。これには、ローカル ホイール ファイルからインストールされたものも含まれます。
- プライベート パッケージ インデックスからインストールされた依存関係はサポートされていません。この方法でインストールされたパッケージは、パブリック PyPI からインストールされたパッケージと区別できません。
- 依存関係の検出は Python シェルでは機能しません。Pythonスクリプト、IPython シェル、Jupyter ノートブックのみがサポートされています。
例
次の例は、ローカル モジュールと PyPI パッケージの両方を使用した自動依存関係管理を示しています。この例では、 simplejsonとdice ( pip install simplejson diceを使用) がインストールされている必要があります。
まず、ローカル ヘルパー モジュールを作成します。
# my_helper.py
def double(x):
return 2 * x
# my_json.py
import simplejson
def loads(x):
return simplejson.loads(x)
def dumps(x):
return simplejson.dumps(x)
次に、メイン スクリプトでこれらのモジュールをインポートし、UDF で使用します。
# main.py
import dice as dc
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, FloatType
import my_json
from my_helper import double
env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
return my_json.loads(my_json.dumps(double(x)))
@udf(returnType=FloatType())
def sum_and_add_noise(x, y):
return x + y + (dc.roll("d6")[0] / 6)
df = spark.range(1, 10)
df = df.withColumns({
"doubled": double_and_json_parse(col("id")),
"summed_with_noise": sum_and_add_noise(col("id"), col("doubled")),
})
df.show()
ロギング
検出された依存関係を出力するには、 SPARK_CONNECT_LOG_LEVEL環境変数をinfoまたはdebugに設定します。あるいは、Python ログ フレームワークを構成します。
import logging
logging.basicConfig(level=logging.INFO)
関連するログはdatabricks.connect.auto_dependenciesモジュールによって出力されます。例:
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_json
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_helper
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: simplejson for module simplejson
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: dice for module dice
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_json
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_helper
INFO:databricks.connect.auto_dependencies.hook:Updated simplejson with auto-detected version ==3.20.2
INFO:databricks.connect.auto_dependencies.hook:Updated dice with auto-detected version ==4.0.0
Python ベース環境
UDF は、クライアントではなく、 Databricks コンピュートで実行されます。 UDFが実行される基本 Python 環境は、コンピュート Databricks によって異なります。
クラスタリングの場合、基本Python PythonDatabricks Runtime環境は、クラスタリングで実行されている バージョンの 環境です。Pythonこの基本環境の バージョンとパッケージのリストは、Databricks Runtime リリースノート の 「システム環境 」セクションと「 インストール済みPython ライブラリ 」セクションにあります。
サーバレス コンピュートの場合、ベース Python 環境は、次の表に示すように サーバレス環境のバージョン に対応します。
Databricks Connect のバージョン | UDF サーバレス環境 |
|---|---|
17.0 から 17.3、Python 3.12 | |
16.4.1 から 17 未満、Python 3.12 | |
15.4.10 から 16 未満、Python 3.12 | |
15.4.10 から 16 未満、Python 3.11 | |
15.4.0 から 15.4.9 および 16.0 から 16.3 | 最新のサーバレス コンピュート. 安定したPython環境のためには、15.4.10 LTS以上または16.4.1 LTS以上に移行してください。 |