Unity Catalog の Python ユーザー定義テーブル関数 (UDTF)
プレビュー
登録する Python UDTFs in Unity Catalog は パブリック プレビューです。
スカラー値の代わりに完全なテーブルを返すUnity Catalogユーザー定義テーブル関数 (UDTF) 登録する関数。 各呼び出しから単一の結果値を返すスカラー関数とは異なり、UDTF は SQL ステートメントのFROM句で呼び出され、複数の行と列を返すことができます。
UDTF は、次の場合に特に役立ちます。
- 配列または複雑なデータ構造を複数の行に変換する
- 外部 APIs またはサービスを SQL ワークフローに統合する
- カスタム・データ生成またはエンリッチメント・ロジックの実装
- 行間でステートフル操作を必要とするデータの処理
各 UDTF 呼び出しは、0 個以上の引数を受け入れます。これらの引数は、スカラー式または入力テーブル全体を表すテーブル引数にすることができます。
UDTF は、次の 2 つの方法で登録できます。
- Unity Catalog: 登録する UDTF を Unity Catalogの管理対象オブジェクトとして登録します。
- Session-scoped: ローカル
SparkSessionに登録する 、現行ノートブックまたはジョブに分離されます。 Python ユーザー定義テーブル関数 (UDTF) を参照してください。
必要条件
Unity Catalog Python UDTF は、次のコンピュート タイプでサポートされています。
- 標準アクセスモードのクラシックコンピュート(Databricks Runtime 17.1以降)
- SQLウェアハウス (サーバレス or プロ)
Unity Catalog で UDTF を作成する
SQL DDLを使用して、Unity Catalog に管理された UDTF を作成します。UDTF は、SQL ステートメントのFROM句を使用して呼び出されます。
CREATE OR REPLACE FUNCTION square_numbers(start INT, end INT)
RETURNS TABLE (num INT, squared INT)
LANGUAGE PYTHON
HANDLER 'SquareNumbers'
DETERMINISTIC
AS $$
class SquareNumbers:
"""
Basic UDTF that computes a sequence of integers
and includes the square of each number in the range.
"""
def eval(self, start: int, end: int):
for num in range(start, end + 1):
yield (num, num * num)
$$;
SELECT * FROM square_numbers(1, 5);
+-----+---------+
| num | squared |
+-----+---------+
| 1 | 1 |
| 2 | 4 |
| 3 | 9 |
| 4 | 16 |
| 5 | 25 |
+-----+---------+
Databricks は、出力行を生成する必須の eval メソッドを使用して、Python UDTF を Python クラスとして実装します。
テーブル引数
TABLE 引数は、Databricks Runtime 17.2 以降でサポートされています。
UDTF はテーブル全体を入力引数として受け入れることができるため、複雑なステートフル変換と集計が可能になります。
eval()およびterminate()ライフサイクル メソッド
UDTF のテーブル引数は、次の関数を使用して各行を処理します。
eval(): 入力テーブルの各行に対して 1 回呼び出されます。これは主な処理方法であり、必須です。terminate(): すべての行がeval()によって処理された後、各パーティションの最後に 1 回呼び出されます。このメソッドを使用して、最終的な集計結果を生成したり、クリーンアップ操作を実行したりします。このメソッドはオプションですが、集計、カウント、バッチ処理などのステートフル操作には不可欠です。
eval()とterminate()メソッドの詳細については、 Apache Sparkドキュメント: Python UDTF を参照してください。
行アクセスパターン
eval() TABLE 引数から行をPySpark .sql.Rowオブジェクトとして受け取ります。 列名 ( row['id'] 、 row['name'] ) またはインデックス ( row[0] 、 row[1] ) で値にアクセスできます。
- スキーマの柔軟性 : スキーマ定義なしで TABLE 引数を宣言します (例:
data TABLE、t TABLE)。この関数は任意のテーブル構造を受け入れるため、コードでは必要な列が存在することを検証する必要があります。
「例: IP アドレスを CIDR ネットワーク ブロックと照合する」および「例: Databricks ビジョン エンドポイントを使用してイメージのキャプションをバッチ処理する」を参照してください。
環境隔離
共有分離環境には、Databricks Runtime 17.2 以降が必要です。以前のバージョンでは、すべての Unity Catalog Python UDTF は厳密な分離モードで実行されます。
同じ所有者とセッションを持つUnity Catalog Python UDTF は、デフォルトによって分離環境を共有できます。これにより、起動する必要がある個別の環境の数が減り、パフォーマンスが向上し、メモリ使用量が削減されます。
厳密な分離
UDTF が常に独自の完全に分離された環境で実行されるようにするには、 STRICT ISOLATION 特性句を追加します。
ほとんどの UDTF は厳密な分離を必要としません。標準データ処理 UDTF は、デフォルトの共有分離環境の恩恵を受け、より少ないメモリ消費でより高速に実行されます。
次の STRICT ISOLATION 特性句を UDTF に追加します。
eval()、exec()、または同様の関数を使用して、入力をコードとして実行します。- ローカル・ファイル・システムにファイルを書き込みます。
- グローバル変数またはシステム状態を変更します。
- 環境変数にアクセスまたは変更します。
次の UDTF の例では、カスタム環境変数を設定し、変数を読み戻し、変数を使用して一連の数値を乗算します。UDTF はプロセス環境を変更するため、 STRICT ISOLATIONで実行します。そうしないと、同じ環境内の他の UDF/UDTF の環境変数がリークまたはオーバーライドされ、誤った動作が発生する可能性があります。
CREATE OR REPLACE TEMPORARY FUNCTION multiply_numbers(factor STRING)
RETURNS TABLE (original INT, scaled INT)
LANGUAGE PYTHON
STRICT ISOLATION
HANDLER 'Multiplier'
AS $$
import os
class Multiplier:
def eval(self, factor: str):
# Save the factor as an environment variable
os.environ["FACTOR"] = factor
# Read it back and convert it to a number
scale = int(os.getenv("FACTOR", "1"))
# Multiply 0 through 4 by the factor
for i in range(5):
yield (i, i * scale)
$$;
SELECT * FROM multiply_numbers("3");
関数が一貫した結果を生成する場合に DETERMINISTIC を設定します
同じ入力に対して同じ出力を生成する場合は、関数定義に DETERMINISTIC を追加します。これにより、クエリの最適化によりパフォーマンスが向上します。
もちろん、バッチUnity Catalog Python UDTFは、明示的に宣言されない限り、非決定的であるとみなされます。 非決定論的関数の例としては、ランダム値の生成、現在の時刻や日付へのアクセス、外部 API 呼び出しなどがあります。
CREATE FUNCTION (SQL および Python)を参照してください。
実例
次の例は、単純なデータ変換から複雑な外部統合まで、Unity Catalog Python UDTF の実際のユース ケースを示しています。
例: 再実装 explode
Spark には組み込みの explode 関数が用意されていますが、独自のバージョンを作成すると、1 つの入力を受け取り、複数の出力行を生成する基本的な UDTF パターンが実証されます。
CREATE OR REPLACE FUNCTION my_explode(arr ARRAY<STRING>)
RETURNS TABLE (element STRING)
LANGUAGE PYTHON
HANDLER 'MyExplode'
DETERMINISTIC
AS $$
class MyExplode:
def eval(self, arr):
if arr is None:
return
for element in arr:
yield (element,)
$$;
この関数をSQLクエリで直接使用します。
SELECT element FROM my_explode(array('apple', 'banana', 'cherry'));
+---------+
| element |
+---------+
| apple |
| banana |
| cherry |
+---------+
または、 LATERAL結合を使用して既存のテーブル データに適用します。
SELECT s.*, e.element
FROM my_items AS s,
LATERAL my_explode(s.items) AS e;
例:REST APIを介したIPアドレスの地理位置情報
この例では、UDTF が外部APIs SQLワークフローに直接統合する方法を示します。 アナリストは、別のETLプロセスを必要とせずに、使い慣れたSQL構文を使用して、ラケットAPI呼び出しでエンリッチデータを取得できます。
CREATE OR REPLACE FUNCTION ip_to_location(ip_address STRING)
RETURNS TABLE (city STRING, country STRING)
LANGUAGE PYTHON
HANDLER 'IPToLocationAPI'
AS $$
class IPToLocationAPI:
def eval(self, ip_address):
import requests
api_url = f"https://api.ip-lookup.example.com/{ip_address}"
try:
response = requests.get(api_url)
response.raise_for_status()
data = response.json()
yield (data.get('city'), data.get('country'))
except requests.exceptions.RequestException as e:
# Return nothing if the API request fails
return
$$;
Python UDTF は、標準アクセス モードで設定されたサーバレス コンピュートまたはコンピュートを使用する場合、ポート 80、443、および 53 を介した TCP/UDP ネットワーク トラフィックを許可します。
この関数を使用して、地理情報で Web ログ データをエンリッチします。
SELECT
l.timestamp,
l.request_path,
geo.city,
geo.country
FROM web_logs AS l,
LATERAL ip_to_location(l.ip_address) AS geo;
このアプローチにより、 リアルタイム 地理分析は、前処理されたルックアップ テーブルや個別のデータパイプラインを必要とせずに行われます。 UDTF は HTTP リクエスト、 JSON 解析、およびエラー処理を処理し、標準の SQL クエリを通じて外部データソースにアクセスできるようにします。
例: IPアドレスとCIDRネットワークブロックの一致
この例では、複雑なSQLロジックを必要とする一般的なデータ エンジニアリング タスクである、 CIDRネットワーク ブロックに対する IP アドレスの照合を示します。
まず、IPv4 アドレスと IPv6 アドレスの両方を含むサンプル データを作成します。
-- An example IP logs with both IPv4 and IPv6 addresses
CREATE OR REPLACE TEMPORARY VIEW ip_logs AS
VALUES
('log1', '192.168.1.100'),
('log2', '10.0.0.5'),
('log3', '172.16.0.10'),
('log4', '8.8.8.8'),
('log5', '2001:db8::1'),
('log6', '2001:db8:85a3::8a2e:370:7334'),
('log7', 'fe80::1'),
('log8', '::1'),
('log9', '2001:db8:1234:5678::1')
t(log_id, ip_address);
次に、UDTFを定義して登録します。 Python クラス構造に注目してください:
t TABLEは、任意のスキーマの入力テーブルを受け入れます。 UDTF は、提供された列を処理するために自動的に適応します。この柔軟性により、関数シグネチャを変更することなく、異なるテーブル間で同じ関数を使用できます。ただし、互換性を確保するために、行のスキーマを慎重に確認する必要があります。__init__メソッドは、大規模なネットワーク リストの読み込みなど、負荷の高い 1 回限りのセットアップに使用されます。この作業は、入力テーブルのパーティションごとに 1 回実行されます。evalメソッドは各行を処理し、コアとなる一致ロジックを含みます。このメソッドは、入力パーティション内の各行に対して 1 回だけ実行され、各実行はそのパーティションのIpMatcherUDTF クラスの対応するインスタンスによって実行されます。HANDLER句は、UDTF ロジックを実装する Python クラスの名前を指定します。
CREATE OR REPLACE TEMPORARY FUNCTION ip_cidr_matcher(t TABLE)
RETURNS TABLE(log_id STRING, ip_address STRING, network STRING, ip_version INT)
LANGUAGE PYTHON
HANDLER 'IpMatcher'
COMMENT 'Match IP addresses against a list of network CIDR blocks'
AS $$
class IpMatcher:
def __init__(self):
import ipaddress
# Heavy initialization - load networks once per partition
self.nets = []
cidrs = ['192.168.0.0/16', '10.0.0.0/8', '172.16.0.0/12',
'2001:db8::/32', 'fe80::/10', '::1/128']
for cidr in cidrs:
self.nets.append(ipaddress.ip_network(cidr))
def eval(self, row):
import ipaddress
# Validate that required fields exist
required_fields = ['log_id', 'ip_address']
for field in required_fields:
if field not in row:
raise ValueError(f"Missing required field: {field}")
try:
ip = ipaddress.ip_address(row['ip_address'])
for net in self.nets:
if ip in net:
yield (row['log_id'], row['ip_address'], str(net), ip.version)
return
yield (row['log_id'], row['ip_address'], None, ip.version)
except ValueError:
yield (row['log_id'], row['ip_address'], 'Invalid', None)
$$;
ip_cidr_matcherが Unity Catalog に登録されたので、 TABLE()構文を使用して SQL から直接呼び出します。
-- Process all IP addresses
SELECT
*
FROM
ip_cidr_matcher(t => TABLE(ip_logs))
ORDER BY
log_id;
+--------+-------------------------------+-----------------+-------------+
| log_id | ip_address | network | ip_version |
+--------+-------------------------------+-----------------+-------------+
| log1 | 192.168.1.100 | 192.168.0.0/16 | 4 |
| log2 | 10.0.0.5 | 10.0.0.0/8 | 4 |
| log3 | 172.16.0.10 | 172.16.0.0/12 | 4 |
| log4 | 8.8.8.8 | null | 4 |
| log5 | 2001:db8::1 | 2001:db8::/32 | 6 |
| log6 | 2001:db8:85a3::8a2e:370:7334 | 2001:db8::/32 | 6 |
| log7 | fe80::1 | fe80::/10 | 6 |
| log8 | ::1 | ::1/128 | 6 |
| log9 | 2001:db8:1234:5678::1 | 2001:db8::/32 | 6 |
+--------+-------------------------------+-----------------+-------------+
例: Databricks ビジョンエンドポイントを使用したバッチ画像キャプション作成
この例では、 Databricksビジョン モデルサービング エンドポイントを使用したバッチ画像キャプションを示します。 バッチ処理とパーティションベースの実行にterminate()使用する方法を紹介します。
-
公開画像の URL を含むテーブルを作成します。
SQLCREATE OR REPLACE TEMPORARY VIEW sample_images AS
VALUES
('https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg', 'scenery'),
('https://upload.wikimedia.org/wikipedia/commons/thumb/a/a7/Camponotus_flavomarginatus_ant.jpg/1024px-Camponotus_flavomarginatus_ant.jpg', 'animals'),
('https://upload.wikimedia.org/wikipedia/commons/thumb/1/15/Cat_August_2010-4.jpg/1200px-Cat_August_2010-4.jpg', 'animals'),
('https://upload.wikimedia.org/wikipedia/commons/thumb/c/c5/M101_hires_STScI-PRC2006-10a.jpg/1024px-M101_hires_STScI-PRC2006-10a.jpg', 'scenery')
images(image_url, category); -
画像キャプションを生成するための Unity Catalog Python UDTF を作成します。
- バッチ サイズ、Databricks API トークン、ビジョン モデル エンドポイント、ワークスペース URL などの構成を使用して UDTF を初期化します。
evalメソッドでは、画像の URL をバッファに収集します。バッファがバッチ サイズに達すると、バッチ処理をトリガーします。これにより、画像ごとに個別に呼び出すのではなく、1 回の API 呼び出しで複数の画像がまとめて処理されるようになります。- バッチ処理方法では、バッファリングされたすべての画像をダウンロードし、base64 としてエンコードして、Databricks VisionModel への単一の API 要求に送信します。モデルはすべての画像を同時に処理し、バッチ全体のキャプションを返します。
terminateメソッドは、各パーティションの最後に 1 回だけ実行されます。終了メソッドでは、バッファー内に残っている画像を処理し、収集されたすべてのキャプションを結果として生成します。
<workspace-url>実際の Databricks ワークスペース URL ( https://your-workspace.cloud.databricks.com ) に置き換えます。
CREATE OR REPLACE TEMPORARY FUNCTION batch_inference_image_caption(data TABLE, api_token STRING)
RETURNS TABLE (caption STRING)
LANGUAGE PYTHON
HANDLER 'BatchInferenceImageCaption'
COMMENT 'batch image captioning by sending groups of image URLs to a Databricks vision endpoint and returning concise captions for each image.'
AS $$
class BatchInferenceImageCaption:
def __init__(self):
self.batch_size = 3
self.vision_endpoint = "databricks-claude-3-7-sonnet"
self.workspace_url = "<workspace-url>"
self.image_buffer = []
self.results = []
def eval(self, row, api_token):
self.image_buffer.append((str(row[0]), api_token))
if len(self.image_buffer) >= self.batch_size:
self._process_batch()
def terminate(self):
if self.image_buffer:
self._process_batch()
for caption in self.results:
yield (caption,)
def _process_batch(self):
batch_data = self.image_buffer.copy()
self.image_buffer.clear()
import base64
import httpx
import requests
# API request timeout in seconds
api_timeout = 60
# Maximum tokens for vision model response
max_response_tokens = 300
# Temperature controls randomness (lower = more deterministic)
model_temperature = 0.3
# create a batch for the images
batch_images = []
api_token = batch_data[0][1] if batch_data else None
for image_url, _ in batch_data:
image_response = httpx.get(image_url, timeout=15)
image_data = base64.standard_b64encode(image_response.content).decode("utf-8")
batch_images.append(image_data)
content_items = [{
"type": "text",
"text": "Provide brief captions for these images, one per line."
}]
for img_data in batch_images:
content_items.append({
"type": "image_url",
"image_url": {
"url": "data:image/jpeg;base64," + img_data
}
})
payload = {
"messages": [{
"role": "user",
"content": content_items
}],
"max_tokens": max_response_tokens,
"temperature": model_temperature
}
response = requests.post(
self.workspace_url + "/serving-endpoints/" +
self.vision_endpoint + "/invocations",
headers={
'Authorization': 'Bearer ' + api_token,
'Content-Type': 'application/json'
},
json=payload,
timeout=api_timeout
)
result = response.json()
batch_response = result['choices'][0]['message']['content'].strip()
lines = batch_response.split('\n')
captions = [line.strip() for line in lines if line.strip()]
while len(captions) < len(batch_data):
captions.append(batch_response)
self.results.extend(captions[:len(batch_data)])
$$;
バッチ画像キャプション UDTF を使用するには、サンプル画像テーブルを使用して呼び出します。
your_secret_scopeとapi_tokenを実際のシークレットスコープとDatabricks APIのキー名に置き換えます。
SELECT
caption
FROM
batch_inference_image_caption(
data => TABLE(sample_images),
api_token => secret('your_secret_scope', 'api_token')
)
+---------------------------------------------------------------------------------------------------------------+
| caption |
+---------------------------------------------------------------------------------------------------------------+
| Wooden boardwalk cutting through vibrant wetland grasses under blue skies |
| Black ant in detailed macro photography standing on a textured surface |
| Tabby cat lounging comfortably on a white ledge against a white wall |
| Stunning spiral galaxy with bright central core and sweeping blue-white arms against the black void of space. |
+---------------------------------------------------------------------------------------------------------------+
カテゴリごとに画像キャプションを生成することもできます。
SELECT
*
FROM
batch_inference_image_caption(
TABLE(sample_images)
PARTITION BY category ORDER BY (category),
secret('your_secret_scope', 'api_token')
)
+------------------------------------------------------------------------------------------------------+
| caption |
+------------------------------------------------------------------------------------------------------+
| Black ant in detailed macro photography standing on a textured surface |
| Stunning spiral galaxy with bright center and sweeping blue-tinged arms against the black of space. |
| Tabby cat lounging comfortably on white ledge against white wall |
| Wooden boardwalk cutting through lush wetland grasses under blue skies |
+------------------------------------------------------------------------------------------------------+
例: MLモデル評価のためのROC曲線とAUC計算
この例では、scikit-learn を使用してバイナリ分類モデルを評価するための受信者動作特性 (ROC) 曲線と曲線下面積 (AUC) スコアを計算する方法を示します。
この例では、いくつかの重要なパターンが示されています。
- 外部ライブラリの使用 : ROC曲線の計算にscikit-learnを統合
- ステートフル集計 : メトリクスを計算する前に、すべての行にわたる予測を蓄積します。
terminate()メソッドの使用法 : データセット全体を処理し、すべての行が評価された後にのみ結果を生成します- エラー処理 : 入力テーブルに必要な列が存在するかどうかを検証します
UDTF は、 eval()メソッドを使用してすべての予測をメモリに蓄積し、次にコンピュートして、 terminate()メソッドで完全なROC曲線を生成します。 このパターンは、計算に完全なデータセットを必要とするメトリクスに役立ちます。
CREATE OR REPLACE TEMPORARY FUNCTION compute_roc_curve(t TABLE)
RETURNS TABLE (threshold DOUBLE, true_positive_rate DOUBLE, false_positive_rate DOUBLE, auc DOUBLE)
LANGUAGE PYTHON
HANDLER 'ROCCalculator'
COMMENT 'Compute ROC curve and AUC using scikit-learn'
AS $$
class ROCCalculator:
def __init__(self):
from sklearn import metrics
self._roc_curve = metrics.roc_curve
self._roc_auc_score = metrics.roc_auc_score
self._true_labels = []
self._predicted_scores = []
def eval(self, row):
if 'y_true' not in row or 'y_score' not in row:
raise KeyError("Required columns 'y_true' and 'y_score' not found")
true_label = row['y_true']
predicted_score = row['y_score']
label = float(true_label)
self._true_labels.append(label)
self._predicted_scores.append(float(predicted_score))
def terminate(self):
false_pos_rate, true_pos_rate, thresholds = self._roc_curve(
self._true_labels,
self._predicted_scores,
drop_intermediate=False
)
auc_score = float(self._roc_auc_score(self._true_labels, self._predicted_scores))
for threshold, tpr, fpr in zip(thresholds, true_pos_rate, false_pos_rate):
yield float(threshold), float(tpr), float(fpr), auc_score
$$;
予測を含むサンプルのバイナリ分類データを作成します。
CREATE OR REPLACE TEMPORARY VIEW binary_classification_data AS
SELECT *
FROM VALUES
( 1, 1.0, 0.95, 'high_confidence_positive'),
( 2, 1.0, 0.87, 'high_confidence_positive'),
( 3, 1.0, 0.82, 'medium_confidence_positive'),
( 4, 0.0, 0.78, 'false_positive'),
( 5, 1.0, 0.71, 'medium_confidence_positive'),
( 6, 0.0, 0.65, 'false_positive'),
( 7, 0.0, 0.58, 'true_negative'),
( 8, 1.0, 0.52, 'low_confidence_positive'),
( 9, 0.0, 0.45, 'true_negative'),
(10, 0.0, 0.38, 'true_negative'),
(11, 1.0, 0.31, 'low_confidence_positive'),
(12, 0.0, 0.15, 'true_negative'),
(13, 0.0, 0.08, 'high_confidence_negative'),
(14, 0.0, 0.03, 'high_confidence_negative')
AS data(sample_id, y_true, y_score, prediction_type);
ROC曲線と AUC を計算します。
SELECT
threshold,
true_positive_rate,
false_positive_rate,
auc
FROM compute_roc_curve(
TABLE(
SELECT y_true, y_score
FROM binary_classification_data
WHERE y_true IS NOT NULL AND y_score IS NOT NULL
ORDER BY sample_id
)
)
ORDER BY threshold DESC;
+-----------+---------------------+----------------------+-------+
| threshold | true_positive_rate | false_positive_rate | auc |
+-----------+---------------------+----------------------+-------+
| 1.95 | 0.0 | 0.0 | 0.786 |
| 0.95 | 0.167 | 0.0 | 0.786 |
| 0.87 | 0.333 | 0.0 | 0.786 |
| 0.82 | 0.5 | 0.0 | 0.786 |
| 0.78 | 0.5 | 0.125 | 0.786 |
| 0.71 | 0.667 | 0.125 | 0.786 |
| 0.65 | 0.667 | 0.25 | 0.786 |
| 0.58 | 0.667 | 0.375 | 0.786 |
| 0.52 | 0.833 | 0.375 | 0.786 |
| 0.45 | 0.833 | 0.5 | 0.786 |
| 0.38 | 0.833 | 0.625 | 0.786 |
| 0.31 | 1.0 | 0.625 | 0.786 |
| 0.15 | 1.0 | 0.75 | 0.786 |
| 0.08 | 1.0 | 0.875 | 0.786 |
| 0.03 | 1.0 | 1.0 | 0.786 |
+-----------+---------------------+----------------------+-------+
制限
Unity Catalog Python UDTF には、次の制限が適用されます。
- ポリモーフィックテーブル関数はサポートされていません。
- Unity Catalog サービスの資格情報 はサポートされていません。
- カスタム依存関係 はサポートされていません。