メインコンテンツまでスキップ

Lakeflow Designer のユーザー定義演算子

備考

プレビュー

この機能は パブリック プレビュー段階です。

Lakeflow Designer を使用すると 、ユーザー定義の演算子 を作成し、それをキャンバス上で標準の演算子と並べて直接表示できます。 これらを使用して、 Lakeflow Designerに独自のビジネスロジック、計算、または統合機能を拡張できます。

ユーザー定義演算子には、次の3種類があります。

  • python-run-function : ワークスペースに保存された、インラインPythonを含むスタンドアロンのYAMLファイル。データフレームレベルの変換や外部システムとの連携に最適です。アクセス許可はワークスペースファイルレベルで管理されます。
  • uc-udf : Unity Catalogスカラー関数をラップします。 列レベルの変換に最適です。アクセス権限はUnity Catalog権限によって管理されます。
  • uc-udtf : Unity Catalogテーブル値関数をラップします。 MLクラスタリングや集計などのテーブルレベルの変換に最適です。 アクセス権限はUnity Catalog権限によって管理されます。

機能

python-run-function

uc-udf

uc-udtf

使用例

DataFrame変換、 API統合、電子メール通知

列レベルの計算(BMI、金利)

MLクラスタリング、行全体の集計

入力

データフレーム

単一の値

表全体、行ごと

出力

データフレーム

単一値

表(複数行)

Unity Catalog機能が必要です

No

はい

はい

アクセスガバナンス

ワークスペースのファイル権限

Unity Catalog権限( EXECUTEUSE SCHEMA

Unity Catalog権限( EXECUTEUSE SCHEMA

対応言語

Pythonのみ

SQLまたはSQLラッパー内のPython

SQLまたはSQLラッパー内のPython

ユーザー定義演算子はどのように機能しますか?

ユーザー定義演算子は以下で構成されます。

  • 演算子ロジック :演算子が実行されたときに実行されるコード。これは、インラインのPython run()関数 ( python-run-functionの場合) またはUnity Catalog関数 ( uc-udfおよびuc-udtfの場合) のいずれかです。
  • YAML 設定 : Lakeflow Designerに対して、オペレーターの名前、説明、入力の区切り、UI ウィジェット、ポートなど、オペレーターを UI でどのように表示するかを指示します。 すべての演算子タイプはuser-defined-operator-v0.1.0スキーマを使用します。
  • 登録ファイル :Lakeflow Designerがオペレーターを検出できるようにする.user_defined_operators.yamlのエントリ。

演算子ロジック

Pythonの実行関数のユーザー定義演算子ロジック

すべてのpython-run-function演算子はrun()関数を定義する必要があります。

Python
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
  • config : UI からユーザーが設定した値。プロパティ名をキーとする。
  • inputs : 入力DataFrames 、入力ポートnameをキーとする。
  • spark : アクティブな SparkSession。
  • 戻り値 : 出力ポートname値をDataFramesにマッピングする辞書。

次の例は、入力DataFrameから行をフィルタリングします。

Python
def run(config, inputs, spark):
df = inputs["in"]
filtered = df.filter(config["filter_expression"])
return {"out": filtered}

オペレーターが外部の pip パッケージを必要とする場合は、YAML にenvironmentフィールドを追加してください。

YAML
environment:
environment_version: '1'
dependencies:
- requests==2.31.0
- beautifulsoup4==4.12.0

UDFおよびUDTF演算子ロジック

UC関数はSQLまたはPythonで記述できます。Python関数はSQLのCREATE FUNCTIONステートメントで囲まれています。

SQL関数:

SQL
CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE SQL
RETURN
SELECT weight_kg / (height_m * height_m);

Python関数(SQLでラップ):

SQL
CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
return weight_kg / (height_m ** 2)
$$;

UDF(ユーザー定義関数)は一度に1つの値を処理し、計算された値を返します。UDTFはテーブルを1行ずつ処理し、すべての行にわたって状態を保持することができます。列レベルの変換にはuc-udf使用し、 MLクラスタリングや集計などの操作にはuc-udtf使用します。

さらに、UDTFでは、 __init__()eval()terminate()の3つのキーメソッドを定義する必要があります。

Python
class MyOperator:
def __init__(self):
# Called before processing - initialize any values needed.

def eval(self, row, id_column, columns, k):
# Called once per input row - accumulate data here.

def terminate(self):
# Called after all rows - perform final calculations and yield results.
注記

UDTFの戻り値テーブルは、固定された明示的な型を持つ必要があります。戻り値の設定では、入力列の型を参照することはできません。

YAML設定

YAML設定は、Lakeflow DesignerがUI上でオペレーターをどのように表示するかを指示します。オペレーターの名前、説明、入力問題、UI ウィジェット、およびポートを定義します。 各設定フィールドは、タイプ、タイトル、およびオプションのウィジェットヒントx-uiを持つプロパティです。

YAML
config:
type: object
properties:
my_param:
type: string
title: My Parameter
x-ui:
widget: input
my_expression:
type: string
title: Column
format: expression
x-ui:
widget: expression
port: in
my_number:
type: number
title: Count
default: 10
minimum: 0
maximum: 100
required:
- my_param
- my_expression

YAMLスキーマの詳細(すべてのウィジェットタイプと構成オプションを含む)については、 「ユーザー定義演算子YAMLリファレンス」を参照してください。

ポートは、オペレーターの入力と出力を定義します。

YAML
ports:
input:
- name: in
title: Input Data
mime: application/vnd.databricks.dataframe
required: true
allowMultiple: false
output:
- name: out
title: Output Data

Python実行関数演算子のYAML

python-run-function演算子の場合、YAMLファイルは独立しており、インラインPythonコードを含むrun_functionフィールドが含まれています。

YAML
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Filter Rows
id: filter_rows
version: '1.0.0'
description: Filters rows based on a SQL expression.
config:
type: object
properties:
filter_expression:
type: string
title: Filter Expression
x-ui:
widget: input
required:
- filter_expression
ports:
input:
- name: in
title: Input
output:
- name: out
title: Output
run_function:
type: inline
code: |
def run(config, inputs, spark):
df = inputs["in"]
filtered = df.filter(config["filter_expression"])
return {"out": filtered}

Unity Catalog機能用のYAML

UCベースの演算子の場合、YAML設定をコメントまたはドキュメンテーション文字列として関数内に埋め込んでください。

SQLでは/* ... */コメントを使用):

SQL
RETURN(/*
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Calculate BMI
id: calculate_bmi
version: "1.0.0"
description: Calculates BMI from weight and height.
config:
type: object
properties:
weight_kg:
type: string
title: Weight (in kg)
format: expression
x-ui:
widget: expression
port: in
height_m:
type: string
title: Height (in meters)
format: expression
x-ui:
widget: expression
port: in
required:
- weight_kg
- height_m
ports:
input:
- name: in
title: Input Data
output:
- name: out
title: Output
*/
SELECT weight_kg / (height_m * height_m)
);

Pythonの場合""" ... """ドキュメント文字列を使用):

SQL
AS $$
"""
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Calculate BMI
id: calculate_bmi
version: "1.0.0"
description: Calculates BMI from weight and height.
config:
type: object
properties:
weight_kg:
type: string
title: Weight (in kg)
format: expression
x-ui:
widget: expression
port: in
height_m:
type: string
title: Height (in meters)
format: expression
x-ui:
widget: expression
port: in
required:
- weight_kg
- height_m
ports:
input:
- name: in
title: Input Data
output:
- name: out
title: Output
"""

return weight_kg / (height_m ** 2)
$$;

オペレーターを登録してLakeflow Designerにデプロイします

オペレーターをLakeflow Designerに表示するには、それを.user_defined_operators.yamlファイルに登録します。

  • ワークスペース レベル: ファイルをワークスペースのルートに配置すると、オペレーターがすべてのユーザーに表示されます。
  • ユーザー レベル: オペレーターが自分だけに表示されるようにするには、ファイルをユーザーのホームアラーム ( /Workspace/Users/<user-name>/.user_defined_operators.yaml ) に配置します。

operators:セクションでは、ファイルパス、 Unity Catalog関数の参照、およびグロブパターンがサポートされます。 エントリーの種類は組み合わせることができます。

YAML
operators:
# File path (python-run-function operators)
- /Workspace/Users/me/udos/my_operator.yaml
# Glob pattern (registers all matching files)
- /Workspace/Users/me/udos/transforms/*.yaml
# UC function reference (uc-udf and uc-udtf operators)
- catalog: my_catalog
schema: my_schema
functionName: my_function

高度な設定

プレビューモード

Lakeflow Designerは、デザインモード中にプレビュー機能をサポートしています。外部APIsを呼び出したり、外部システムに書き込んだりする演算子については、プレビュー中に副作用をスキップできるように、 is_preview設定プロパティを追加してください。 プレビューモードが有効になっている場合、ユーザーは副作用を伴う演算子を実行するために、明示的に 「実行」を クリックする必要があります。

YAML
config:
type: object
properties:
is_preview:
type: boolean
format: is_preview
default: false

Lakeflow Designerはプレビュー時にこの値を自動的にtrueに設定します。副作用を回避するために、ロジック内で確認してください。

Python
# In a python-run-function
if config.get("is_preview"):
return {"out": inputs["in"]}

# In a UC function (SQL)
CASE WHEN is_preview THEN 'preview' ELSE /* actual work */ END

Unity Catalog接続

外部APIsを呼び出すUCベースのSQLオペレーターの場合、 Unity Catalog HTTP接続を使用して認証情報を安全に保存します。

SQL
CREATE CONNECTION my_api_connection TYPE HTTP OPTIONS (
host 'https://api.example.com',
port '443',
base_path '/v1/',
bearer_token 'your-token-here'
);

次に、SQL UDF 内でhttp_request()関数を使用して接続を使用します。詳細については、 「外部 HTTP サービスへの接続」を参照してください。

WorkspaceClient

python-run-functionオペレーターの場合、 Databricks WorkspaceClientを使用してワークスペース リソースと外部APIsアクセスできます。

Python
def run(config, inputs, spark):
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
# Use w to access workspace resources

完全なPython -実行-function ユーザー定義演算子を作成する

次のステップでは、 python-run-function演算子を最初から作成する手順を説明します。

ステップ 1: ロジックを定義する

run()関数をノートブックに記述してください。

Python
from typing import Dict, Any

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
from pyspark.sql import functions as F
df = inputs["in"]
result = df.withColumn(config["column_name"], F.current_timestamp())
return {"out": result}

ステップ 2: 機能をテストする

サンプルデータを使用して、関数を対話的にテストしてください。

Python
test_df = spark.createDataFrame(
[("Alice", 100), ("Bob", 200)],
["name", "amount"]
)

result = run(
config={"column_name": "processed_at"},
inputs={"in": test_df},
spark=spark
)

result["out"].show()

ステップ 3: YAML 構成を作成する

オペレーターのメタデータ、設定フィールド、およびポートをYAMLファイルで定義します。

YAML
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
type: object
properties:
column_name:
type: string
title: Column Name
default: processed_at
x-ui:
widget: input
required:
- column_name

ステップ 4: ロジックと YAML を結合する

完全なYAMLファイルを作成するには、 run_functionフィールドとportsフィールドを追加します。ワークスペースに保存してください。例: /Workspace/Users/<user-name>/udos/add_timestamp.yaml :

YAML
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
type: object
properties:
column_name:
type: string
title: Column Name
default: processed_at
x-ui:
widget: input
required:
- column_name
ports:
input:
- name: in
title: Input
output:
- name: out
title: Output
run_function:
type: inline
code: |
from typing import Dict, Any

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
from pyspark.sql import functions as F
df = inputs["in"]
result = df.withColumn(config["column_name"], F.current_timestamp())
return {"out": result}

ステップ 5: オペレーターを登録する

.user_defined_operators.yamlファイルにファイルパスを追加します。

YAML
operators:
- /Workspace/Users/<user-name>/udos/add_timestamp.yaml

ステップ 6: Lakeflow Designerでオペレーターを使用する

Lakeflow Designerを開き、オペレーターパレットにオペレーターが表示されていることを確認してください。キャンバス上にドラッグし、入力を接続し、列名を設定して、プレビューを実行します。

完全なUCユーザー定義オペレーターを作成する

次のステップでは、UC ベースのuc-udf演算子の作成について説明します。

ステップ 1: ロジックを定義する

ノートブックに関数のロジックを記述してテストしてください。

Python
def double_value(input_value: float) -> float:
if input_value is None:
return None
return input_value * 2

ステップ 2: YAML 構成を作成する

オペレーターのメタデータ、設定フィールド、およびポートを定義します。

YAML
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Double Value
id: math.double_value
version: '1.0.0'
description: Doubles the input value
config:
type: object
properties:
input_value:
type: string
title: Input Value
format: expression
x-ui:
widget: expression
port: input_data
required:
- input_value
ports:
input:
- name: input_data
title: Input
output:
- name: out
title: Output

ステップ 3: ロジックと YAML を結合する

YAMLをドキュメント文字列として埋め込んだUnity Catalog関数を作成します。

SQL
CREATE OR REPLACE FUNCTION main.my_schema.double_value(input_value DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
"""
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Double Value
id: math.double_value
version: "1.0.0"
description: Doubles the input value
config:
type: object
properties:
input_value:
type: string
title: Input Value
format: expression
x-ui:
widget: expression
port: input_data
required:
- input_value
ports:
input:
- name: input_data
title: Input
output:
- name: out
title: Output
"""

def double_value(input_value: float) -> float:
if input_value is None:
return None
return input_value * 2

return double_value(input_value)
$$

ステップ 4: 機能をテストする

SQL
SELECT main.my_schema.double_value(5) AS result;
-- Should return: 10

ステップ 5: オペレーターを登録する

Unity Catalog関数の参照を.user_defined_operators.yamlファイルに追加します。

YAML
operators:
- catalog: main
schema: my_schema
functionName: double_value

ステップ 6: Lakeflow Designerでオペレーターを使用する

Lakeflow Designerを開き、オペレーターパレットにオペレーターが表示されていることを確認してください。キャンバス上にドラッグして、入力を接続し、プレビューを実行してください。

トラブルシューティング

問題

ソリューション

オペレーターがLakeflow Designer に表示されません。

.user_defined_operators.yamlが存在し、関数またはファイルパスがリストされていることを確認してください。python-run-function演算子については、ファイルパスとYAMLファイルへのアクセス可能性を確認してください。

スキーマ検証に失敗しました。

YAMLをhttps://your-workspace.cloud.databricks.com/static/schemas/user-defined-operator-v0.1.0.jsonの公式スキーマと照合して確認してください。

アクセスが拒否されました。

UCベースのオペレーターの場合、ユーザーが関数に対してEXECUTE 、スキーマに対してUSE SCHEMAを持っていることを確認してください。python-run-function演算子については、ユーザーがYAMLファイルへの読み取りアクセス権を持っていることを確認してください。

python-run-function オペレーターが実行時に失敗します。

run()関数のシグネチャがdef run(config, inputs, spark)と一致することを確認してください。コード内のポート名がYAMLと一致していること、および戻り値の辞書キーが出力ポートname値と一致していることを確認してください。

UDTF は間違った型を返します。

UDTFの戻り値の型は明示的に指定する必要があります。入力列の型を参照することはできません。

権限

権限

目的

.user_defined_operators.yamlへの 読み取りアクセス

オペレーターを発見します。

YAMLファイルへの 読み取りアクセスpython-run-functionのみ)。

演算子定義を読み込みます。

Unity Catalog関数に対して EXECUTEを実行します (UCベースの演算子のみ)。

オペレーターを実行します。

スキーマに対して USE SCHEMAを実行します (UCベースの演算子のみ)。

関数が作成されたスキーマにアクセスします。

その他の権限

ご利用の通信事業者によっては、ユーザーにその他の権限が必要となる場合があります。例えば、HTTP API呼び出しの場合、 Unity Catalog接続ではUSE CONNECTIONなります。

次のステップ

以下のチュートリアルをご覧ください。

Type

説明

Gmail の電子メール送信者

python-run-function

DataFrameデータをCSV電子メールの添付ファイルとして Gmail 経由で送信します。

複利計算機

uc-udf

複利の公式を使用して将来の投資価値を計算します。

K平均法クラスタリング

uc-udtf

scikit-learnを使用してデータをクラスターに分割します。

Slackメッセージを送信

uc-udf

API経由で Slack チャンネルに通知を送信します。

すべてのUIウィジェット

uc-udf

利用可能なすべてのUIウィジェットを表示する参照演算子。

YAMLスキーマの完全なリファレンスについては、 「ユーザー定義演算子YAMLリファレンス」を参照してください。