ユーザー定義演算子 YAML リファレンス
プレビュー
この機能は パブリック プレビュー段階です。
このページでは、Lakeflow Designerにおけるユーザー定義演算子のYAML設定について説明します。すべての演算子タイプ( uc-udf 、 uc-udtf 、およびpython-run-function )は、JSONスキーマ形式を使用して構成フィールドを定義するuser-defined-operator-v0.1.0スキーマを使用します。
ユーザー定義演算子の構築方法については、 Lakeflow Designer のユーザー定義演算子」を参照してください。
ルートプロパティ
すべての演算子YAMLファイルは、演算子を識別し、その動作を定義する一連のルートプロパティから始まります。以下の例は、一般的な構造を示しています。
schema: user-defined-operator-v0.1.0
type: python-run-function
name: My Operator
id: my_operator
version: '1.0.0'
description: >
What this operator does.
Can be multiple lines.
config:
type: object
properties:
my_field:
type: string
title: My Field
description: Help text
ports:
input:
- name: data
title: Input Data
output:
- name: out
title: Output
run_function:
type: inline
code: |
def run(config, inputs, spark):
return {"out": inputs["data"]}
environment:
environment_version: '4'
dependencies:
- 'pandas>=2.0'
属性 | Type | 必須 | 説明 |
|---|---|---|---|
| string | はい | スキーマ識別子。 |
| string | はい | 演算子の種類: |
| string | はい | オペレーターの表示名。Lakeflow DesignerのUIに収まるように、短くしてください。最低文字数は1文字です。 |
| string | はい | 演算子タイプの一意の識別子。最低文字数は1文字です。演算子を分類するために、名前空間( |
| string | はい | オペレーターの業務内容の詳細な説明。ユーザーインターフェース上でユーザーに表示されます。より長い説明には、YAMLの複数行構文( |
| オブジェクト | はい | 設定フィールドを定義するJSONスキーマオブジェクト。設定を参照してください。 |
| オブジェクト | No | 入力ポートと出力ポートの定義。ポートを参照してください。 |
| string | はい | バージョン文字列(例: |
| オブジェクト | No |
|
| オブジェクト | No | Python環境の設定(依存関係を含む)。 |
ポート
ポートは、オペレーターがパイプライン内の他のオペレーターとどのように接続するかを定義します。portsオブジェクトにはinputとoutput配列が含まれています。
ports:
input:
- name: input_data
title: Input Data
mime: application/vnd.databricks.dataframe
allowMultiple: true
required: true
output:
- name: out
title: Output
属性 | Type | 必須 | 説明 |
|---|---|---|---|
| string | はい | ポートの一意の識別子。接続および設定参照で使用されます。 |
| string | No | UI に表示される人間が読めるラベル。 |
| string | No | ポートデータのMIMEタイプ。例えば、 |
| boolean | No |
|
| boolean | No |
|
文書化された港湾物件のみが認められます。スキーマ検証では、未知のキー(従来のlabelフィールドなど)は拒否されます。
ポートの例
入力ポートと出力ポートを備えたUDF:
ports:
input:
- name: in
title: Input Data
output:
- name: out
title: Output
入力ポートと出力ポートを持つUDTF:
ports:
input:
- name: input_data
title: Input Data
output:
- name: clustered_data
title: Clustered Results
複数の入力とオプションのポートを備えたPython -実行-関数:
ports:
input:
- name: main_data
title: Main Data
- name: reference_data
title: Reference Table
required: false
output:
- name: joined_output
title: Joined Output
設定
configフィールドは JSON スキーマオブジェクトです。各設定フィールドは、スキーマ内のプロパティとして定義します。このフォーマットでは、 enum 、 minimum 、 maximum 、 examplesなどの標準的なJSONスキーマ検証機能にアクセスできます。
configオブジェクトはtype: objectとpropertiesマップを持つ必要があります。オプションでrequired (必須プロパティ名の配列) とadditionalPropertiesを含めることができます。
config:
type: object
properties:
cluster_count:
type: number
title: Number of Clusters
description: How many clusters to create
default: 3
minimum: 1
maximum: 100
algorithm:
type: string
title: Algorithm
description: Clustering algorithm to use
enum: ['kmeans', 'dbscan', 'hierarchical']
default: kmeans
feature_col:
type: string
title: Feature Column
description: Column to use as input
format: expression
x-ui:
widget: expression
port: data
required: [cluster_count, feature_col]
additionalProperties: false
設定プロパティフィールド
config.propertiesオブジェクトの各プロパティは、以下の標準 JSON スキーマフィールドをサポートしています。
フィールド | Type | 説明 |
|---|---|---|
| string | データ型: |
| string | UI に表示される人間が読めるラベル。 |
| string | ユーザーに表示されるヘルプテキスト。 |
| すべて | フィールドのデフォルト値。 |
| array | このフィールドのサンプル値。 |
| array | 許可される値のリストを固定しました。 |
| string | 意味的な型ヒント。値の書式設定を参照してください。 |
| 数字 | 最小許容値( |
| 数字 | 最大許容値( |
| オブジェクト | 配列要素のスキーマ( |
| オブジェクト | ネストされたプロパティ定義( |
| array | 必要なネストされたプロパティ名のリスト( |
minLength 、 maxLength 、 pattern 、 constなどの他の標準的なJSONスキーマフィールドもサポートされています。
値の書式設定
config プロパティのformatフィールドは、Lakeflow Designer にその値の解釈方法を指示する意味的な型ヒントを提供します。これらのヒントにより、特殊なUI動作と検証が可能になります。
フォーマット | 説明 |
|---|---|
| 列参照またはSQL式。 |
| テーブルソース参照。 |
| ファイルソース参照。 |
| 列式。 |
| 式をソートします。 |
| 集計式。 |
| AI関数表現。 |
| 自動プレビューモードフラグ。Lakeflow Designerは、ワークフローのプレビュー中にこれを |
| 文字列配列。 |
UIウィジェット
ウィジェットを使用すると、Lakeflow Designerインターフェースで設定フィールドがどのように表示されるかをカスタマイズできます。各設定プロパティのx-uiプロパティでウィジェットを定義します。ウィジェットを省略した場合、Lakeflow Designerはデータ型に基づいてデフォルトのウィジェットを使用します。
ウィジェット | データ型 | 説明 |
|---|---|---|
| string | 単一行のテキスト入力。 |
| string | 複数行テキストエリア。オプションの |
| boolean | 標準チェックボックス。 |
| boolean | 切り替えスイッチ。 |
| 数値/整数 | オプションの制約による数値入力。 |
| 数値/整数 | 数値範囲を表示するビジュアルスライダー。オプションの |
| string | 単一選択ドロップダウン。必要条件: |
| array | 複数選択可能なドロップダウンメニュー。必要条件: |
| string | 列/式セレクタ。必要条件: |
input
1 行のテキスト入力フィールド。
api_endpoint:
type: string
title: API Endpoint
x-ui:
widget: input
textarea
長文用の複数行テキストエリア。高さを制御するためのオプションのプロパティrowsをサポートします。
message_body:
type: string
title: Message Body
x-ui:
widget: textarea
rows: 4
checkbox
ブール値用の標準チェックボックス。
send_notification:
type: boolean
title: Send Notification
default: false
x-ui:
widget: checkbox
toggle
ブール値用の切り替えスイッチ。
enable_logging:
type: boolean
title: Enable Logging
default: true
x-ui:
widget: toggle
number
数値入力フィールド。プロパティ自体にminimumとmaximumを使用して範囲を制限します。
num_clusters:
type: number
title: Number of Clusters
default: 3
minimum: 1
maximum: 100
x-ui:
widget: number
slider
範囲内の数値を選択するための視覚的なスライダー。プロパティでminimumとmaximum使用して範囲を設定し、 x-uiのstep使用して増分を制御します。
confidence_threshold:
type: number
title: Confidence Threshold
default: 0.8
minimum: 0
maximum: 1
x-ui:
widget: slider
step: 0.05
select
単一選択ドロップダウン。ドロップダウンリストの値の取得元を定義するには、 optionsSourceを指定する必要があります。オプションソースを参照してください。
aggregation_type:
type: string
title: Aggregation Type
x-ui:
widget: select
optionsSource:
type: static
values: ['sum', 'avg', 'min', 'max', 'count']
multi-select
複数の値を選択するための複数選択ドロップダウンリスト。プロパティにはtype: arrayとitems: { type: string }を組み合わせて使用してください。optionsSourceが必要です。オプションソースを参照してください。
feature_columns:
type: array
title: Feature Columns
items:
type: string
x-ui:
widget: multi-select
optionsSource:
type: inputColumns
port: input_data
expression
ユーザーが入力データから列を選択したり、カスタムSQL式を記述したりできる列/式セレクター。プロパティにformat: expressionを設定し、 x-uiに入力portを指定します。これは便利です。
- ユーザーが入力データから列を選択する必要がある場合。
- ユーザーがカスタム SQL 式を記述する必要がある場合。
- パイプライン内の動的データを参照する問題について。
amount:
type: string
title: Amount
format: expression
x-ui:
widget: expression
port: input_data
オプションソース
selectおよびmulti-selectウィジェットの場合、 optionsSourceを使用してドロップダウン オプションの取得元を定義する必要があります。
静的オプション
YAML で定義された値の固定リスト。
optionsSource:
type: static
values: ['option1', 'option2', 'option3']
属性 | Type | 必須 | 説明 |
|---|---|---|---|
| string | はい |
|
| array | はい | ドロップダウンリストに使用する文字列値の配列。 |
入力列
入力ポートからの列名をドロップダウンに動的に入力します。
optionsSource:
type: inputColumns
port: input_data
属性 | Type | 必須 | 説明 |
|---|---|---|---|
| string | はい |
|
| string | はい | 列名を取得する入力ポートの名前。定義済みの入力ポートのいずれかの |
run_function
run_functionプロパティを使用すると、 python-run-function演算子の YAML 設定に Python コードを直接埋め込むことができます。これにより、別途Unity Catalog機能を登録する必要がなくなりました。
run_function:
type: inline
code: |
def run(config, inputs, spark):
df = inputs["data"]
threshold = config["threshold"]
return {"out": df.filter(df["score"] > threshold)}
属性 | Type | 必須 | 説明 |
|---|---|---|---|
| string | はい |
|
| string | はい | Pythonのソースコード。 |
run()関数は3つの引数を受け取ります。
config: UIでユーザーが設定した設定値の辞書。inputs: 入力ポート名をDataFramesにマッピングする辞書。spark: アクティブな SparkSession。
この関数は、出力ポート名をDataFramesにマッピングする辞書を返す必要があります。 キーは、 ports.outputで定義されている各出力ポートのnameフィールドと完全に一致する必要があります。例えば、出力ポート名がoutの場合:
return {"out": result_df}
複数の出力ポートを備えています。
return {"match": match_df, "rest": rest_df}
environment
environmentプロパティは、 python-run-function演算子の Python 環境を指定します。これを使用して環境バージョンをピン留めし、pip の依存関係を宣言します。
environment:
environment_version: '4'
dependencies:
- 'scikit-learn>=1.3'
- 'pandas>=2.0'
属性 | Type | 必須 | 説明 |
|---|---|---|---|
| string | No | 使用する環境バージョン。例えば、 |
| 文字列の配列 | No | pipの依存関係指定子の一覧。各エントリは標準的な pip 構文に従います (例: |
完全な例
UCベースのUDF
この例では、複利を計算するUnityカタログベースのUDF演算子を定義します。
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Compound Interest
id: finance.compound_interest
version: '1.0.0'
description: >
Calculates compound interest based on principal, rate, and time period.
config:
type: object
properties:
principal:
type: string
title: Principal Amount
format: expression
x-ui:
widget: expression
port: input_data
annual_rate:
type: number
title: Annual Interest Rate
default: 5.0
minimum: 0
maximum: 100
x-ui:
widget: number
years:
type: number
title: Number of Years
default: 10
minimum: 1
maximum: 50
x-ui:
widget: slider
step: 1
compound_frequency:
type: string
title: Compounding Frequency
default: 'monthly'
x-ui:
widget: select
optionsSource:
type: static
values: ['daily', 'monthly', 'quarterly', 'annually']
required: [principal, annual_rate]
additionalProperties: false
ports:
input:
- name: input_data
title: Input Data
output:
- name: out
title: Output
Python 関数実行演算子
この例では、K平均法クラスタリングを使用して顧客をセグメント化するpython-run-function演算子を定義します。
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Customer Segmentation
id: ml.customer_segmentation
version: '1.2.0'
description: >
Segments customers into groups based on selected features
using K-Means clustering. Returns customer IDs with their
assigned segment numbers.
config:
type: object
properties:
num_segments:
type: integer
title: Number of Segments
description: How many customer segments to create
default: 3
minimum: 2
maximum: 20
x-ui:
widget: number
customer_id_column:
type: string
title: Customer ID Column
description: Column containing customer identifiers
x-ui:
widget: select
optionsSource:
type: inputColumns
port: customer_data
feature_columns:
type: array
title: Feature Columns
description: Columns to use for segmentation
items:
type: string
x-ui:
widget: multi-select
optionsSource:
type: inputColumns
port: customer_data
normalize_features:
type: boolean
title: Normalize Features
description: Whether to normalize feature values before clustering
default: true
x-ui:
widget: toggle
required: [num_segments, customer_id_column, feature_columns]
additionalProperties: false
ports:
input:
- name: customer_data
title: Customer Data
mime: application/vnd.databricks.dataframe
output:
- name: segmented_customers
title: Segmented Customers
run_function:
type: inline
code: |
def run(config, inputs, spark):
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
df = inputs["customer_data"]
id_col = config["customer_id_column"]
features = config["feature_columns"]
k = config["num_segments"]
normalize = config.get("normalize_features", True)
assembler = VectorAssembler(inputCols=features, outputCol="features_vec")
assembled = assembler.transform(df)
if normalize:
scaler = StandardScaler(inputCol="features_vec", outputCol="scaled_features")
model = scaler.fit(assembled)
assembled = model.transform(assembled)
feature_col = "scaled_features"
else:
feature_col = "features_vec"
kmeans = KMeans(k=k, featuresCol=feature_col, predictionCol="segment")
result = kmeans.fit(assembled).transform(assembled)
return {"segmented_customers": result.select(id_col, "segment")}
environment:
environment_version: '4'
dependencies:
- 'scikit-learn>=1.3'
クイックリファレンス
必須のルートプロパティ
schema:user-defined-operator-v0.1.0name: 表示名id: 一意の識別子descriptionオペレーターが行うことconfigJSONスキーマオブジェクトtype:uc-udf、uc-udtf、またはpython-run-functionversion: 作者定義のバージョン文字列
オプションのルートプロパティ
ports入力ポートと出力ポートの定義run_functionインラインPythonコード(python-run-functionのみ)environment: Python環境と依存関係(python-run-functionのみ)
設定プロパティのデータ型
string | boolean | number | integer | array | object
UIウィジェット
input | textarea | checkbox | toggle | number | slider | select | multi-select | expression
オプションソース
static (固定値) | inputColumns (入力ポートから)
値の書式設定
expression | table_source | file_source | column_expressions | sort_expressions | aggregation_expressions | ai_function_expressions | is_preview | string[]