Lakeflow Designer のユーザー定義演算子
プレビュー
この機能は パブリック プレビュー段階です。
Lakeflow Designer を使用すると 、ユーザー定義の演算子 を作成し、それをキャンバス上で標準の演算子と並べて直接表示できます。 これらを使用して、 Lakeflow Designerに独自のビジネスロジック、計算、または統合機能を拡張できます。
ユーザー定義演算子には、次の3種類があります。
python-run-function: ワークスペースに保存された、インラインPythonを含むスタンドアロンのYAMLファイル。データフレームレベルの変換や外部システムとの連携に最適です。アクセス許可はワークスペースファイルレベルで管理されます。uc-udf: Unity Catalogスカラー関数をラップします。 列レベルの変換に最適です。アクセス権限はUnity Catalog権限によって管理されます。uc-udtf: Unity Catalogテーブル値関数をラップします。 MLクラスタリングや集計などのテーブルレベルの変換に最適です。 アクセス権限はUnity Catalog権限によって管理されます。
機能 |
|
|
|
|---|---|---|---|
使用例 | DataFrame変換、 API統合、電子メール通知 | 列レベルの計算(BMI、金利) | MLクラスタリング、行全体の集計 |
入力 | データフレーム | 単一の値 | 表全体、行ごと |
出力 | データフレーム | 単一値 | 表(複数行) |
Unity Catalog機能が必要です | No | はい | はい |
アクセスガバナンス | ワークスペースのファイル権限 | Unity Catalog権限( | Unity Catalog権限( |
対応言語 | Pythonのみ | SQLまたはSQLラッパー内のPython | SQLまたはSQLラッパー内のPython |
ユーザー定義演算子はどのように機能しますか?
ユーザー定義演算子は以下で構成されます。
- 演算子ロジック :演算子が実行されたときに実行されるコード。これは、インラインのPython
run()関数 (python-run-functionの場合) またはUnity Catalog関数 (uc-udfおよびuc-udtfの場合) のいずれかです。 - YAML 設定 : Lakeflow Designerに対して、オペレーターの名前、説明、入力の区切り、UI ウィジェット、ポートなど、オペレーターを UI でどのように表示するかを指示します。 すべての演算子タイプは
user-defined-operator-v0.1.0スキーマを使用します。 - 登録ファイル :Lakeflow Designerがオペレーターを検出できるようにする
.user_defined_operators.yamlのエントリ。
演算子ロジック
Pythonの実行関数のユーザー定義演算子ロジック
すべてのpython-run-function演算子はrun()関数を定義する必要があります。
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
config: UI からユーザーが設定した値。プロパティ名をキーとする。inputs: 入力DataFrames 、入力ポートnameをキーとする。spark: アクティブな SparkSession。- 戻り値 : 出力ポート
name値をDataFramesにマッピングする辞書。
次の例は、入力DataFrameから行をフィルタリングします。
def run(config, inputs, spark):
df = inputs["in"]
filtered = df.filter(config["filter_expression"])
return {"out": filtered}
オペレーターが外部の pip パッケージを必要とする場合は、YAML にenvironmentフィールドを追加してください。
environment:
environment_version: '1'
dependencies:
- requests==2.31.0
- beautifulsoup4==4.12.0
UDFおよびUDTF演算子ロジック
UC関数はSQLまたはPythonで記述できます。Python関数はSQLのCREATE FUNCTIONステートメントで囲まれています。
SQL関数:
CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE SQL
RETURN
SELECT weight_kg / (height_m * height_m);
Python関数(SQLでラップ):
CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
return weight_kg / (height_m ** 2)
$$;
UDF(ユーザー定義関数)は一度に1つの値を処理し、計算された値を返します。UDTFはテーブルを1行ずつ処理し、すべての行にわたって状態を保持することができます。列レベルの変換にはuc-udf使用し、 MLクラスタリングや集計などの操作にはuc-udtf使用します。
さらに、UDTFでは、 __init__() 、 eval() 、 terminate()の3つのキーメソッドを定義する必要があります。
class MyOperator:
def __init__(self):
# Called before processing - initialize any values needed.
def eval(self, row, id_column, columns, k):
# Called once per input row - accumulate data here.
def terminate(self):
# Called after all rows - perform final calculations and yield results.
UDTFの戻り値テーブルは、固定された明示的な型を持つ必要があります。戻り値の設定では、入力列の型を参照することはできません。
YAML設定
YAML設定は、Lakeflow DesignerがUI上でオペレーターをどのように表示するかを指示します。オペレーターの名前、説明、入力問題、UI ウィジェット、およびポートを定義します。 各設定フィールドは、タイプ、タイトル、およびオプションのウィジェットヒントx-uiを持つプロパティです。
config:
type: object
properties:
my_param:
type: string
title: My Parameter
x-ui:
widget: input
my_expression:
type: string
title: Column
format: expression
x-ui:
widget: expression
port: in
my_number:
type: number
title: Count
default: 10
minimum: 0
maximum: 100
required:
- my_param
- my_expression
YAMLスキーマの詳細(すべてのウィジェットタイプと構成オプションを含む)については、 「ユーザー定義演算子YAMLリファレンス」を参照してください。
港
ポートは、オペレーターの入力と出力を定義します。
ports:
input:
- name: in
title: Input Data
mime: application/vnd.databricks.dataframe
required: true
allowMultiple: false
output:
- name: out
title: Output Data
Python実行関数演算子のYAML
python-run-function演算子の場合、YAMLファイルは独立しており、インラインPythonコードを含むrun_functionフィールドが含まれています。
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Filter Rows
id: filter_rows
version: '1.0.0'
description: Filters rows based on a SQL expression.
config:
type: object
properties:
filter_expression:
type: string
title: Filter Expression
x-ui:
widget: input
required:
- filter_expression
ports:
input:
- name: in
title: Input
output:
- name: out
title: Output
run_function:
type: inline
code: |
def run(config, inputs, spark):
df = inputs["in"]
filtered = df.filter(config["filter_expression"])
return {"out": filtered}
Unity Catalog機能用のYAML
UCベースの演算子の場合、YAML設定をコメントまたはドキュメンテーション文字列として関数内に埋め込んでください。
SQLでは ( /* ... */コメントを使用):
RETURN(/*
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Calculate BMI
id: calculate_bmi
version: "1.0.0"
description: Calculates BMI from weight and height.
config:
type: object
properties:
weight_kg:
type: string
title: Weight (in kg)
format: expression
x-ui:
widget: expression
port: in
height_m:
type: string
title: Height (in meters)
format: expression
x-ui:
widget: expression
port: in
required:
- weight_kg
- height_m
ports:
input:
- name: in
title: Input Data
output:
- name: out
title: Output
*/
SELECT weight_kg / (height_m * height_m)
);
Pythonの場合 ( """ ... """ドキュメント文字列を使用):
AS $$
"""
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Calculate BMI
id: calculate_bmi
version: "1.0.0"
description: Calculates BMI from weight and height.
config:
type: object
properties:
weight_kg:
type: string
title: Weight (in kg)
format: expression
x-ui:
widget: expression
port: in
height_m:
type: string
title: Height (in meters)
format: expression
x-ui:
widget: expression
port: in
required:
- weight_kg
- height_m
ports:
input:
- name: in
title: Input Data
output:
- name: out
title: Output
"""
return weight_kg / (height_m ** 2)
$$;
オペレーターを登録してLakeflow Designerにデプロイします
オペレーターをLakeflow Designerに表示するには、それを.user_defined_operators.yamlファイルに登録します。
- ワークスペース レベル: ファイルをワークスペースのルートに配置すると、オペレーターがすべてのユーザーに表示されます。
- ユーザー レベル: オペレーターが自分だけに表示されるようにするには、ファイルをユーザーのホームアラーム (
/Workspace/Users/<user-name>/.user_defined_operators.yaml) に配置します。
operators:セクションでは、ファイルパス、 Unity Catalog関数の参照、およびグロブパターンがサポートされます。 エントリーの種類は組み合わせることができます。
operators:
# File path (python-run-function operators)
- /Workspace/Users/me/udos/my_operator.yaml
# Glob pattern (registers all matching files)
- /Workspace/Users/me/udos/transforms/*.yaml
# UC function reference (uc-udf and uc-udtf operators)
- catalog: my_catalog
schema: my_schema
functionName: my_function
高度な設定
プレビューモード
Lakeflow Designerは、デザインモード中にプレビュー機能をサポートしています。外部APIsを呼び出したり、外部システムに書き込んだりする演算子については、プレビュー中に副作用をスキップできるように、 is_preview設定プロパティを追加してください。 プレビューモードが有効になっている場合、ユーザーは副作用を伴う演算子を実行するために、明示的に 「実行」を クリックする必要があります。
config:
type: object
properties:
is_preview:
type: boolean
format: is_preview
default: false
Lakeflow Designerはプレビュー時にこの値を自動的にtrueに設定します。副作用を回避するために、ロジック内で確認してください。
# In a python-run-function
if config.get("is_preview"):
return {"out": inputs["in"]}
# In a UC function (SQL)
CASE WHEN is_preview THEN 'preview' ELSE /* actual work */ END
Unity Catalog接続
外部APIsを呼び出すUCベースのSQLオペレーターの場合、 Unity Catalog HTTP接続を使用して認証情報を安全に保存します。
CREATE CONNECTION my_api_connection TYPE HTTP OPTIONS (
host 'https://api.example.com',
port '443',
base_path '/v1/',
bearer_token 'your-token-here'
);
次に、SQL UDF 内でhttp_request()関数を使用して接続を使用します。詳細については、 「外部 HTTP サービスへの接続」を参照してください。
WorkspaceClient
python-run-functionオペレーターの場合、 Databricks WorkspaceClientを使用してワークスペース リソースと外部APIsアクセスできます。
def run(config, inputs, spark):
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
# Use w to access workspace resources
完全なPython -実行-function ユーザー定義演算子を作成する
次のステップでは、 python-run-function演算子を最初から作成する手順を説明します。
ステップ 1: ロジックを定義する
run()関数をノートブックに記述してください。
from typing import Dict, Any
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
from pyspark.sql import functions as F
df = inputs["in"]
result = df.withColumn(config["column_name"], F.current_timestamp())
return {"out": result}
ステップ 2: 機能をテストする
サンプルデータを使用して、関数を対話的にテストしてください。
test_df = spark.createDataFrame(
[("Alice", 100), ("Bob", 200)],
["name", "amount"]
)
result = run(
config={"column_name": "processed_at"},
inputs={"in": test_df},
spark=spark
)
result["out"].show()
ステップ 3: YAML 構成を作成する
オペレーターのメタデータ、設定フィールド、およびポートをYAMLファイルで定義します。
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
type: object
properties:
column_name:
type: string
title: Column Name
default: processed_at
x-ui:
widget: input
required:
- column_name
ステップ 4: ロジックと YAML を結合する
完全なYAMLファイルを作成するには、 run_functionフィールドとportsフィールドを追加します。ワークスペースに保存してください。例: /Workspace/Users/<user-name>/udos/add_timestamp.yaml :
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
type: object
properties:
column_name:
type: string
title: Column Name
default: processed_at
x-ui:
widget: input
required:
- column_name
ports:
input:
- name: in
title: Input
output:
- name: out
title: Output
run_function:
type: inline
code: |
from typing import Dict, Any
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
from pyspark.sql import functions as F
df = inputs["in"]
result = df.withColumn(config["column_name"], F.current_timestamp())
return {"out": result}
ステップ 5: オペレーターを登録する
.user_defined_operators.yamlファイルにファイルパスを追加します。
operators:
- /Workspace/Users/<user-name>/udos/add_timestamp.yaml
ステップ 6: Lakeflow Designerでオペレーターを使用する
Lakeflow Designerを開き、オペレーターパレットにオペレーターが表示されていることを確認してください。キャンバス上にドラッグし、入力を接続し、列名を設定して、プレビューを実行します。
完全なUCユーザー定義オペレーターを作成する
次のステップでは、UC ベースのuc-udf演算子の作成について説明します。
ステップ 1: ロジックを定義する
ノートブックに関数のロジックを記述してテストしてください。
def double_value(input_value: float) -> float:
if input_value is None:
return None
return input_value * 2
ステップ 2: YAML 構成を作成する
オペレーターのメタデータ、設定フィールド、およびポートを定義します。
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Double Value
id: math.double_value
version: '1.0.0'
description: Doubles the input value
config:
type: object
properties:
input_value:
type: string
title: Input Value
format: expression
x-ui:
widget: expression
port: input_data
required:
- input_value
ports:
input:
- name: input_data
title: Input
output:
- name: out
title: Output
ステップ 3: ロジックと YAML を結合する
YAMLをドキュメント文字列として埋め込んだUnity Catalog関数を作成します。
CREATE OR REPLACE FUNCTION main.my_schema.double_value(input_value DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
"""
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Double Value
id: math.double_value
version: "1.0.0"
description: Doubles the input value
config:
type: object
properties:
input_value:
type: string
title: Input Value
format: expression
x-ui:
widget: expression
port: input_data
required:
- input_value
ports:
input:
- name: input_data
title: Input
output:
- name: out
title: Output
"""
def double_value(input_value: float) -> float:
if input_value is None:
return None
return input_value * 2
return double_value(input_value)
$$
ステップ 4: 機能をテストする
SELECT main.my_schema.double_value(5) AS result;
-- Should return: 10
ステップ 5: オペレーターを登録する
Unity Catalog関数の参照を.user_defined_operators.yamlファイルに追加します。
operators:
- catalog: main
schema: my_schema
functionName: double_value
ステップ 6: Lakeflow Designerでオペレーターを使用する
Lakeflow Designerを開き、オペレーターパレットにオペレーターが表示されていることを確認してください。キャンバス上にドラッグして、入力を接続し、プレビューを実行してください。
トラブルシューティング
問題 | ソリューション |
|---|---|
オペレーターがLakeflow Designer に表示されません。 |
|
スキーマ検証に失敗しました。 | YAMLを |
アクセスが拒否されました。 | UCベースのオペレーターの場合、ユーザーが関数に対して |
|
|
UDTF は間違った型を返します。 | UDTFの戻り値の型は明示的に指定する必要があります。入力列の型を参照することはできません。 |
権限
権限 | 目的 |
|---|---|
| オペレーターを発見します。 |
YAMLファイルへの 読み取りアクセス ( | 演算子定義を読み込みます。 |
Unity Catalog関数に対して EXECUTEを実行します (UCベースの演算子のみ)。 | オペレーターを実行します。 |
スキーマに対して USE SCHEMAを実行します (UCベースの演算子のみ)。 | 関数が作成されたスキーマにアクセスします。 |
その他の権限 | ご利用の通信事業者によっては、ユーザーにその他の権限が必要となる場合があります。例えば、HTTP API呼び出しの場合、 Unity Catalog接続では |
次のステップ
以下のチュートリアルをご覧ください。
例 | Type | 説明 |
|---|---|---|
| DataFrameデータをCSV電子メールの添付ファイルとして Gmail 経由で送信します。 | |
| 複利の公式を使用して将来の投資価値を計算します。 | |
| scikit-learnを使用してデータをクラスターに分割します。 | |
| API経由で Slack チャンネルに通知を送信します。 | |
| 利用可能なすべてのUIウィジェットを表示する参照演算子。 |
YAMLスキーマの完全なリファレンスについては、 「ユーザー定義演算子YAMLリファレンス」を参照してください。