Operadores definidos pelo usuário no LakeFlow Designer
Visualização
Este recurso está em Pré-visualização Pública.
LakeFlow Designer permite criar operadores definidos pelo usuário que aparecem diretamente na tela, juntamente com os operadores integrados. Use-os para estender LakeFlow Designer com sua própria lógica de negócios, cálculos ou integrações.
Existem três tipos de operadores definidos pelo usuário:
python-run-function: Um arquivo YAML independente com Python embutido armazenado no workspace. Ideal para transformações em nível de DataFrame e integrações externas. As permissões são gerenciadas no nível do arquivo workspace .uc-udf: Envolve uma função escalar Unity Catalog . Ideal para transformações em nível de coluna. O acesso é regido pelas permissões Unity Catalog .uc-udtf: Envolve uma função com valor de tabela Unity Catalog . Ideal para transformações em nível de tabela, como clustering e agregação de ML . O acesso é regido pelas permissões Unity Catalog .
Recurso |
|
|
|
|---|---|---|---|
Exemplo de caso de uso | Transformações DataFrame , integrações API , notificações email | Cálculos em nível de coluna (IMC, taxas de juros) | clustering ML , agregação entre linhas |
Entrada | DataFrames | Valores únicos | Tabela inteira, linha por linha |
Saída | DataFrames | Valor único | Tabela (várias linhas) |
Requer a função Unity Catalog. | Não | Sim | Sim |
Governança de acesso | permissões de arquivo do espaço de trabalho | Permissões Unity Catalog ( | Permissões Unity Catalog ( |
Idiomas suportados | Somente Python | SQL ou Python em um wrapper SQL | SQL ou Python em um wrapper SQL |
Como funcionam os operadores definidos pelo usuário?
Um operador definido pelo usuário consiste em:
- Lógica do operador : O código que é executado quando o operador entra em ação. Isso pode ser uma função Python inline
run()(parapython-run-function) ou uma função Unity Catalog (parauc-udfeuc-udtf). - Configuração YAML : Indica ao Lakeflow Designer como apresentar o operador na interface do usuário, incluindo o nome do operador, descrição, parâmetros de entrada, widgets da interface e portas. Todos os tipos de operadores usam o esquema
user-defined-operator-v0.1.0. - Arquivo de registro : Uma entrada em
.user_defined_operators.yamlque permite ao Lakeflow Designer descobrir o operador.
Lógica do operador
Lógica do operador definida pelo usuário da função de execuçãoPython
Todo operador python-run-function deve definir uma função run() :
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
config: Valores configurados pelo usuário na interface do usuário, indexados pelo nome da propriedade.inputs: DataFrames de entrada, indexados pela porta de entradaname.spark: A SparkSession ativa.- Retorna : Um dicionário que mapeia os valores da porta de saída
namepara DataFrames.
O exemplo a seguir filtra linhas de um DataFrame de entrada:
def run(config, inputs, spark):
df = inputs["in"]
filtered = df.filter(config["filter_expression"])
return {"out": filtered}
Se o seu operador exigir um pacote pip externo, adicione o campo environment ao YAML:
environment:
environment_version: '1'
dependencies:
- requests==2.31.0
- beautifulsoup4==4.12.0
Lógica do operador UDF e UDTF
Você pode escrever funções UC em SQL ou Python. As funções Python são encapsuladas em uma instrução SQL CREATE FUNCTION :
Função SQL:
CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE SQL
RETURN
SELECT weight_kg / (height_m * height_m);
Função Python (encapsulada em SQL):
CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
return weight_kg / (height_m ** 2)
$$;
As UDFs processam um único valor por vez e retornam um valor calculado. As UDTFs processam tabelas linha por linha e podem manter o estado em todas as linhas. Use uc-udf para transformações em nível de coluna e uc-udtf para operações como clustering ML ou agregação.
Além disso, as UDTFs exigem que você defina três métodos key : __init__(), eval() e terminate():
class MyOperator:
def __init__(self):
# Called before processing - initialize any values needed.
def eval(self, row, id_column, columns, k):
# Called once per input row - accumulate data here.
def terminate(self):
# Called after all rows - perform final calculations and yield results.
As tabelas de retorno UDTF devem ter tipos fixos e explícitos. Não é possível referenciar tipos de coluna de entrada na configuração de retorno.
Configuração YAML
A configuração YAML informa ao Lakeflow Designer como apresentar o operador na interface do usuário. Define o nome do operador, a descrição, os parâmetros de entrada, os widgets da interface do usuário e as portas. Cada campo de configuração é uma propriedade com um tipo, um título e dicas opcionais de widget x-ui :
config:
type: object
properties:
my_param:
type: string
title: My Parameter
x-ui:
widget: input
my_expression:
type: string
title: Column
format: expression
x-ui:
widget: expression
port: in
my_number:
type: number
title: Count
default: 10
minimum: 0
maximum: 100
required:
- my_param
- my_expression
Para obter detalhes completos sobre o esquema YAML, incluindo todos os tipos de widgets e opções de configuração, consulte a Referência YAML de operadores definidos pelo usuário.
Portos
As portas definem as entradas e saídas para sua operadora:
ports:
input:
- name: in
title: Input Data
mime: application/vnd.databricks.dataframe
required: true
allowMultiple: false
output:
- name: out
title: Output Data
YAML para operadores de função de execução em Python
Para operadores python-run-function , o arquivo YAML é independente e inclui um campo run_function com código Python embutido:
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Filter Rows
id: filter_rows
version: '1.0.0'
description: Filters rows based on a SQL expression.
config:
type: object
properties:
filter_expression:
type: string
title: Filter Expression
x-ui:
widget: input
required:
- filter_expression
ports:
input:
- name: in
title: Input
output:
- name: out
title: Output
run_function:
type: inline
code: |
def run(config, inputs, spark):
df = inputs["in"]
filtered = df.filter(config["filter_expression"])
return {"out": filtered}
YAML para funções Unity Catalog
Para operadores baseados em UC, incorpore a configuração YAML como um comentário ou docstring em sua função.
Em SQL (use o comentário /* ... */ ):
RETURN(/*
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Calculate BMI
id: calculate_bmi
version: "1.0.0"
description: Calculates BMI from weight and height.
config:
type: object
properties:
weight_kg:
type: string
title: Weight (in kg)
format: expression
x-ui:
widget: expression
port: in
height_m:
type: string
title: Height (in meters)
format: expression
x-ui:
widget: expression
port: in
required:
- weight_kg
- height_m
ports:
input:
- name: in
title: Input Data
output:
- name: out
title: Output
*/
SELECT weight_kg / (height_m * height_m)
);
Em Python (use a docstring """ ... """ ):
AS $$
"""
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Calculate BMI
id: calculate_bmi
version: "1.0.0"
description: Calculates BMI from weight and height.
config:
type: object
properties:
weight_kg:
type: string
title: Weight (in kg)
format: expression
x-ui:
widget: expression
port: in
height_m:
type: string
title: Height (in meters)
format: expression
x-ui:
widget: expression
port: in
required:
- weight_kg
- height_m
ports:
input:
- name: in
title: Input Data
output:
- name: out
title: Output
"""
return weight_kg / (height_m ** 2)
$$;
cadastre-se e implante sua operadora no Lakeflow Designer
Para que seu operador apareça no Lakeflow Designer, registre-o em um arquivo .user_defined_operators.yaml :
- Nível do espaço de trabalho: Coloque o arquivo na raiz do seu workspace para que o operador fique visível para todos os usuários.
- Nível de usuário: Coloque o arquivo em seu diretório de pastas de usuário (
/Workspace/Users/<user-name>/.user_defined_operators.yaml) para tornar os operadores visíveis apenas para você.
A seção operators: suporta caminhos de arquivos, referências de funções Unity Catalog e padrões glob. Você pode combinar tipos de entrada:
operators:
# File path (python-run-function operators)
- /Workspace/Users/me/udos/my_operator.yaml
# Glob pattern (registers all matching files)
- /Workspace/Users/me/udos/transforms/*.yaml
# UC function reference (uc-udf and uc-udtf operators)
- catalog: my_catalog
schema: my_schema
functionName: my_function
Configurações avançadas
Modo de pré-visualização
O Lakeflow Designer suporta pré-visualizações durante o modo de design. Para operadores que chamam APIs externas ou escrevem em sistemas externos, adicione uma propriedade de configuração is_preview para que você possa ignorar efeitos colaterais durante a pré-visualização. Quando o modo de pré-visualização está ativado, os usuários precisam clicar explicitamente em "Executar" para executar o operador com efeitos colaterais.
config:
type: object
properties:
is_preview:
type: boolean
format: is_preview
default: false
O Lakeflow Designer define automaticamente este valor como true durante a pré-visualização. Verifique em sua lógica para evitar efeitos colaterais:
# In a python-run-function
if config.get("is_preview"):
return {"out": inputs["in"]}
# In a UC function (SQL)
CASE WHEN is_preview THEN 'preview' ELSE /* actual work */ END
ConexõesUnity Catalog
Para operadores SQL baseados em UC que chamam APIs externas, use conexões HTTP do Unity Catalog para armazenar credenciais com segurança:
CREATE CONNECTION my_api_connection TYPE HTTP OPTIONS (
host 'https://api.example.com',
port '443',
base_path '/v1/',
bearer_token 'your-token-here'
);
Em seguida, use a conexão em sua UDF SQL com a função http_request() . Para obter detalhes, consulte Conectar-se a um serviço HTTP externo.
Cliente do Espaço de Trabalho
Para operadores python-run-function , você pode usar o Databricks WorkspaceClient para acessar recursos workspace e APIs externas:
def run(config, inputs, spark):
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
# Use w to access workspace resources
Crie um operador definido pelo usuário para uma função de execução completa Python
Os passos a seguir mostram como criar um operador python-run-function do zero.
o passo 1: Defina a lógica
Escreva sua função run() em um Notebook:
from typing import Dict, Any
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
from pyspark.sql import functions as F
df = inputs["in"]
result = df.withColumn(config["column_name"], F.current_timestamp())
return {"out": result}
o passo 2: Teste a função
Teste a função interativamente com dados de exemplo:
test_df = spark.createDataFrame(
[("Alice", 100), ("Bob", 200)],
["name", "amount"]
)
result = run(
config={"column_name": "processed_at"},
inputs={"in": test_df},
spark=spark
)
result["out"].show()
o passo 3: Criar a configuração YAML
Defina os metadados do operador, os campos de configuração e as portas em um arquivo YAML:
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
type: object
properties:
column_name:
type: string
title: Column Name
default: processed_at
x-ui:
widget: input
required:
- column_name
o passo 4: Combine a lógica e o YAML
Adicione os campos run_function e ports para criar o arquivo YAML completo. Salve-o em seu workspace, por exemplo, /Workspace/Users/<user-name>/udos/add_timestamp.yaml:
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
type: object
properties:
column_name:
type: string
title: Column Name
default: processed_at
x-ui:
widget: input
required:
- column_name
ports:
input:
- name: in
title: Input
output:
- name: out
title: Output
run_function:
type: inline
code: |
from typing import Dict, Any
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
from pyspark.sql import functions as F
df = inputs["in"]
result = df.withColumn(config["column_name"], F.current_timestamp())
return {"out": result}
o passo 5: registrar a operadora
Adicione o caminho do arquivo ao seu arquivo .user_defined_operators.yaml :
operators:
- /Workspace/Users/<user-name>/udos/add_timestamp.yaml
Passo 6: Use o operador no Lakeflow Designer
Abra o Lakeflow Designer e verifique se o operador aparece na paleta de operadores. Arraste-o para a tela, conecte uma entrada, configure o nome da coluna e execute uma pré-visualização.
Criar um operador UC completo definido pelo usuário
Os seguintes passos mostram como criar um operador uc-udf baseado em UC.
o passo 1: Defina a lógica
Escreva e teste a lógica da sua função em um Notebook:
def double_value(input_value: float) -> float:
if input_value is None:
return None
return input_value * 2
o passo 2: Criar a configuração YAML
Defina os metadados do operador, os campos de configuração e as portas:
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Double Value
id: math.double_value
version: '1.0.0'
description: Doubles the input value
config:
type: object
properties:
input_value:
type: string
title: Input Value
format: expression
x-ui:
widget: expression
port: input_data
required:
- input_value
ports:
input:
- name: input_data
title: Input
output:
- name: out
title: Output
o passo 3: Combine a lógica e o YAML
Crie a função Unity Catalog com o YAML incorporado como uma docstring:
CREATE OR REPLACE FUNCTION main.my_schema.double_value(input_value DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
"""
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Double Value
id: math.double_value
version: "1.0.0"
description: Doubles the input value
config:
type: object
properties:
input_value:
type: string
title: Input Value
format: expression
x-ui:
widget: expression
port: input_data
required:
- input_value
ports:
input:
- name: input_data
title: Input
output:
- name: out
title: Output
"""
def double_value(input_value: float) -> float:
if input_value is None:
return None
return input_value * 2
return double_value(input_value)
$$
o passo 4: Teste a função
SELECT main.my_schema.double_value(5) AS result;
-- Should return: 10
o passo 5: registrar a operadora
Adicione a referência da função Unity Catalog ao seu arquivo .user_defined_operators.yaml :
operators:
- catalog: main
schema: my_schema
functionName: double_value
Passo 6: Use o operador no Lakeflow Designer
Abra o Lakeflow Designer e verifique se o operador aparece na paleta de operadores. Arraste-o para a tela, conecte uma entrada e execute uma pré-visualização.
Solução de problemas
Problema | soluções |
|---|---|
O operador não aparece no LakeFlow Designer. | Verifique se |
A validação do esquema falhou. | Verifique seu YAML em relação ao esquema oficial em |
Permissão negada. | Para operadores baseados em UC, verifique se os usuários têm |
| Verifique se a assinatura da função |
UDTF retorna tipos incorretos. | Os tipos de retorno da UDTF devem ser explícitos — você não pode referenciar tipos de coluna de entrada. |
Permissões
Permissão | Propósito |
|---|---|
Acesso de leitura a | Descubra a operadora. |
Acesso de leitura ao arquivo YAML (somente | Carregar a definição do operador. |
EXECUTE na função Unity Catalog (somente operadores baseados em UC). | execução o operador. |
USE SCHEMA no esquema (somente operadores baseados em UC). | Acesse o esquema onde a função foi criada. |
Outras permissões | Dependendo da sua operadora, os usuários podem precisar de outras permissões. Por exemplo, |
Próximos passos
Explore o seguinte tutorial:
Exemplo | Tipo | Descrição |
|---|---|---|
| Enviar dados DataFrame como anexo CSV email via Gmail. | |
| Calcule os valores futuros do investimento usando a fórmula de juros compostos. | |
| Segmentar dados em clusters usando scikit-learn. | |
| Enviar notificações para o canal do Slack via API. | |
| Operador de referência que exibe todos os widgets de interface do usuário disponíveis. |
Para uma referência completa ao esquema YAML, consulte a Referência YAML de operadores definidos pelo usuário.