メインコンテンツまでスキップ

Unity Catalog の Python ユーザー定義テーブル関数 (UDTF)

備考

プレビュー

登録する Python UDTFs in Unity Catalog は パブリック プレビューです。

スカラー値の代わりに完全なテーブルを返すUnity Catalogユーザー定義テーブル関数 (UDTF) 登録する関数。 各呼び出しから単一の結果値を返すスカラー関数とは異なり、UDTF は SQL ステートメントのFROM句で呼び出され、複数の行と列を返すことができます。

UDTF は、次の場合に特に役立ちます。

  • 配列または複雑なデータ構造を複数の行に変換する
  • 外部 APIs またはサービスを SQL ワークフローに統合する
  • カスタム・データ生成またはエンリッチメント・ロジックの実装
  • 行間でステートフル操作を必要とするデータの処理

各 UDTF 呼び出しは、0 個以上の引数を受け入れます。これらの引数は、スカラー式または入力テーブル全体を表すテーブル引数にすることができます。

UDTF は、次の 2 つの方法で登録できます。

  • Unity Catalog: 登録する UDTF を Unity Catalogの管理対象オブジェクトとして登録します。
  • Session-scoped: ローカル SparkSessionに登録する 、現行ノートブックまたはジョブに分離されます。 Python ユーザー定義テーブル関数 (UDTF) を参照してください。

必要条件

Unity Catalog Python UDTF は、次のコンピュート タイプでサポートされています。

  • 標準アクセスモードのクラシックコンピュート(Databricks Runtime 17.1以降)
  • SQLウェアハウス (サーバレス or プロ)

Unity Catalog で UDTF を作成する

SQL DDLを使用して、Unity Catalog に管理された UDTF を作成します。UDTF は、SQL ステートメントのFROM句を使用して呼び出されます。

SQL
CREATE OR REPLACE FUNCTION square_numbers(start INT, end INT)
RETURNS TABLE (num INT, squared INT)
LANGUAGE PYTHON
HANDLER 'SquareNumbers'
DETERMINISTIC
AS $$
class SquareNumbers:
"""
Basic UDTF that computes a sequence of integers
and includes the square of each number in the range.
"""
def eval(self, start: int, end: int):
for num in range(start, end + 1):
yield (num, num * num)
$$;

SELECT * FROM square_numbers(1, 5);

Output
+-----+---------+
| num | squared |
+-----+---------+
| 1 | 1 |
| 2 | 4 |
| 3 | 9 |
| 4 | 16 |
| 5 | 25 |
+-----+---------+

Databricks は、出力行を生成する必須の eval メソッドを使用して、Python UDTF を Python クラスとして実装します。

テーブル引数

注記

TABLE 引数は、Databricks Runtime 17.2 以降でサポートされています。

UDTF はテーブル全体を入力引数として受け入れることができるため、複雑なステートフル変換と集計が可能になります。

eval()およびterminate()ライフサイクル メソッド

UDTF のテーブル引数は、次の関数を使用して各行を処理します。

  • eval(): 入力テーブルの各行に対して 1 回呼び出されます。これは主な処理方法であり、必須です。
  • terminate(): すべての行がeval()によって処理された後、各パーティションの最後に 1 回呼び出されます。このメソッドを使用して、最終的な集計結果を生成したり、クリーンアップ操作を実行したりします。このメソッドはオプションですが、集計、カウント、バッチ処理などのステートフル操作には不可欠です。

eval()terminate()メソッドの詳細については、 Apache Sparkドキュメント: Python UDTF を参照してください。

行アクセスパターン

eval() TABLE 引数から行をPySpark .sql.Rowオブジェクトとして受け取ります。 列名 ( row['id']row['name'] ) またはインデックス ( row[0]row[1] ) で値にアクセスできます。

  • スキーマの柔軟性 : スキーマ定義なしで TABLE 引数を宣言します (例: data TABLEt TABLE )。この関数は任意のテーブル構造を受け入れるため、コードでは必要な列が存在することを検証する必要があります。

「例: IP アドレスを CIDR ネットワーク ブロックと照合する」および「例: Databricks ビジョン エンドポイントを使用してイメージのキャプションをバッチ処理する」を参照してください。

環境隔離

注記

共有分離環境には、Databricks Runtime 17.2 以降が必要です。以前のバージョンでは、すべての Unity Catalog Python UDTF は厳密な分離モードで実行されます。

同じ所有者とセッションを持つUnity Catalog Python UDTF は、デフォルトによって分離環境を共有できます。これにより、起動する必要がある個別の環境の数が減り、パフォーマンスが向上し、メモリ使用量が削減されます。

厳密な分離

UDTF が常に独自の完全に分離された環境で実行されるようにするには、 STRICT ISOLATION 特性句を追加します。

ほとんどの UDTF は厳密な分離を必要としません。標準データ処理 UDTF は、デフォルトの共有分離環境の恩恵を受け、より少ないメモリ消費でより高速に実行されます。

次の STRICT ISOLATION 特性句を UDTF に追加します。

  • eval()exec()、または同様の関数を使用して、入力をコードとして実行します。
  • ローカル・ファイル・システムにファイルを書き込みます。
  • グローバル変数またはシステム状態を変更します。
  • 環境変数にアクセスまたは変更します。

次の UDTF の例では、カスタム環境変数を設定し、変数を読み戻し、変数を使用して一連の数値を乗算します。UDTF はプロセス環境を変更するため、 STRICT ISOLATIONで実行します。そうしないと、同じ環境内の他の UDF/UDTF の環境変数がリークまたはオーバーライドされ、誤った動作が発生する可能性があります。

SQL
CREATE OR REPLACE TEMPORARY FUNCTION multiply_numbers(factor STRING)
RETURNS TABLE (original INT, scaled INT)
LANGUAGE PYTHON
STRICT ISOLATION
HANDLER 'Multiplier'
AS $$
import os

class Multiplier:
def eval(self, factor: str):
# Save the factor as an environment variable
os.environ["FACTOR"] = factor

# Read it back and convert it to a number
scale = int(os.getenv("FACTOR", "1"))

# Multiply 0 through 4 by the factor
for i in range(5):
yield (i, i * scale)
$$;

SELECT * FROM multiply_numbers("3");

関数が一貫した結果を生成する場合に DETERMINISTIC を設定します

同じ入力に対して同じ出力を生成する場合は、関数定義に DETERMINISTIC を追加します。これにより、クエリの最適化によりパフォーマンスが向上します。

もちろん、バッチUnity Catalog Python UDTFは、明示的に宣言されない限り、非決定的であるとみなされます。 非決定論的関数の例としては、ランダム値の生成、現在の時刻や日付へのアクセス、外部 API 呼び出しなどがあります。

CREATE FUNCTION (SQL および Python)を参照してください。

実例

次の例は、単純なデータ変換から複雑な外部統合まで、Unity Catalog Python UDTF の実際のユース ケースを示しています。

例: 再実装 explode

Spark には組み込みの explode 関数が用意されていますが、独自のバージョンを作成すると、1 つの入力を受け取り、複数の出力行を生成する基本的な UDTF パターンが実証されます。

SQL
CREATE OR REPLACE FUNCTION my_explode(arr ARRAY<STRING>)
RETURNS TABLE (element STRING)
LANGUAGE PYTHON
HANDLER 'MyExplode'
DETERMINISTIC
AS $$
class MyExplode:
def eval(self, arr):
if arr is None:
return
for element in arr:
yield (element,)
$$;

この関数をSQLクエリで直接使用します。

SQL
SELECT element FROM my_explode(array('apple', 'banana', 'cherry'));
Output
+---------+
| element |
+---------+
| apple |
| banana |
| cherry |
+---------+

または、 LATERAL結合を使用して既存のテーブル データに適用します。

SQL
SELECT s.*, e.element
FROM my_items AS s,
LATERAL my_explode(s.items) AS e;

:REST APIを介したIPアドレスの地理位置情報

この例では、UDTF が外部APIs SQLワークフローに直接統合する方法を示します。 アナリストは、別のETLプロセスを必要とせずに、使い慣れたSQL構文を使用して、ラケットAPI呼び出しでエンリッチデータを取得できます。

SQL
CREATE OR REPLACE FUNCTION ip_to_location(ip_address STRING)
RETURNS TABLE (city STRING, country STRING)
LANGUAGE PYTHON
HANDLER 'IPToLocationAPI'
AS $$
class IPToLocationAPI:
def eval(self, ip_address):
import requests
api_url = f"https://api.ip-lookup.example.com/{ip_address}"
try:
response = requests.get(api_url)
response.raise_for_status()
data = response.json()
yield (data.get('city'), data.get('country'))
except requests.exceptions.RequestException as e:
# Return nothing if the API request fails
return
$$;
注記

Python UDTF は、標準アクセス モードで設定されたサーバレス コンピュートまたはコンピュートを使用する場合、ポート 80、443、および 53 を介した TCP/UDP ネットワーク トラフィックを許可します。

この関数を使用して、地理情報で Web ログ データをエンリッチします。

SQL
SELECT
l.timestamp,
l.request_path,
geo.city,
geo.country
FROM web_logs AS l,
LATERAL ip_to_location(l.ip_address) AS geo;

このアプローチにより、 リアルタイム 地理分析は、前処理されたルックアップ テーブルや個別のデータパイプラインを必要とせずに行われます。 UDTF は HTTP リクエスト、 JSON 解析、およびエラー処理を処理し、標準の SQL クエリを通じて外部データソースにアクセスできるようにします。

例: IPアドレスとCIDRネットワークブロックの一致

この例では、複雑なSQLロジックを必要とする一般的なデータ エンジニアリング タスクである、 CIDRネットワーク ブロックに対する IP アドレスの照合を示します。

まず、IPv4 アドレスと IPv6 アドレスの両方を含むサンプル データを作成します。

SQL
-- An example IP logs with both IPv4 and IPv6 addresses
CREATE OR REPLACE TEMPORARY VIEW ip_logs AS
VALUES
('log1', '192.168.1.100'),
('log2', '10.0.0.5'),
('log3', '172.16.0.10'),
('log4', '8.8.8.8'),
('log5', '2001:db8::1'),
('log6', '2001:db8:85a3::8a2e:370:7334'),
('log7', 'fe80::1'),
('log8', '::1'),
('log9', '2001:db8:1234:5678::1')
t(log_id, ip_address);

次に、UDTFを定義して登録します。 Python クラス構造に注目してください:

  • t TABLEは、任意のスキーマの入力テーブルを受け入れます。 UDTF は、提供された列を処理するために自動的に適応します。この柔軟性により、関数シグネチャを変更することなく、異なるテーブル間で同じ関数を使用できます。ただし、互換性を確保するために、行のスキーマを慎重に確認する必要があります。
  • __init__メソッドは、大規模なネットワーク リストの読み込みなど、負荷の高い 1 回限りのセットアップに使用されます。この作業は、入力テーブルのパーティションごとに 1 回実行されます。
  • evalメソッドは各行を処理し、コアとなる一致ロジックを含みます。このメソッドは、入力パーティション内の各行に対して 1 回だけ実行され、各実行はそのパーティションのIpMatcher UDTF クラスの対応するインスタンスによって実行されます。
  • HANDLER句は、UDTF ロジックを実装する Python クラスの名前を指定します。
SQL
CREATE OR REPLACE TEMPORARY FUNCTION ip_cidr_matcher(t TABLE)
RETURNS TABLE(log_id STRING, ip_address STRING, network STRING, ip_version INT)
LANGUAGE PYTHON
HANDLER 'IpMatcher'
COMMENT 'Match IP addresses against a list of network CIDR blocks'
AS $$
class IpMatcher:
def __init__(self):
import ipaddress
# Heavy initialization - load networks once per partition
self.nets = []
cidrs = ['192.168.0.0/16', '10.0.0.0/8', '172.16.0.0/12',
'2001:db8::/32', 'fe80::/10', '::1/128']
for cidr in cidrs:
self.nets.append(ipaddress.ip_network(cidr))

def eval(self, row):
import ipaddress
# Validate that required fields exist
required_fields = ['log_id', 'ip_address']
for field in required_fields:
if field not in row:
raise ValueError(f"Missing required field: {field}")
try:
ip = ipaddress.ip_address(row['ip_address'])
for net in self.nets:
if ip in net:
yield (row['log_id'], row['ip_address'], str(net), ip.version)
return
yield (row['log_id'], row['ip_address'], None, ip.version)
except ValueError:
yield (row['log_id'], row['ip_address'], 'Invalid', None)
$$;

ip_cidr_matcherが Unity Catalog に登録されたので、 TABLE()構文を使用して SQL から直接呼び出します。

SQL
-- Process all IP addresses
SELECT
*
FROM
ip_cidr_matcher(t => TABLE(ip_logs))
ORDER BY
log_id;
Output
+--------+-------------------------------+-----------------+-------------+
| log_id | ip_address | network | ip_version |
+--------+-------------------------------+-----------------+-------------+
| log1 | 192.168.1.100 | 192.168.0.0/16 | 4 |
| log2 | 10.0.0.5 | 10.0.0.0/8 | 4 |
| log3 | 172.16.0.10 | 172.16.0.0/12 | 4 |
| log4 | 8.8.8.8 | null | 4 |
| log5 | 2001:db8::1 | 2001:db8::/32 | 6 |
| log6 | 2001:db8:85a3::8a2e:370:7334 | 2001:db8::/32 | 6 |
| log7 | fe80::1 | fe80::/10 | 6 |
| log8 | ::1 | ::1/128 | 6 |
| log9 | 2001:db8:1234:5678::1 | 2001:db8::/32 | 6 |
+--------+-------------------------------+-----------------+-------------+

例: Databricks ビジョンエンドポイントを使用したバッチ画像キャプション作成

この例では、 Databricksビジョン モデルサービング エンドポイントを使用したバッチ画像キャプションを示します。 バッチ処理とパーティションベースの実行にterminate()使用する方法を紹介します。

  1. 公開画像の URL を含むテーブルを作成します。

    SQL
    CREATE OR REPLACE TEMPORARY VIEW sample_images AS
    VALUES
    ('https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg', 'scenery'),
    ('https://upload.wikimedia.org/wikipedia/commons/thumb/a/a7/Camponotus_flavomarginatus_ant.jpg/1024px-Camponotus_flavomarginatus_ant.jpg', 'animals'),
    ('https://upload.wikimedia.org/wikipedia/commons/thumb/1/15/Cat_August_2010-4.jpg/1200px-Cat_August_2010-4.jpg', 'animals'),
    ('https://upload.wikimedia.org/wikipedia/commons/thumb/c/c5/M101_hires_STScI-PRC2006-10a.jpg/1024px-M101_hires_STScI-PRC2006-10a.jpg', 'scenery')
    images(image_url, category);
  2. 画像キャプションを生成するための Unity Catalog Python UDTF を作成します。

    1. バッチ サイズ、Databricks API トークン、ビジョン モデル エンドポイント、ワークスペース URL などの構成を使用して UDTF を初期化します。
    2. evalメソッドでは、画像の URL をバッファに収集します。バッファがバッチ サイズに達すると、バッチ処理をトリガーします。これにより、画像ごとに個別に呼び出すのではなく、1 回の API 呼び出しで複数の画像がまとめて処理されるようになります。
    3. バッチ処理方法では、バッファリングされたすべての画像をダウンロードし、base64 としてエンコードして、Databricks VisionModel への単一の API 要求に送信します。モデルはすべての画像を同時に処理し、バッチ全体のキャプションを返します。
    4. terminateメソッドは、各パーティションの最後に 1 回だけ実行されます。終了メソッドでは、バッファー内に残っている画像を処理し、収集されたすべてのキャプションを結果として生成します。
注記

<workspace-url>実際の Databricks ワークスペース URL ( https://your-workspace.cloud.databricks.com ) に置き換えます。

SQL
CREATE OR REPLACE TEMPORARY FUNCTION batch_inference_image_caption(data TABLE, api_token STRING)
RETURNS TABLE (caption STRING)
LANGUAGE PYTHON
HANDLER 'BatchInferenceImageCaption'
COMMENT 'batch image captioning by sending groups of image URLs to a Databricks vision endpoint and returning concise captions for each image.'
AS $$
class BatchInferenceImageCaption:
def __init__(self):
self.batch_size = 3
self.vision_endpoint = "databricks-claude-3-7-sonnet"
self.workspace_url = "<workspace-url>"
self.image_buffer = []
self.results = []

def eval(self, row, api_token):
self.image_buffer.append((str(row[0]), api_token))
if len(self.image_buffer) >= self.batch_size:
self._process_batch()

def terminate(self):
if self.image_buffer:
self._process_batch()
for caption in self.results:
yield (caption,)

def _process_batch(self):
batch_data = self.image_buffer.copy()
self.image_buffer.clear()

import base64
import httpx
import requests

# API request timeout in seconds
api_timeout = 60
# Maximum tokens for vision model response
max_response_tokens = 300
# Temperature controls randomness (lower = more deterministic)
model_temperature = 0.3

# create a batch for the images
batch_images = []
api_token = batch_data[0][1] if batch_data else None

for image_url, _ in batch_data:
image_response = httpx.get(image_url, timeout=15)
image_data = base64.standard_b64encode(image_response.content).decode("utf-8")
batch_images.append(image_data)

content_items = [{
"type": "text",
"text": "Provide brief captions for these images, one per line."
}]
for img_data in batch_images:
content_items.append({
"type": "image_url",
"image_url": {
"url": "data:image/jpeg;base64," + img_data
}
})

payload = {
"messages": [{
"role": "user",
"content": content_items
}],
"max_tokens": max_response_tokens,
"temperature": model_temperature
}

response = requests.post(
self.workspace_url + "/serving-endpoints/" +
self.vision_endpoint + "/invocations",
headers={
'Authorization': 'Bearer ' + api_token,
'Content-Type': 'application/json'
},
json=payload,
timeout=api_timeout
)

result = response.json()
batch_response = result['choices'][0]['message']['content'].strip()

lines = batch_response.split('\n')
captions = [line.strip() for line in lines if line.strip()]

while len(captions) < len(batch_data):
captions.append(batch_response)

self.results.extend(captions[:len(batch_data)])
$$;

バッチ画像キャプション UDTF を使用するには、サンプル画像テーブルを使用して呼び出します。

注記

your_secret_scopeapi_tokenを実際のシークレットスコープとDatabricks APIのキー名に置き換えます。

SQL
SELECT
caption
FROM
batch_inference_image_caption(
data => TABLE(sample_images),
api_token => secret('your_secret_scope', 'api_token')
)
Output
+---------------------------------------------------------------------------------------------------------------+
| caption |
+---------------------------------------------------------------------------------------------------------------+
| Wooden boardwalk cutting through vibrant wetland grasses under blue skies |
| Black ant in detailed macro photography standing on a textured surface |
| Tabby cat lounging comfortably on a white ledge against a white wall |
| Stunning spiral galaxy with bright central core and sweeping blue-white arms against the black void of space. |
+---------------------------------------------------------------------------------------------------------------+

カテゴリごとに画像キャプションを生成することもできます。

SQL
SELECT
*
FROM
batch_inference_image_caption(
TABLE(sample_images)
PARTITION BY category ORDER BY (category),
secret('your_secret_scope', 'api_token')
)
Output
+------------------------------------------------------------------------------------------------------+
| caption |
+------------------------------------------------------------------------------------------------------+
| Black ant in detailed macro photography standing on a textured surface |
| Stunning spiral galaxy with bright center and sweeping blue-tinged arms against the black of space. |
| Tabby cat lounging comfortably on white ledge against white wall |
| Wooden boardwalk cutting through lush wetland grasses under blue skies |
+------------------------------------------------------------------------------------------------------+

例: MLモデル評価のためのROC曲線とAUC計算

この例では、scikit-learn を使用してバイナリ分類モデルを評価するための受信者動作特性 (ROC) 曲線と曲線下面積 (AUC) スコアを計算する方法を示します。

この例では、いくつかの重要なパターンが示されています。

  • 外部ライブラリの使用 : ROC曲線の計算にscikit-learnを統合
  • ステートフル集計 : メトリクスを計算する前に、すべての行にわたる予測を蓄積します。
  • terminate()メソッドの使用法 : データセット全体を処理し、すべての行が評価された後にのみ結果を生成します
  • エラー処理 : 入力テーブルに必要な列が存在するかどうかを検証します

UDTF は、 eval()メソッドを使用してすべての予測をメモリに蓄積し、次にコンピュートして、 terminate()メソッドで完全なROC曲線を生成します。 このパターンは、計算に完全なデータセットを必要とするメトリクスに役立ちます。

SQL
CREATE OR REPLACE TEMPORARY FUNCTION compute_roc_curve(t TABLE)
RETURNS TABLE (threshold DOUBLE, true_positive_rate DOUBLE, false_positive_rate DOUBLE, auc DOUBLE)
LANGUAGE PYTHON
HANDLER 'ROCCalculator'
COMMENT 'Compute ROC curve and AUC using scikit-learn'
AS $$
class ROCCalculator:
def __init__(self):
from sklearn import metrics
self._roc_curve = metrics.roc_curve
self._roc_auc_score = metrics.roc_auc_score

self._true_labels = []
self._predicted_scores = []

def eval(self, row):
if 'y_true' not in row or 'y_score' not in row:
raise KeyError("Required columns 'y_true' and 'y_score' not found")

true_label = row['y_true']
predicted_score = row['y_score']

label = float(true_label)
self._true_labels.append(label)
self._predicted_scores.append(float(predicted_score))

def terminate(self):
false_pos_rate, true_pos_rate, thresholds = self._roc_curve(
self._true_labels,
self._predicted_scores,
drop_intermediate=False
)

auc_score = float(self._roc_auc_score(self._true_labels, self._predicted_scores))

for threshold, tpr, fpr in zip(thresholds, true_pos_rate, false_pos_rate):
yield float(threshold), float(tpr), float(fpr), auc_score
$$;

予測を含むサンプルのバイナリ分類データを作成します。

SQL
CREATE OR REPLACE TEMPORARY VIEW binary_classification_data AS
SELECT *
FROM VALUES
( 1, 1.0, 0.95, 'high_confidence_positive'),
( 2, 1.0, 0.87, 'high_confidence_positive'),
( 3, 1.0, 0.82, 'medium_confidence_positive'),
( 4, 0.0, 0.78, 'false_positive'),
( 5, 1.0, 0.71, 'medium_confidence_positive'),
( 6, 0.0, 0.65, 'false_positive'),
( 7, 0.0, 0.58, 'true_negative'),
( 8, 1.0, 0.52, 'low_confidence_positive'),
( 9, 0.0, 0.45, 'true_negative'),
(10, 0.0, 0.38, 'true_negative'),
(11, 1.0, 0.31, 'low_confidence_positive'),
(12, 0.0, 0.15, 'true_negative'),
(13, 0.0, 0.08, 'high_confidence_negative'),
(14, 0.0, 0.03, 'high_confidence_negative')
AS data(sample_id, y_true, y_score, prediction_type);

ROC曲線と AUC を計算します。

SQL
SELECT
threshold,
true_positive_rate,
false_positive_rate,
auc
FROM compute_roc_curve(
TABLE(
SELECT y_true, y_score
FROM binary_classification_data
WHERE y_true IS NOT NULL AND y_score IS NOT NULL
ORDER BY sample_id
)
)
ORDER BY threshold DESC;
Output
+-----------+---------------------+----------------------+-------+
| threshold | true_positive_rate | false_positive_rate | auc |
+-----------+---------------------+----------------------+-------+
| 1.95 | 0.0 | 0.0 | 0.786 |
| 0.95 | 0.167 | 0.0 | 0.786 |
| 0.87 | 0.333 | 0.0 | 0.786 |
| 0.82 | 0.5 | 0.0 | 0.786 |
| 0.78 | 0.5 | 0.125 | 0.786 |
| 0.71 | 0.667 | 0.125 | 0.786 |
| 0.65 | 0.667 | 0.25 | 0.786 |
| 0.58 | 0.667 | 0.375 | 0.786 |
| 0.52 | 0.833 | 0.375 | 0.786 |
| 0.45 | 0.833 | 0.5 | 0.786 |
| 0.38 | 0.833 | 0.625 | 0.786 |
| 0.31 | 1.0 | 0.625 | 0.786 |
| 0.15 | 1.0 | 0.75 | 0.786 |
| 0.08 | 1.0 | 0.875 | 0.786 |
| 0.03 | 1.0 | 1.0 | 0.786 |
+-----------+---------------------+----------------------+-------+

制限

Unity Catalog Python UDTF には、次の制限が適用されます。

次のステップ