Recurso declarativo
Beta
Este recurso está em versão Beta. Os administradores do espaço de trabalho podem controlar o acesso a este recurso na página de Pré-visualizações . Veja as prévias do Gerenciador Databricks.
As APIs engenharia de recurso declarativo permitem definir e compute recursos da fonte de dados. Um recurso pode ser definido usando uma variedade de fontes (tabela Delta e dados em tempo real da requisição) e cálculos (agregações em janelas de tempo, seleções simples de colunas e muito mais). Este guia cobre o seguinte fluxo de trabalho:
-
fluxo de trabalho de desenvolvimento de recursos
- Utilize
create_featurepara definir objetos de recurso Unity Catalog que podem ser utilizados no modelo de treinamento e atendimento ao fluxo de trabalho. - Alternativamente, construa objetos
Featurelocalmente e useregister_featurepara persistir neles no Unity Catalog posteriormente. O recurso construído localmente pode ser usado comcreate_training_setantes do registro.
- Utilize
-
Modelo de treinamento fluxo de trabalho
- Use
create_training_setpara calcular o recurso agregado pontual para machine learning. Para documentação detalhada sobre treinamento com recurso declarativo, veja modelos de ensino com recurso declarativo.
- Use
-
materialização de recursos e atendimento ao fluxo de trabalho
- Após definir um recurso com
create_featureou recuperá-lo usandoget_feature, você pode usarmaterialize_featurespara materializar o recurso ou conjunto de recursos em um armazenamento offline para reutilização eficiente ou em um armazenamento online para serviço online. - Use
create_training_setcom a view materializada para preparar um dataset de treinamento de lotes offline.
- Após definir um recurso com
Para obter detalhes API , consulte a referência API de recursos declarativos.
Requisitos
-
Um cluster computeclássico executando Databricks Runtime 17.0 ML ou superior.
-
Você precisa instalar o pacote Python personalizado. Execute as seguintes linhas de código sempre que executar um Notebook:
Python%pip install databricks-feature-engineering>=0.15.0
dbutils.library.restartPython()
Exemplo de início rápido
Para um notebook de início rápido executável, consulte o Notebook de Exemplo.
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import (
CronSchedule, DeltaTableSource, Feature, AggregationFunction,
MaterializedFeaturePipelineScheduleState,
Sum, Avg, ColumnSelection, TableTrigger,
TumblingWindow, SlidingWindow,
OfflineStoreConfig, OnlineStoreConfig,
)
from datetime import timedelta
CATALOG_NAME = "main"
SCHEMA_NAME = "feature_store"
TABLE_NAME = "transactions"
# 1. Create data source
source = DeltaTableSource(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
table_name=TABLE_NAME,
)
# 2. Define features locally (no catalog/schema needed yet)
avg_feature = Feature(
source=source,
entity=["user_id"],
timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), TumblingWindow(window_duration=timedelta(days=30))),
name="avg_transaction_30d",
)
sum_feature = Feature(
source=source,
entity=["user_id"],
timeseries_column="transaction_time",
function=AggregationFunction(Sum(input="amount"), SlidingWindow(window_duration=timedelta(days=7), slide_duration=timedelta(days=1))),
# name auto-generated: "amount_sum_sliding_7d_1d"
)
fe = FeatureEngineeringClient()
# 3. Explore features with compute_features
feature_df = fe.compute_features(features=[avg_feature, sum_feature])
feature_df.display()
# 4. Create training set using local features
# `labeled_df` should have columns "user_id", "transaction_time", and "target".
training_set = fe.create_training_set(
df=labeled_df,
features=[avg_feature, sum_feature],
label="target",
)
training_set.load_df().display()
# 5. Register features in Unity Catalog
avg_feature = fe.register_feature(
feature=avg_feature,
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
)
sum_feature = fe.register_feature(
feature=sum_feature,
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
)
# 6. Or use create_feature for a one-step define-and-register workflow
latest_amount = fe.create_feature(
source=source,
function=ColumnSelection("amount"),
entity=["user_id"],
timeseries_column="transaction_time",
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
name="latest_amount",
)
# 7. Train model
with mlflow.start_run():
training_df = training_set.load_df()
# training code
fe.log_model(
model=model,
artifact_path="recommendation_model",
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name=f"{CATALOG_NAME}.{SCHEMA_NAME}.recommendation_model",
)
# 8. (Optional) Materialize features for serving
# Features must be registered in UC before calling materialize_features
online_config = OnlineStoreConfig(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
table_name_prefix="customer_features_serving",
online_store_name="customer_features_store",
)
# Aggregation features use CronSchedule and support both offline and online configs
fe.materialize_features(
features=[avg_feature, sum_feature],
offline_config=OfflineStoreConfig(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
table_name_prefix="customer_features",
),
online_config=online_config,
trigger=CronSchedule(
quartz_cron_expression="0 0 * * * ?", # Hourly
timezone_id="UTC",
pipeline_schedule_state=MaterializedFeaturePipelineScheduleState.ACTIVE,
),
)
# ColumnSelection features use TableTrigger and only support online config
fe.materialize_features(
features=[latest_amount],
online_config=online_config,
trigger=TableTrigger(),
)
Exemplo de caderno
Notebook de início rápido de recursos declarativos
Modelo de treinamento e inferência
Para ensinar modelos e inferência de lotes de execução com recurso declarativo, incluindo log_model(), score_batch() e create_training_set(), veja ensinar modelos com recurso declarativo.
materialização de recursos
Após definir o recurso, você pode materializá-lo em armazenamentos offline ou online para reutilização eficiente no treinamento e na oferta de fluxo de trabalho. Após materializar o recurso, você pode servir modelos usando o modelo de serviço de CPU. Para mais detalhes, consulte Materialize declarative recurso.
Melhores práticas
nomeação de recursos
- Use nomes descritivos para recursos essenciais para o negócio.
- Siga convenções de nomenclatura consistentes em todas as equipes.
- Use nomes gerados automaticamente ao começar a desenvolver recursos.
Janelas de tempo
- Alinhe os limites das janelas com os ciclos de negócios (diário, semanal).
- Janelas temporais mais curtas capturam tendências recentes, mas podem ser ruidosas. Janelas temporais mais longas produzem distribuições de recursos mais estáveis, mas podem não captar mudanças comportamentais recentes. Escolha com base na rapidez com que o sinal subjacente muda para o seu caso de uso. Por exemplo, uma janela de 7 dias suaviza as flutuações diárias e produz entradas de modelo consistentes, enquanto uma janela de 1 hora reage rapidamente às mudanças comportamentais, mas pode introduzir variância que degrada o desempenho do modelo. Se a precisão do seu modelo se deteriorar quando a distribuição mudar, use uma janela maior para estabilizar as entradas.
- Janelas basculantes e de correr são mais escaláveis do que janelas de enrolar (contínuas). Para a maioria dos casos, comece com janelas deslizantes.
desempenho
- Materialize recurso da mesma fonte de dados em uma única chamada
materialize_featurespara minimizar varreduras de dados. - Utilize a mesma granularidade (por exemplo, todos os slides com duração de 1 hora ou 1 dia) para recursos na mesma fonte de dados para permitir um melhor agrupamento durante a materialização.
Colunas de entidade versus condições de filtro
Utilize este guia de decisão ao trabalhar com recursos da mesma tabela de origem:
Use entity (em create_feature) quando precisar de diferentes níveis de agregação:
- Recurso ao nível do cliente (uma linha por cliente):
entity=["customer_id"] - Recurso cliente-comerciante (várias linhas por cliente):
entity=["customer_id", "merchant_id"] - Diferentes níveis de agregação podem compartilhar o mesmo
DeltaTableSource— especifique valoresentitydiferentes em cada definição de recurso.
Use filter_condition (em DeltaTableSource) quando precisar filtrar linhas no mesmo nível de agregação:
- Somente transações de alto valor :
filter_condition="amount > 100"(ainda agregado por cliente) - Apenas pedidos concluídos :
filter_condition="status = 'completed'"(ainda agregado por cliente)
Regra prática: Se a sua alteração resultar em um número diferente de linhas por valor de entidade, use valores entity diferentes nas suas definições de recurso. Se você estiver apenas filtrando quais linhas contribuem para a mesma agregação, use filter_condition na fonte.
Padrões comuns
Análise de clientes
from databricks.feature_engineering.entities import AggregationFunction, Sum, Count, RollingWindow
fe = FeatureEngineeringClient()
features = [
# Recency: Number of transactions in the last day
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions,
entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Count(input="transaction_id"), RollingWindow(window_duration=timedelta(days=1)))),
# Frequency: transaction count over the last 90 days
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions,
entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Count(input="transaction_id"), RollingWindow(window_duration=timedelta(days=90)))),
# Monetary: total spend in the last month
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions,
entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Sum(input="amount"), RollingWindow(window_duration=timedelta(days=30)))),
]
Análise de tendências
# Compare recent vs. historical behavior
fe = FeatureEngineeringClient()
recent_avg = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), RollingWindow(window_duration=timedelta(days=7))),
)
historical_avg = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), RollingWindow(window_duration=timedelta(days=7), delay=timedelta(days=7))),
)
Padrões sazonais
# Same day of week, 4 weeks ago
fe = FeatureEngineeringClient()
weekly_pattern = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), RollingWindow(window_duration=timedelta(days=1), delay=timedelta(weeks=4))),
)
Limitações
- Os nomes das colunas de entidade e séries temporais devem corresponder entre o dataset de treinamento (rótulo) e as definições de recurso quando usados na API
create_training_set. - O nome da coluna usada como coluna
labelno dataset de treinamento não deve existir nas tabelas de origem usadas para definirFeatures. - Uma lista limitada de funções (UDAFs) é suportada na API
create_feature. Consulte Funções suportadas. - As colunas de entidade não podem ser do tipo
DATEouTIMESTAMP. RequestSourcesuporta apenas tipos de dados escalares definidos emScalarDataType(INTEGER,FLOAT,BOOLEAN,STRING,DOUBLE,LONG,TIMESTAMP,DATE,SHORT). Tipos complexos como arrays, mapas e structs não são suportados.RequestSourceNão suporta funções de agregação ou janelas de tempo. Somente funçõesColumnSelectionpodem ser usadas.- O conjunto de nomes de colunas de entidade, nomes de colunas de séries temporais e nomes de colunas de recursos de solicitação deve ser globalmente exclusivo em todas as fontes em um conjunto de treinamento ou endpoint de serviço.
Para limitações específicas de materialização, consulte Limitações.