Pular para o conteúdo principal

Operadores definidos pelo usuário no LakeFlow Designer

info

Visualização

Este recurso está em Pré-visualização Pública.

LakeFlow Designer permite criar operadores definidos pelo usuário que aparecem diretamente na tela, juntamente com os operadores integrados. Use-os para estender LakeFlow Designer com sua própria lógica de negócios, cálculos ou integrações.

Existem três tipos de operadores definidos pelo usuário:

  • python-run-function : Um arquivo YAML independente com Python embutido armazenado no workspace. Ideal para transformações em nível de DataFrame e integrações externas. As permissões são gerenciadas no nível do arquivo workspace .
  • uc-udf : Envolve uma função escalar Unity Catalog . Ideal para transformações em nível de coluna. O acesso é regido pelas permissões Unity Catalog .
  • uc-udtf : Envolve uma função com valor de tabela Unity Catalog . Ideal para transformações em nível de tabela, como clustering e agregação de ML . O acesso é regido pelas permissões Unity Catalog .

Recurso

python-run-function

uc-udf

uc-udtf

Exemplo de caso de uso

Transformações DataFrame , integrações API , notificações email

Cálculos em nível de coluna (IMC, taxas de juros)

clustering ML , agregação entre linhas

Entrada

DataFrames

Valores únicos

Tabela inteira, linha por linha

Saída

DataFrames

Valor único

Tabela (várias linhas)

Requer a função Unity Catalog.

Não

Sim

Sim

Governança de acesso

permissões de arquivo do espaço de trabalho

Permissões Unity Catalog (EXECUTE, USE SCHEMA)

Permissões Unity Catalog (EXECUTE, USE SCHEMA)

Idiomas suportados

Somente Python

SQL ou Python em um wrapper SQL

SQL ou Python em um wrapper SQL

Como funcionam os operadores definidos pelo usuário?

Um operador definido pelo usuário consiste em:

  • Lógica do operador : O código que é executado quando o operador entra em ação. Isso pode ser uma função Python inline run() (para python-run-function) ou uma função Unity Catalog (para uc-udf e uc-udtf).
  • Configuração YAML : Indica ao Lakeflow Designer como apresentar o operador na interface do usuário, incluindo o nome do operador, descrição, parâmetros de entrada, widgets da interface e portas. Todos os tipos de operadores usam o esquema user-defined-operator-v0.1.0 .
  • Arquivo de registro : Uma entrada em .user_defined_operators.yaml que permite ao Lakeflow Designer descobrir o operador.

Lógica do operador

Lógica do operador definida pelo usuário da função de execuçãoPython

Todo operador python-run-function deve definir uma função run() :

Python
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
  • config : Valores configurados pelo usuário na interface do usuário, indexados pelo nome da propriedade.
  • inputs : DataFrames de entrada, indexados pela porta de entrada name.
  • spark : A SparkSession ativa.
  • Retorna : Um dicionário que mapeia os valores da porta de saída name para DataFrames.

O exemplo a seguir filtra linhas de um DataFrame de entrada:

Python
def run(config, inputs, spark):
df = inputs["in"]
filtered = df.filter(config["filter_expression"])
return {"out": filtered}

Se o seu operador exigir um pacote pip externo, adicione o campo environment ao YAML:

YAML
environment:
environment_version: '1'
dependencies:
- requests==2.31.0
- beautifulsoup4==4.12.0

Lógica do operador UDF e UDTF

Você pode escrever funções UC em SQL ou Python. As funções Python são encapsuladas em uma instrução SQL CREATE FUNCTION :

Função SQL:

SQL
CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE SQL
RETURN
SELECT weight_kg / (height_m * height_m);

Função Python (encapsulada em SQL):

SQL
CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
return weight_kg / (height_m ** 2)
$$;

As UDFs processam um único valor por vez e retornam um valor calculado. As UDTFs processam tabelas linha por linha e podem manter o estado em todas as linhas. Use uc-udf para transformações em nível de coluna e uc-udtf para operações como clustering ML ou agregação.

Além disso, as UDTFs exigem que você defina três métodos key : __init__(), eval() e terminate():

Python
class MyOperator:
def __init__(self):
# Called before processing - initialize any values needed.

def eval(self, row, id_column, columns, k):
# Called once per input row - accumulate data here.

def terminate(self):
# Called after all rows - perform final calculations and yield results.
nota

As tabelas de retorno UDTF devem ter tipos fixos e explícitos. Não é possível referenciar tipos de coluna de entrada na configuração de retorno.

Configuração YAML

A configuração YAML informa ao Lakeflow Designer como apresentar o operador na interface do usuário. Define o nome do operador, a descrição, os parâmetros de entrada, os widgets da interface do usuário e as portas. Cada campo de configuração é uma propriedade com um tipo, um título e dicas opcionais de widget x-ui :

YAML
config:
type: object
properties:
my_param:
type: string
title: My Parameter
x-ui:
widget: input
my_expression:
type: string
title: Column
format: expression
x-ui:
widget: expression
port: in
my_number:
type: number
title: Count
default: 10
minimum: 0
maximum: 100
required:
- my_param
- my_expression

Para obter detalhes completos sobre o esquema YAML, incluindo todos os tipos de widgets e opções de configuração, consulte a Referência YAML de operadores definidos pelo usuário.

Portos

As portas definem as entradas e saídas para sua operadora:

YAML
ports:
input:
- name: in
title: Input Data
mime: application/vnd.databricks.dataframe
required: true
allowMultiple: false
output:
- name: out
title: Output Data

YAML para operadores de função de execução em Python

Para operadores python-run-function , o arquivo YAML é independente e inclui um campo run_function com código Python embutido:

YAML
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Filter Rows
id: filter_rows
version: '1.0.0'
description: Filters rows based on a SQL expression.
config:
type: object
properties:
filter_expression:
type: string
title: Filter Expression
x-ui:
widget: input
required:
- filter_expression
ports:
input:
- name: in
title: Input
output:
- name: out
title: Output
run_function:
type: inline
code: |
def run(config, inputs, spark):
df = inputs["in"]
filtered = df.filter(config["filter_expression"])
return {"out": filtered}

YAML para funções Unity Catalog

Para operadores baseados em UC, incorpore a configuração YAML como um comentário ou docstring em sua função.

Em SQL (use o comentário /* ... */ ):

SQL
RETURN(/*
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Calculate BMI
id: calculate_bmi
version: "1.0.0"
description: Calculates BMI from weight and height.
config:
type: object
properties:
weight_kg:
type: string
title: Weight (in kg)
format: expression
x-ui:
widget: expression
port: in
height_m:
type: string
title: Height (in meters)
format: expression
x-ui:
widget: expression
port: in
required:
- weight_kg
- height_m
ports:
input:
- name: in
title: Input Data
output:
- name: out
title: Output
*/
SELECT weight_kg / (height_m * height_m)
);

Em Python (use a docstring """ ... """ ):

SQL
AS $$
"""
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Calculate BMI
id: calculate_bmi
version: "1.0.0"
description: Calculates BMI from weight and height.
config:
type: object
properties:
weight_kg:
type: string
title: Weight (in kg)
format: expression
x-ui:
widget: expression
port: in
height_m:
type: string
title: Height (in meters)
format: expression
x-ui:
widget: expression
port: in
required:
- weight_kg
- height_m
ports:
input:
- name: in
title: Input Data
output:
- name: out
title: Output
"""

return weight_kg / (height_m ** 2)
$$;

cadastre-se e implante sua operadora no Lakeflow Designer

Para que seu operador apareça no Lakeflow Designer, registre-o em um arquivo .user_defined_operators.yaml :

  • Nível do espaço de trabalho: Coloque o arquivo na raiz do seu workspace para que o operador fique visível para todos os usuários.
  • Nível de usuário: Coloque o arquivo em seu diretório de pastas de usuário (/Workspace/Users/<user-name>/.user_defined_operators.yaml) para tornar os operadores visíveis apenas para você.

A seção operators: suporta caminhos de arquivos, referências de funções Unity Catalog e padrões glob. Você pode combinar tipos de entrada:

YAML
operators:
# File path (python-run-function operators)
- /Workspace/Users/me/udos/my_operator.yaml
# Glob pattern (registers all matching files)
- /Workspace/Users/me/udos/transforms/*.yaml
# UC function reference (uc-udf and uc-udtf operators)
- catalog: my_catalog
schema: my_schema
functionName: my_function

Configurações avançadas

Modo de pré-visualização

O Lakeflow Designer suporta pré-visualizações durante o modo de design. Para operadores que chamam APIs externas ou escrevem em sistemas externos, adicione uma propriedade de configuração is_preview para que você possa ignorar efeitos colaterais durante a pré-visualização. Quando o modo de pré-visualização está ativado, os usuários precisam clicar explicitamente em "Executar" para executar o operador com efeitos colaterais.

YAML
config:
type: object
properties:
is_preview:
type: boolean
format: is_preview
default: false

O Lakeflow Designer define automaticamente este valor como true durante a pré-visualização. Verifique em sua lógica para evitar efeitos colaterais:

Python
# In a python-run-function
if config.get("is_preview"):
return {"out": inputs["in"]}

# In a UC function (SQL)
CASE WHEN is_preview THEN 'preview' ELSE /* actual work */ END

ConexõesUnity Catalog

Para operadores SQL baseados em UC que chamam APIs externas, use conexões HTTP do Unity Catalog para armazenar credenciais com segurança:

SQL
CREATE CONNECTION my_api_connection TYPE HTTP OPTIONS (
host 'https://api.example.com',
port '443',
base_path '/v1/',
bearer_token 'your-token-here'
);

Em seguida, use a conexão em sua UDF SQL com a função http_request() . Para obter detalhes, consulte Conectar-se a um serviço HTTP externo.

Cliente do Espaço de Trabalho

Para operadores python-run-function , você pode usar o Databricks WorkspaceClient para acessar recursos workspace e APIs externas:

Python
def run(config, inputs, spark):
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
# Use w to access workspace resources

Crie um operador definido pelo usuário para uma função de execução completa Python

Os passos a seguir mostram como criar um operador python-run-function do zero.

o passo 1: Defina a lógica

Escreva sua função run() em um Notebook:

Python
from typing import Dict, Any

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
from pyspark.sql import functions as F
df = inputs["in"]
result = df.withColumn(config["column_name"], F.current_timestamp())
return {"out": result}

o passo 2: Teste a função

Teste a função interativamente com dados de exemplo:

Python
test_df = spark.createDataFrame(
[("Alice", 100), ("Bob", 200)],
["name", "amount"]
)

result = run(
config={"column_name": "processed_at"},
inputs={"in": test_df},
spark=spark
)

result["out"].show()

o passo 3: Criar a configuração YAML

Defina os metadados do operador, os campos de configuração e as portas em um arquivo YAML:

YAML
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
type: object
properties:
column_name:
type: string
title: Column Name
default: processed_at
x-ui:
widget: input
required:
- column_name

o passo 4: Combine a lógica e o YAML

Adicione os campos run_function e ports para criar o arquivo YAML completo. Salve-o em seu workspace, por exemplo, /Workspace/Users/<user-name>/udos/add_timestamp.yaml:

YAML
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
type: object
properties:
column_name:
type: string
title: Column Name
default: processed_at
x-ui:
widget: input
required:
- column_name
ports:
input:
- name: in
title: Input
output:
- name: out
title: Output
run_function:
type: inline
code: |
from typing import Dict, Any

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
from pyspark.sql import functions as F
df = inputs["in"]
result = df.withColumn(config["column_name"], F.current_timestamp())
return {"out": result}

o passo 5: registrar a operadora

Adicione o caminho do arquivo ao seu arquivo .user_defined_operators.yaml :

YAML
operators:
- /Workspace/Users/<user-name>/udos/add_timestamp.yaml

Passo 6: Use o operador no Lakeflow Designer

Abra o Lakeflow Designer e verifique se o operador aparece na paleta de operadores. Arraste-o para a tela, conecte uma entrada, configure o nome da coluna e execute uma pré-visualização.

Criar um operador UC completo definido pelo usuário

Os seguintes passos mostram como criar um operador uc-udf baseado em UC.

o passo 1: Defina a lógica

Escreva e teste a lógica da sua função em um Notebook:

Python
def double_value(input_value: float) -> float:
if input_value is None:
return None
return input_value * 2

o passo 2: Criar a configuração YAML

Defina os metadados do operador, os campos de configuração e as portas:

YAML
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Double Value
id: math.double_value
version: '1.0.0'
description: Doubles the input value
config:
type: object
properties:
input_value:
type: string
title: Input Value
format: expression
x-ui:
widget: expression
port: input_data
required:
- input_value
ports:
input:
- name: input_data
title: Input
output:
- name: out
title: Output

o passo 3: Combine a lógica e o YAML

Crie a função Unity Catalog com o YAML incorporado como uma docstring:

SQL
CREATE OR REPLACE FUNCTION main.my_schema.double_value(input_value DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
"""
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Double Value
id: math.double_value
version: "1.0.0"
description: Doubles the input value
config:
type: object
properties:
input_value:
type: string
title: Input Value
format: expression
x-ui:
widget: expression
port: input_data
required:
- input_value
ports:
input:
- name: input_data
title: Input
output:
- name: out
title: Output
"""

def double_value(input_value: float) -> float:
if input_value is None:
return None
return input_value * 2

return double_value(input_value)
$$

o passo 4: Teste a função

SQL
SELECT main.my_schema.double_value(5) AS result;
-- Should return: 10

o passo 5: registrar a operadora

Adicione a referência da função Unity Catalog ao seu arquivo .user_defined_operators.yaml :

YAML
operators:
- catalog: main
schema: my_schema
functionName: double_value

Passo 6: Use o operador no Lakeflow Designer

Abra o Lakeflow Designer e verifique se o operador aparece na paleta de operadores. Arraste-o para a tela, conecte uma entrada e execute uma pré-visualização.

Solução de problemas

Problema

soluções

O operador não aparece no LakeFlow Designer.

Verifique se .user_defined_operators.yaml existe e lista sua função ou caminho de arquivo. Para operadores python-run-function , verifique o caminho do arquivo e se o arquivo YAML está acessível.

A validação do esquema falhou.

Verifique seu YAML em relação ao esquema oficial em https://your-workspace.cloud.databricks.com/static/schemas/user-defined-operator-v0.1.0.json.

Permissão negada.

Para operadores baseados em UC, verifique se os usuários têm EXECUTE na função e USE SCHEMA no esquema. Para operadores python-run-function , verifique se os usuários têm acesso de leitura ao arquivo YAML.

python-run-function O operador falha em tempo de execução.

Verifique se a assinatura da função run() corresponde def run(config, inputs, spark). Verifique se os nomes das portas no código correspondem ao YAML e se a chave do dicionário de retorno corresponde aos valores da porta de saída name .

UDTF retorna tipos incorretos.

Os tipos de retorno da UDTF devem ser explícitos — você não pode referenciar tipos de coluna de entrada.

Permissões

Permissão

Propósito

Acesso de leitura a .user_defined_operators.yaml.

Descubra a operadora.

Acesso de leitura ao arquivo YAML (somente python-run-function ).

Carregar a definição do operador.

EXECUTE na função Unity Catalog (somente operadores baseados em UC).

execução o operador.

USE SCHEMA no esquema (somente operadores baseados em UC).

Acesse o esquema onde a função foi criada.

Outras permissões

Dependendo da sua operadora, os usuários podem precisar de outras permissões. Por exemplo, USE CONNECTION em uma conexão Unity Catalog para chamadas API HTTP.

Próximos passos

Explore o seguinte tutorial:

Exemplo

Tipo

Descrição

Remetente email do Gmail

python-run-function

Enviar dados DataFrame como anexo CSV email via Gmail.

calculadora de juros compostos

uc-udf

Calcule os valores futuros do investimento usando a fórmula de juros compostos.

clusteringK-means

uc-udtf

Segmentar dados em clusters usando scikit-learn.

Enviar mensagem no Slack

uc-udf

Enviar notificações para o canal do Slack via API.

Todos os widgets da interface do usuário

uc-udf

Operador de referência que exibe todos os widgets de interface do usuário disponíveis.

Para uma referência completa ao esquema YAML, consulte a Referência YAML de operadores definidos pelo usuário.