Recurso declarativo engenharia e gerenciamento de pipeline
Beta
Este recurso é Beta e está disponível nas seguintes regiões: us-east-1 e us-west-2.
As APIs declarativas do repositório de recursos permitem definir e compute recursos de agregação em janela de tempo a partir da fonte de dados. Este guia cobre o seguinte fluxo de trabalho:
-
fluxo de trabalho de desenvolvimento de recursos
- Utilize
create_featurepara definir objetos de recurso Unity Catalog que podem ser utilizados no modelo de treinamento e atendimento ao fluxo de trabalho.
- Utilize
-
Modelo de treinamento fluxo de trabalho
- Use
create_training_setpara calcular o recurso agregado pontual para machine learning. Isso retornará um objeto de conjunto de treinamento que pode retornar um DataFrame Spark com recursos de computação aumentados para o dataset de observação para treinamento de um modelo. - Chamando
log_modelcom este treinamento definido, para salvar este modelo no Unity Catalog, juntamente com a linhagem entre os objetos de recurso e modelo. score_batchUtiliza a linhagem Unity Catalog para usar o código de definição de recursos a fim de realizar agregações de recursos pontuais e corretas, que são adicionadas ao dataset de inferência para pontuação do modelo.
- Use
-
materialização de recursos e atendimento ao fluxo de trabalho
- Após definir um recurso com
create_featureou recuperá-lo usandoget_feature, você pode usarmaterialize_featurespara materializar o recurso ou conjunto de recursos em um armazenamento offline para reutilização eficiente ou em um armazenamento online para serviço online. - Use
create_training_setcom a view materializada para preparar um dataset de treinamento de lotes offline.
- Após definir um recurso com
Para obter documentação detalhada sobre log_model e score_batch, consulte Usar recurso para ensinar modelos.
Requisitos
-
Um cluster computeclássico executando Databricks Runtime 17.0 ML ou superior.
-
Você precisa instalar o pacote Python personalizado. As seguintes linhas de código devem ser executadas sempre que um Notebook for executado:
Python%pip install databricks-feature-engineering>=0.14.0
dbutils.library.restartPython()
Exemplo de início rápido
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import DeltaTableSource, Sum, Avg, ContinuousWindow, TumblingWindow, SlidingWindow, OfflineStoreConfig
from datetime import timedelta
CATALOG_NAME = "main"
SCHEMA_NAME = "feature_store"
TABLE_NAME = "transactions"
# 1. Create data source
source = DeltaTableSource(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
table_name=TABLE_NAME,
entity_columns=["user_id"],
timeseries_column="transaction_time"
)
# 2. Define features
fe = FeatureEngineeringClient()
features = [
fe.create_feature(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
name="avg_transaction_30d",
source=source,
inputs=["amount"],
function=Avg(),
time_window=TumblingWindow(window_duration=timedelta(days=30))
),
fe.create_feature(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
source=source,
inputs=["amount"],
function=Sum(),
time_window=SlidingWindow(window_duration=timedelta(days=7), slide_duration=timedelta(days=1))
# name auto-generated: "amount_sum_continuous_7d"
),
]
# 3. Create training set using declarative features
# `labeled_df` should have columns "user_id", "transaction_time", and "target".
# It can have other context features specific to the individual observations.
training_set = fe.create_training_set(
df=labeled_df,
features=features,
label="target",
)
training_set.load_df().display() # action: joins labeled_df with computed feature
# 4. Train model
with mlflow.start_run():
training_df = training_set.load_df()
# training code
fe.log_model(
model=model,
artifact_path="recommendation_model",
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name=f"{CATALOG_NAME}.{SCHEMA_NAME}.recommendation_model",
)
# 5. (Optional) Materialize features for serving
fe.materialize_features(
features=features,
offline_config=OfflineStoreConfig(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
table_name_prefix="customer_features"
),
pipeline_state="ACTIVE",
cron_schedule="0 0 * * * ?" # Hourly
)
Após materializar o recurso, você pode servir modelos usando o modelo de serviço de CPU. Para obter detalhes sobre a disponibilização online, consulte Materializar e servir recurso declarativo.
fonte de dados
DeltaTableSource
Tipos de dados permitidos para timeseries_column: TimestampType, DateType. Outros tipos de dados inteiros podem funcionar, mas causarão perda de precisão para agregações de janelas de tempo.
O código a seguir mostra um exemplo usando a tabela main.analytics.user_events do Unity Catalog:
from databricks.feature_engineering.entities import DeltaTableSource
source = DeltaTableSource(
catalog_name="main", # Catalog name
schema_name="analytics", # Schema name
table_name="user_events", # Table name
entity_columns=["user_id"], # Join keys, used to look up features for an entity
timeseries_column="event_time" # Timestamp for time windows
)
APIdeclarativa de recursos
APIcreate_feature()
FeatureEngineeringClient.create_feature() Fornece validação abrangente e garante a construção adequada dos recursos:
FeatureEngineeringClient.create_feature(
source: DataSource, # Required: DeltaTableSource
inputs: List[str], # Required: List of column names from the source
function: Union[Function, str], # Required: Aggregation function (Sum, Avg, Count, etc.)
time_window: TimeWindow, # Required: TimeWindow for aggregation
catalog_name: str, # Required: The catalog name for the feature
schema_name: str, # Required: The schema name for the feature
name: Optional[str], # Optional: Feature name (auto-generated if omitted)
description: Optional[str], # Optional: Feature description
filter_condition: Optional[str], # Optional: SQL WHERE clause to filter source data
) -> Feature
Parâmetros:
source: A fonte de dados usada na computação de recursosinputsLista de nomes de colunas da fonte a serem usadas como entrada para agregação.function: A função de agregação (instância da função ou nome da string). Veja abaixo a lista de funções suportadas.time_window: O intervalo de tempo para agregação (instância de TimeWindow ou dicionário com 'duração' e 'offset' opcional)catalog_name: O nome do catálogo para o recursoschema_name: O nome do esquema para o recursoname: Nome do recurso opcional (gerado automaticamente se omitido)descriptionDescrição opcional do recursofilter_conditionCláusula WHERE opcional em SQL para filtrar os dados de origem antes da agregação. Exemplo:"status = 'completed'","transaction" = "Credit" AND "amount > 100"
Retorna: Uma instância de recurso validada
Gera um erro ValueError se alguma validação falhar.
Nomes gerados automaticamente
Quando name é omitido, os nomes seguem o padrão: {column}_{function}_{window}. Por exemplo:
price_avg_continuous_1h(Preço médio por hora)transaction_count_continuous_30d_1d(Contagem de transações dos últimos 30 dias com deslocamento de 1 dia em relação ao carimbo de data/hora do evento)
Funções suportadas
Todas as funções são aplicadas em uma janela de tempo de agregação, conforme descrito na seção de janelas de tempo abaixo.
Função | Taquigrafia | Descrição | Exemplo de caso de uso |
|---|---|---|---|
|
| Total de valores | Uso diário do aplicativo por usuário em minutos |
|
| Média dos valores | Valor médio da transação |
|
| Número de registros | Número de logins por usuário |
|
| Valor mínimo | Frequência cardíaca mais baixa registrada por um dispositivo vestível |
|
| Valor máximo | Tamanho máximo do cesto de compras por sessão |
|
| desvio padrão da população | Variação diária do valor das transações entre todos os clientes |
|
| desvio padrão da amostra | Variabilidade das taxas de cliques em campanhas publicitárias |
|
| Variância populacional | Distribuição das leituras dos sensores de dispositivos IoT em uma fábrica |
|
| Variância da amostra | Distribuição das classificações de filmes em um grupo amostrado |
|
| Contagem aproximada de únicos | Contagem específica de itens comprados |
| N/A | Percentil aproximado | latência de resposta p95 |
|
| Primeiro valor | Registro de data e hora do primeiro login |
|
| Último valor | Valor da compra mais recente |
*As funções com parâmetros usam valores default quando se utiliza a forma abreviada de strings.
O exemplo a seguir mostra o recurso de agregação de janelas definido na mesma fonte de dados.
from databricks.feature_engineering.entities import Sum, Avg, Count, Max, ApproxCountDistinct
fe = FeatureEngineeringClient()
sum_feature = fe.create_feature(source=source, inputs=["amount"], function=Sum(), ...)
avg_feature = fe.create_feature(source=source, inputs=["amount"], function=Avg(), ...)
distinct_count = fe.create_feature(
source=source,
inputs=["product_id"],
function=ApproxCountDistinct(relativeSD=0.01),
...
)
recurso com condições de filtro
As APIs declarativas de recursos também suportam a aplicação de um filtro SQL , que é aplicado como uma cláusula WHERE em agregações. Os filtros são úteis ao trabalhar com grandes tabelas de origem que incluem um superconjunto de dados necessários para o cálculo de recursos, e minimizam a necessidade de criar visualizações separadas sobre essas tabelas.
from databricks.feature_engineering.entities import Sum, Count, ContinuousWindow
from datetime import timedelta
# Only aggregate high-value transactions
high_value_sales = fe.create_feature(
catalog_name="main",
schema_name="ecommerce",
source=transactions,
inputs=["amount"],
function=Sum(),
time_window=ContinuousWindow(window_duration=timedelta(days=30)),
filter_condition="amount > 100" # Only transactions over $100
)
# Multiple conditions using SQL syntax
completed_orders = fe.create_feature(
catalog_name="main",
schema_name="ecommerce",
source=orders,
inputs=["order_id"],
function=Count(),
time_window=ContinuousWindow(window_duration=timedelta(days=7)),
filter_condition="status = 'completed' AND payment_method = 'credit_card'"
)
Janelas de tempo
APIs declarativas de engenharia recursiva suportam três tipos diferentes de janelas para controlar o comportamento de retrocesso em agregações baseadas em janelas de tempo: contínua, rotativa e deslizante.
- As janelas contínuas permitem a visualização retroativa a partir do momento do evento. A duração e o deslocamento são definidos explicitamente.
- Janelas móveis são janelas de tempo fixas e não sobrepostas. Cada ponto de dados pertence a exatamente uma janela.
- As janelas deslizantes são janelas de tempo sobrepostas e rolantes com um intervalo de deslizamento configurável.
A ilustração a seguir mostra como eles funcionam.

Janela contínua
As janelas contínuas são agregações atualizadas e em tempo real, normalmente usadas em dados de transmissão. Em um pipeline de transmissão, a janela contínua emite uma nova linha somente quando o conteúdo da janela de comprimento fixo muda, como quando um evento entra ou sai. Quando um recurso de janela contínua é usado no pipeline de treinamento, um cálculo preciso do recurso em um ponto específico no tempo é realizado na fonte de dados, usando a duração da janela de comprimento fixo imediatamente anterior ao carimbo de data/hora de um evento específico. Isso ajuda a evitar distorções entre os modos online e offline ou vazamento de dados. recurso no horário T agrega eventos de [T − duração, T).
class ContinuousWindow(TimeWindow):
window_duration: datetime.timedelta
offset: Optional[datetime.timedelta] = None
A tabela a seguir lista os parâmetros para uma janela contínua. Os horários de início e término da janela são baseados nos seguintes parâmetros:
- horário de início:
evaluation_time - window_duration + offset(inclusive) - Hora de término:
evaluation_time + offset(exclusivo)
Parâmetro | Restrições |
|---|---|
| Deve ser ≤ 0 (movimento da janela para trás no tempo a partir do carimbo de data/hora final). Use |
| Deve ser > 0 |
from databricks.feature_engineering.entities import ContinuousWindow
from datetime import timedelta
# Look back 7 days from evaluation time
window = ContinuousWindow(window_duration=timedelta(days=7))
Defina uma janela contínua com deslocamento usando o código abaixo.
# Look back 7 days, but end 1 day ago (exclude most recent day)
window = ContinuousWindow(
window_duration=timedelta(days=7),
offset=timedelta(days=-1)
)
Exemplos de janela contínua
-
window_duration=timedelta(days=7), offset=timedelta(days=0)Isso cria uma janela de análise retrospectiva de 7 dias, terminando no momento da avaliação atual. Para um evento às 14h do Dia 7, isso inclui todos os eventos desde as 14h do Dia 0 até (mas não incluindo) as 14h do Dia 7. -
window_duration=timedelta(hours=1), offset=timedelta(minutes=-30)Isso cria uma janela de retrospectiva de 1 hora, terminando 30 minutos antes do horário de avaliação. Para um evento às 15h, isso inclui todos os eventos das 13h30 até (mas não incluindo) 14h30. Isso é útil para account atrasos na ingestão de dados.
Janela basculante
Para recursos definidos usando janelas deslizantes, as agregações são calculadas em uma janela de comprimento fixo predeterminado que avança por um intervalo de deslizamento, produzindo janelas não sobrepostas que particionam completamente o tempo. Consequentemente, cada evento na fonte contribui para exatamente uma janela. recurso no tempo t agrega dados de janelas que terminam em ou antes de t (exclusivo). Windows teve início na época do Unix.
class TumblingWindow(TimeWindow):
window_duration: datetime.timedelta
A tabela a seguir lista os parâmetros para uma janela basculante.
Parâmetro | Restrições |
|---|---|
| Deve ser > 0 |
from databricks.feature_engineering.entities import TumblingWindow
from datetime import timedelta
window = TumblingWindow(
window_duration=timedelta(days=7)
)
Exemplo de janela basculante
window_duration=timedelta(days=5)Isso cria janelas de tempo predefinidas com duração fixa de 5 dias cada. Exemplo: A janela nº 1 abrange do dia 0 ao dia 4, a janela nº 2 abrange do dia 5 ao dia 9, a janela nº 3 abrange do dia 10 ao dia 14 e assim por diante. Especificamente, a Janela #1 inclui todos os eventos com carimbos de data/hora começando em00:00:00.00no Dia 0 até (mas não incluindo) quaisquer eventos com carimbo de data/hora00:00:00.00no Dia 5. Cada evento pertence a exatamente uma janela.
Janela deslizante
Para recursos definidos usando janelas deslizantes, as agregações são calculadas em uma janela de comprimento fixo predeterminada que avança por um intervalo de deslizamento, produzindo janelas sobrepostas. Cada evento na fonte pode contribuir para a agregação de recursos em múltiplas janelas. recurso no tempo t agrega dados de janelas que terminam em ou antes de t (exclusivo). Windows teve início na época do Unix.
class SlidingWindow(TimeWindow):
window_duration: datetime.timedelta
slide_duration: datetime.timedelta
A tabela a seguir lista os parâmetros para uma janela deslizante.
Parâmetro | Restrições |
|---|---|
| Deve ser > 0 |
| Deve ser > 0 e < |
from databricks.feature_engineering.entities import SlidingWindow
from datetime import timedelta
window = SlidingWindow(
window_duration=timedelta(days=7),
slide_duration=timedelta(days=1)
)
Exemplo de janela deslizante
window_duration=timedelta(days=5), slide_duration=timedelta(days=1)Isso cria janelas sobrepostas de 5 dias que avançam 1 dia de cada vez. Exemplo: A janela nº 1 abrange do dia 0 ao dia 4, a janela nº 2 abrange do dia 1 ao dia 5, a janela nº 3 abrange do dia 2 ao dia 6 e assim por diante. Cada janela inclui eventos de00:00:00.00no dia de início até (mas não incluindo)00:00:00.00no dia de término. Como as janelas se sobrepõem, um único evento pode pertencer a várias janelas (neste exemplo, cada evento pertence a até 5 janelas diferentes).
Métodos da API
create_training_set()
join recurso with rótulo data for ML treinamento:
FeatureEngineeringClient.create_training_set(
df: DataFrame, # DataFrame with training data
features: Optional[List[Feature]], # List of Feature objects
label: Union[str, List[str], None], # Label column name(s)
exclude_columns: Optional[List[str]] = None, # Optional: columns to exclude
# API continues to support creating training set using materialized feature tables and functions
) -> TrainingSet
Chame TrainingSet.load_df para obter os dados de treinamento originais combinados com o recurso de cálculo dinâmico pontual.
Requisitos para o argumento df :
- Deve conter todos os
entity_columnsdo recurso fonte de dados - Deve conter
timeseries_columndo recurso fonte de dados - Deve conter coluna(s) de rótulo
Correção pontual: os recursos são calculados apenas com os dados de origem disponíveis antes do carimbo de data/hora de cada linha, a fim de evitar vazamento futuro de dados no treinamento do modelo. Os cálculos aproveitam as funções de janelamento do Spark para maior eficiência.
log_model()
registrar um modelo com metadados de recursos para acompanhamento de linhagem e busca automática de recursos durante a inferência:
FeatureEngineeringClient.log_model(
model, # Trained model object
artifact_path: str, # Path to store model artifact
flavor: ModuleType, # MLflow flavor module (e.g., mlflow.sklearn)
training_set: TrainingSet, # TrainingSet used for training
registered_model_name: Optional[str], # Optional: register model in Unity Catalog
)
O parâmetro flavor especifica o módulo de sabor do modelo MLflow a ser usado, como mlflow.sklearn ou mlflow.xgboost.
Os registros de modelos com um TrainingSet rastreiam automaticamente a linhagem até o recurso usado no treinamento. Para obter documentação detalhada, consulte Usar recurso para ensinar modelos.
score_batch()
Realizar inferência de lotes com busca automática de recursos:
FeatureEngineeringClient.score_batch(
model_uri: str, # URI of logged model
df: DataFrame, # DataFrame with entity keys and timestamps
) -> DataFrame
score_batch Utiliza os metadados de recurso armazenados com o modelo para compute automaticamente o recurso correto em um determinado momento para inferência, garantindo a consistência com o treinamento. Para obter documentação detalhada, consulte Usar recurso para ensinar modelos.
Melhores práticas
nomeação de recursos
- Use nomes descritivos para recursos essenciais para o negócio.
- Siga convenções de nomenclatura consistentes em todas as equipes.
- Deixe que a geração automática lide com os recursos exploratórios.
Janelas de tempo
- Utilize deslocamentos para excluir dados recentes instáveis.
- Alinhe os limites das janelas com os ciclos de negócios (diário, semanal).
- Considere as compensações entre a atualização dos dados e a estabilidade dos recursos.
desempenho
- Agrupe recurso por fonte de dados para minimizar varreduras de dados.
- Utilize tamanhos de janela adequados ao seu caso de uso.
Testando
- Teste os limites da janela de tempo com cenários de dados conhecidos.
Padrões comuns
Análise de clientes
fe = FeatureEngineeringClient()
features = [
# Recency: Number of transactions in the last day
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions, inputs=["transaction_id"],
function=Count(), time_window=ContinuousWindow(window_duration=timedelta(days=1))),
# Frequency: transaction count over the last 90 days
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions, inputs=["transaction_id"],
function=Count(), time_window=ContinuousWindow(window_duration=timedelta(days=90))),
# Monetary: total spend in the last month
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions, inputs=["amount"],
function=Sum(), time_window=ContinuousWindow(window_duration=timedelta(days=30)))
]
Análise de tendências
# Compare recent vs. historical behavior
fe = FeatureEngineeringClient()
recent_avg = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, inputs=["amount"], function=Avg(),
time_window=ContinuousWindow(window_duration=timedelta(days=7))
)
historical_avg = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, inputs=["amount"], function=Avg(),
time_window=ContinuousWindow(window_duration=timedelta(days=7), offset=timedelta(days=-7))
)
Padrões sazonais
# Same day of week, 4 weeks ago
fe = FeatureEngineeringClient()
weekly_pattern = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, inputs=["amount"], function=Avg(),
time_window=ContinuousWindow(window_duration=timedelta(days=1), offset=timedelta(weeks=-4))
)
Limitações
- Os nomes das colunas de entidade e séries temporais devem corresponder entre o dataset de treinamento (rótulo) e as tabelas de origem quando usados na API
create_training_set. - O nome da coluna usada como coluna
labelno dataset de treinamento não deve existir nas tabelas de origem usadas para definirFeatures. - Uma lista limitada de funções (UDAFs) é suportada na API
create_feature.