Pular para o conteúdo principal

Recurso declarativo

info

Beta

Este recurso está em versão Beta. Os administradores do espaço de trabalho podem controlar o acesso a este recurso na página de Pré-visualizações . Veja as prévias do Gerenciador Databricks.

As APIs engenharia de recurso declarativo permitem definir e compute recursos da fonte de dados. Um recurso pode ser definido usando uma variedade de fontes (tabela Delta e dados em tempo real da requisição) e cálculos (agregações em janelas de tempo, seleções simples de colunas e muito mais). Este guia cobre o seguinte fluxo de trabalho:

  • fluxo de trabalho de desenvolvimento de recursos

    • Utilize create_feature para definir objetos de recurso Unity Catalog que podem ser utilizados no modelo de treinamento e atendimento ao fluxo de trabalho.
    • Alternativamente, construa objetos Feature localmente e use register_feature para persistir neles no Unity Catalog posteriormente. O recurso construído localmente pode ser usado com create_training_set antes do registro.
  • Modelo de treinamento fluxo de trabalho

    • Use create_training_set para calcular o recurso agregado pontual para machine learning. Para documentação detalhada sobre treinamento com recurso declarativo, veja modelos de ensino com recurso declarativo.
  • materialização de recursos e atendimento ao fluxo de trabalho

    • Após definir um recurso com create_feature ou recuperá-lo usando get_feature, você pode usar materialize_features para materializar o recurso ou conjunto de recursos em um armazenamento offline para reutilização eficiente ou em um armazenamento online para serviço online.
    • Use create_training_set com a view materializada para preparar um dataset de treinamento de lotes offline.

Para obter detalhes API , consulte a referência API de recursos declarativos.

Requisitos

  • Um cluster computeclássico executando Databricks Runtime 17.0 ML ou superior.

  • Você precisa instalar o pacote Python personalizado. Execute as seguintes linhas de código sempre que executar um Notebook:

    Python
    %pip install databricks-feature-engineering>=0.15.0
    dbutils.library.restartPython()

Exemplo de início rápido

Para um notebook de início rápido executável, consulte o Notebook de Exemplo.

Python
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import (
CronSchedule, DeltaTableSource, Feature, AggregationFunction,
MaterializedFeaturePipelineScheduleState,
Sum, Avg, ColumnSelection, TableTrigger,
TumblingWindow, SlidingWindow,
OfflineStoreConfig, OnlineStoreConfig,
)
from datetime import timedelta

CATALOG_NAME = "main"
SCHEMA_NAME = "feature_store"
TABLE_NAME = "transactions"

# 1. Create data source
source = DeltaTableSource(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
table_name=TABLE_NAME,
)

# 2. Define features locally (no catalog/schema needed yet)
avg_feature = Feature(
source=source,
entity=["user_id"],
timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), TumblingWindow(window_duration=timedelta(days=30))),
name="avg_transaction_30d",
)

sum_feature = Feature(
source=source,
entity=["user_id"],
timeseries_column="transaction_time",
function=AggregationFunction(Sum(input="amount"), SlidingWindow(window_duration=timedelta(days=7), slide_duration=timedelta(days=1))),
# name auto-generated: "amount_sum_sliding_7d_1d"
)

fe = FeatureEngineeringClient()

# 3. Explore features with compute_features
feature_df = fe.compute_features(features=[avg_feature, sum_feature])
feature_df.display()

# 4. Create training set using local features
# `labeled_df` should have columns "user_id", "transaction_time", and "target".
training_set = fe.create_training_set(
df=labeled_df,
features=[avg_feature, sum_feature],
label="target",
)
training_set.load_df().display()

# 5. Register features in Unity Catalog
avg_feature = fe.register_feature(
feature=avg_feature,
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
)
sum_feature = fe.register_feature(
feature=sum_feature,
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
)

# 6. Or use create_feature for a one-step define-and-register workflow
latest_amount = fe.create_feature(
source=source,
function=ColumnSelection("amount"),
entity=["user_id"],
timeseries_column="transaction_time",
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
name="latest_amount",
)

# 7. Train model
with mlflow.start_run():
training_df = training_set.load_df()

# training code

fe.log_model(
model=model,
artifact_path="recommendation_model",
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name=f"{CATALOG_NAME}.{SCHEMA_NAME}.recommendation_model",
)

# 8. (Optional) Materialize features for serving
# Features must be registered in UC before calling materialize_features
online_config = OnlineStoreConfig(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
table_name_prefix="customer_features_serving",
online_store_name="customer_features_store",
)

# Aggregation features use CronSchedule and support both offline and online configs
fe.materialize_features(
features=[avg_feature, sum_feature],
offline_config=OfflineStoreConfig(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
table_name_prefix="customer_features",
),
online_config=online_config,
trigger=CronSchedule(
quartz_cron_expression="0 0 * * * ?", # Hourly
timezone_id="UTC",
pipeline_schedule_state=MaterializedFeaturePipelineScheduleState.ACTIVE,
),
)

# ColumnSelection features use TableTrigger and only support online config
fe.materialize_features(
features=[latest_amount],
online_config=online_config,
trigger=TableTrigger(),
)

Exemplo de caderno

Notebook de início rápido de recursos declarativos

Abrir notebook em uma nova aba

Modelo de treinamento e inferência

Para ensinar modelos e inferência de lotes de execução com recurso declarativo, incluindo log_model(), score_batch() e create_training_set(), veja ensinar modelos com recurso declarativo.

materialização de recursos

Após definir o recurso, você pode materializá-lo em armazenamentos offline ou online para reutilização eficiente no treinamento e na oferta de fluxo de trabalho. Após materializar o recurso, você pode servir modelos usando o modelo de serviço de CPU. Para mais detalhes, consulte Materialize declarative recurso.

Melhores práticas

nomeação de recursos

  • Use nomes descritivos para recursos essenciais para o negócio.
  • Siga convenções de nomenclatura consistentes em todas as equipes.
  • Use nomes gerados automaticamente ao começar a desenvolver recursos.

Janelas de tempo

  • Alinhe os limites das janelas com os ciclos de negócios (diário, semanal).
  • Janelas temporais mais curtas capturam tendências recentes, mas podem ser ruidosas. Janelas temporais mais longas produzem distribuições de recursos mais estáveis, mas podem não captar mudanças comportamentais recentes. Escolha com base na rapidez com que o sinal subjacente muda para o seu caso de uso. Por exemplo, uma janela de 7 dias suaviza as flutuações diárias e produz entradas de modelo consistentes, enquanto uma janela de 1 hora reage rapidamente às mudanças comportamentais, mas pode introduzir variância que degrada o desempenho do modelo. Se a precisão do seu modelo se deteriorar quando a distribuição mudar, use uma janela maior para estabilizar as entradas.
  • Janelas basculantes e de correr são mais escaláveis do que janelas de enrolar (contínuas). Para a maioria dos casos, comece com janelas deslizantes.

desempenho

  • Materialize recurso da mesma fonte de dados em uma única chamada materialize_features para minimizar varreduras de dados.
  • Utilize a mesma granularidade (por exemplo, todos os slides com duração de 1 hora ou 1 dia) para recursos na mesma fonte de dados para permitir um melhor agrupamento durante a materialização.

Colunas de entidade versus condições de filtro

Utilize este guia de decisão ao trabalhar com recursos da mesma tabela de origem:

Use entity (em create_feature) quando precisar de diferentes níveis de agregação:

  • Recurso ao nível do cliente (uma linha por cliente): entity=["customer_id"]
  • Recurso cliente-comerciante (várias linhas por cliente): entity=["customer_id", "merchant_id"]
  • Diferentes níveis de agregação podem compartilhar o mesmo DeltaTableSource — especifique valores entity diferentes em cada definição de recurso.

Use filter_condition (em DeltaTableSource) quando precisar filtrar linhas no mesmo nível de agregação:

  • Somente transações de alto valor : filter_condition="amount > 100" (ainda agregado por cliente)
  • Apenas pedidos concluídos : filter_condition="status = 'completed'" (ainda agregado por cliente)

Regra prática: Se a sua alteração resultar em um número diferente de linhas por valor de entidade, use valores entity diferentes nas suas definições de recurso. Se você estiver apenas filtrando quais linhas contribuem para a mesma agregação, use filter_condition na fonte.

Padrões comuns

Análise de clientes

Python
from databricks.feature_engineering.entities import AggregationFunction, Sum, Count, RollingWindow

fe = FeatureEngineeringClient()
features = [
# Recency: Number of transactions in the last day
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions,
entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Count(input="transaction_id"), RollingWindow(window_duration=timedelta(days=1)))),

# Frequency: transaction count over the last 90 days
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions,
entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Count(input="transaction_id"), RollingWindow(window_duration=timedelta(days=90)))),

# Monetary: total spend in the last month
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions,
entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Sum(input="amount"), RollingWindow(window_duration=timedelta(days=30)))),
]

Análise de tendências

Python
# Compare recent vs. historical behavior
fe = FeatureEngineeringClient()
recent_avg = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), RollingWindow(window_duration=timedelta(days=7))),
)

historical_avg = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), RollingWindow(window_duration=timedelta(days=7), delay=timedelta(days=7))),
)

Padrões sazonais

Python
# Same day of week, 4 weeks ago
fe = FeatureEngineeringClient()
weekly_pattern = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), RollingWindow(window_duration=timedelta(days=1), delay=timedelta(weeks=4))),
)

Limitações

  • Os nomes das colunas de entidade e séries temporais devem corresponder entre o dataset de treinamento (rótulo) e as definições de recurso quando usados na API create_training_set .
  • O nome da coluna usada como coluna label no dataset de treinamento não deve existir nas tabelas de origem usadas para definir Features.
  • Uma lista limitada de funções (UDAFs) é suportada na API create_feature . Consulte Funções suportadas.
  • As colunas de entidade não podem ser do tipo DATE ou TIMESTAMP.
  • RequestSource suporta apenas tipos de dados escalares definidos em ScalarDataType (INTEGER, FLOAT, BOOLEAN, STRING, DOUBLE, LONG, TIMESTAMP, DATE, SHORT). Tipos complexos como arrays, mapas e structs não são suportados.
  • RequestSource Não suporta funções de agregação ou janelas de tempo. Somente funções ColumnSelection podem ser usadas.
  • O conjunto de nomes de colunas de entidade, nomes de colunas de séries temporais e nomes de colunas de recursos de solicitação deve ser globalmente exclusivo em todas as fontes em um conjunto de treinamento ou endpoint de serviço.

Para limitações específicas de materialização, consulte Limitações.