メインコンテンツまでスキップ

Unity Catalog の Python ユーザー定義テーブル関数 (UDTF)

備考

プレビュー

登録する Python UDTFs in Unity Catalog は パブリック プレビューです。

Unity Catalogユーザー定義テーブル関数 (UDTF) を使用すると、スカラー値の代わりに完全なテーブルを返す関数を登録できます。各呼び出しから 1 つの結果値を返すスカラー関数とは異なり、UDTF は SQL ステートメントの FROM 句で呼び出され、複数の行と列を返すことができます。

UDTF は、次の場合に特に役立ちます。

  • 配列または複雑なデータ構造を複数の行に変換する
  • 外部 APIs またはサービスを SQL ワークフローに統合する
  • カスタム・データ生成またはエンリッチメント・ロジックの実装
  • 行間でステートフル操作を必要とするデータの処理

各 UDTF 呼び出しは、0 個以上の引数を受け入れることができます。これらの引数は、スカラー式または入力テーブル全体を表すテーブル引数にすることができます。

UDTF は、次の 2 つの方法で登録できます。

  • Unity Catalog: 登録する UDTF を Unity Catalogの管理対象オブジェクトとして登録します。
  • Session-scoped: ローカル SparkSessionに登録する 、現行ノートブックまたはジョブに分離されます。 Python ユーザー定義テーブル関数 (UDTF) を参照してください。

必要条件

Unity Catalog Python UDTF は、次のコンピュート タイプでサポートされています。

  • 標準アクセスモードのクラシックコンピュート(Databricks Runtime 17.1以降)
  • SQLウェアハウス (サーバレス、Pro、Classic)

Unity Catalog で UDTF を作成する

SQL DDL を使用して、Unity Catalog で管理された UDTF を作成します。UDTF は、SQL ステートメントの FROM 句を使用して呼び出されます。

SQL
CREATE OR REPLACE FUNCTION square_numbers(start INT, end INT)
RETURNS TABLE (num INT, squared INT)
LANGUAGE PYTHON
HANDLER 'SquareNumbers'
DETERMINISTIC
AS $$
class SquareNumbers:
"""
Basic UDTF that computes a sequence of integers
and includes the square of each number in the range.
"""
def eval(self, start: int, end: int):
for num in range(start, end + 1):
yield (num, num * num)
$$;

SELECT * FROM square_numbers(1, 5);

Output
+-----+---------+
| num | squared |
+-----+---------+
| 1 | 1 |
| 2 | 4 |
| 3 | 9 |
| 4 | 16 |
| 5 | 25 |
+-----+---------+

Databricks は、出力行を生成する必須の eval メソッドを使用して、Python UDTF を Python クラスとして実装します。

実例

次の例は、単純なデータ変換から複雑な外部統合まで、Unity Catalog Python UDTF の実際のユース ケースを示しています。

例: 再実装 explode

Spark には組み込みの explode 関数が用意されていますが、独自のバージョンを作成すると、1 つの入力を受け取り、複数の出力行を生成する基本的な UDTF パターンが実証されます。

SQL
CREATE OR REPLACE FUNCTION my_explode(arr ARRAY<STRING>)
RETURNS TABLE (element STRING)
LANGUAGE PYTHON
HANDLER 'MyExplode'
DETERMINISTIC
AS $$
class MyExplode:
def eval(self, arr):
if arr is None:
return
for element in arr:
yield (element,)
$$;

この関数をSQLクエリで直接使用します。

SQL
SELECT element FROM my_explode(array('apple', 'banana', 'cherry'));
Output
+---------+
|| element |
+---------+
|| apple |
|| banana |
|| cherry |
+---------+

または、 LATERAL 結合を使用して既存のテーブルデータに適用します。

SQL
SELECT s.*, e.element
FROM my_items AS s,
LATERAL my_explode(s.items) AS e;

:REST APIを介したIPアドレスの地理位置情報

この例では、UDTF が外部 APIs を SQL ワークフローに直接統合する方法を示します。 個別のETLプロセスを必要とする代わりに、アナリストは使い慣れたSQL構文を使用してリアルタイムAPI呼び出しでエンリッチデータを行うことができます。

SQL
CREATE OR REPLACE FUNCTION ip_to_location(ip_address STRING)
RETURNS TABLE (city STRING, country STRING)
LANGUAGE PYTHON
HANDLER 'IPToLocationAPI'
AS $$
class IPToLocationAPI:
def eval(self, ip_address):
import requests
api_url = f"https://api.ip-lookup.example.com/{ip_address}"
try:
response = requests.get(api_url)
response.raise_for_status()
data = response.json()
yield (data.get('city'), data.get('country'))
except requests.exceptions.RequestException as e:
# Returns nothing if the ip_address is invalid
return
$$;
注記

Python UDTF では、標準アクセス モードで構成されたサーバレス コンピュートまたはコンピュートを使用して、ポート 80、443、および 53 を介した TCP/UDP ネットワーク トラフィックを許可します。

この関数を使用して、地理情報で Web ログ データをエンリッチします。

SQL
SELECT
l.timestamp,
l.request_path,
geo.city,
geo.country
FROM web_logs AS l,
LATERAL ip_to_location(l.ip_address) AS geo;

このアプローチにより、 リアルタイム 地理分析は、前処理されたルックアップ テーブルや個別のデータパイプラインを必要とせずに行われます。 UDTF は HTTP リクエスト、 JSON 解析、およびエラー処理を処理し、標準の SQL クエリを通じて外部データソースにアクセスできるようにします。

関数が一貫した結果を生成する場合に DETERMINISTIC を設定します

同じ入力に対して同じ出力を生成する場合は、関数定義に DETERMINISTIC を追加します。これにより、クエリの最適化によりパフォーマンスが向上します。

デフォルトでは、バッチ Unity Catalog Python UDTF は、明示的に宣言されない限り、非定数であると想定されます。 非決定論的関数の例としては、ランダムな値の生成、現在の時刻または日付へのアクセス、外部 API 呼び出しの実行などがあります。

「CREATE FUNCTION (SQL および Python)」を参照してください。

制限

Unity Catalog Python UDTF には、次の制限が適用されます。

次のステップ