Referência API de recurso declarativo
Beta
Este recurso está em versão Beta. Os administradores do espaço de trabalho podem controlar o acesso a este recurso na página de Pré-visualizações . Veja as prévias do Gerenciador Databricks.
APIde recurso de engenharia declarativa
Feature construtor e register_feature()
A abordagem recomendada é construir um objeto Feature localmente e usar register_feature para persistir no Unity Catalog. Este fluxo de trabalho em duas etapas permite que você experimente recursos (incluindo create_training_set) antes de registrá-los.
Feature(
source: DataSource, # Required: DeltaTableSource or RequestSource
function: Union[AggregationFunction, ColumnSelection], # Required: Aggregation or column selection
entity: Optional[List[str]] = None, # Required for aggregation: entity columns
timeseries_column: Optional[str] = None, # Required for aggregation: timestamp column
name: Optional[str] = None, # Optional: Feature name (auto-generated if omitted)
description: Optional[str] = None, # Optional: Feature description
)
FeatureEngineeringClient.register_feature() registra um Feature construído localmente no Unity Catalog.
FeatureEngineeringClient.register_feature(
feature: Feature, # Required: A Feature instance (not already registered)
catalog_name: str, # Required: UC catalog name
schema_name: str, # Required: UC schema name
) -> Feature
from databricks.feature_engineering.entities import Feature, DeltaTableSource, AggregationFunction, Sum, RollingWindow
from datetime import timedelta
# Step 1: Construct the feature locally
feature = Feature(
source=DeltaTableSource(catalog_name="main", schema_name="store", table_name="transactions"),
entity=["user_id"],
timeseries_column="transaction_time",
function=AggregationFunction(Sum(input="amount"), RollingWindow(window_duration=timedelta(days=7))),
)
# Step 2: Register in Unity Catalog
fe = FeatureEngineeringClient()
registered_feature = fe.register_feature(
feature=feature,
catalog_name="main",
schema_name="store",
)
create_feature()
FeatureEngineeringClient.create_feature() Valida, constrói e registra imediatamente um recurso no Unity Catalog em uma única etapa. Use esta opção quando não precisar testar o recurso localmente primeiro.
FeatureEngineeringClient.create_feature(
source: DataSource, # Required: DeltaTableSource or RequestSource
function: Union[AggregationFunction, ColumnSelection], # Required: Aggregation or column selection
catalog_name: str, # Required: The catalog name for the feature
schema_name: str, # Required: The schema name for the feature
entity: Optional[List[str]] = None, # Required for aggregation: entity columns
timeseries_column: Optional[str] = None, # Required for aggregation: timestamp column
name: Optional[str] = None, # Optional: Feature name (auto-generated if omitted)
description: Optional[str] = None, # Optional: Feature description
) -> Feature
Parâmetros:
source: A fonte de dados usada na computação de recursos (DeltaTableSourceouRequestSource).function: UmAggregationFunctionque agrupa o operador (por exemplo,Sum(input="amount")), a coluna de entrada e a janela de tempo. OuColumnSelection("column_name")para recurso pass-through.catalog_name: O nome do recurso no catálogo Unity Catalog .schema_name: O nome do esquema do Unity Catalog para o recurso.entityLista de nomes de colunas que definem o nível de agregação (chave primária). Requerido para recurso de agregação. Por exemplo,["user_id"]agrega por usuário.timeseries_column: A coluna de carimbo de data/hora usada para agregação de janelas de tempo. Requerido para recurso de agregação.name: Nome do recurso (opcional). Se omitido, será gerado automaticamente a partir da coluna de entrada, função e janela (por exemplo,amount_avg_rolling_7d).descriptionDescrição opcional do recurso.
Retorna: Uma instância de recurso validada
Gera um erro ValueError se alguma validação falhar.
delete_feature()
Exclui um recurso do Unity Catalog pelo seu nome totalmente qualificado.
FeatureEngineeringClient.delete_feature(
full_name: str, # Required: '<catalog>.<schema>.<feature_name>'
) -> None
fe.delete_feature(full_name="main.store.amount_sum_rolling_7d")
Antes de excluir um recurso, remova ou atualize quaisquer modelos ou especificações de recursos que o referenciem. Se o recurso já tiver sido materializado, exclua primeiro o recurso materializado. Consulte Como excluir um recurso materializado.
Nomes gerados automaticamente
Quando name é omitido, um nome é gerado automaticamente. Os nomes gerados seguem o padrão: {column}_{function}_{window}. Por exemplo:
price_avg_rolling_1h(Preço médio por hora)transaction_count_rolling_30d_1d(Contagem de transações dos últimos 30 dias com atraso de 1 dia a partir do registro de data e hora do evento)
Funções suportadas
Funções de agregação
As funções de agregação são envolvidas em um AggregationFunction juntamente com uma janela de tempo, conforme descrito em janelas de tempo. Cada função recebe um parâmetro input que especifica a coluna de origem a ser agregada.
Função | Descrição | Exemplo de caso de uso |
|---|---|---|
| Total de valores | Uso diário do aplicativo por usuário em minutos |
| Média dos valores | Valor médio da transação |
| Número de registros | Número de logins por usuário |
| Valor mínimo | Frequência cardíaca mais baixa registrada por um dispositivo vestível |
| Valor máximo | Maior valor de transação por sessão |
| desvio padrão da população | Variação diária do valor das transações entre todos os clientes |
| desvio padrão da amostra | Variabilidade das taxas de cliques em campanhas publicitárias |
| Variância populacional | Distribuição das leituras dos sensores de dispositivos IoT em uma fábrica |
| Variância da amostra | Distribuição das classificações de filmes em um grupo amostrado |
| Contagem aproximada de únicos | Contagem específica de itens comprados |
| Percentil aproximado | latência de resposta p95 |
| Primeiro valor | Registro de data e hora do primeiro login |
| Último valor | Valor da compra mais recente |
ColumnSelection (passagem direta)
ColumnSelection Seleciona uma única coluna de uma fonte sem aplicar qualquer agregação. Ele é envolvido diretamente no parâmetro function (não dentro de AggregationFunction). O tipo de retorno é inferido a partir do esquema de origem.
Função | Descrição | Exemplo de caso de uso |
|---|---|---|
| Último valor de uma coluna (sem agregação) | Categoria de fornecedor mais recente, passagem de um campo de solicitação |
ColumnSelection pode ser usado com qualquer fonte de dados:
DeltaTableSource: Retorna o valor mais recente por key de entidade por meio de uma join pontual (sem agregação de janela de retrospectiva).RequestSource: Passa o valor fornecido no momento da inferência (ou extraído do DataFrame de rótulo no momento do treinamento).
from databricks.feature_engineering.entities import (
ColumnSelection, DeltaTableSource, Feature, FieldDefinition,
RequestSource, ScalarDataType,
)
delta_source = DeltaTableSource(
catalog_name="main", schema_name="feature_store", table_name="transactions",
)
request_source = RequestSource(
schema=[
FieldDefinition(name="session_duration", data_type=ScalarDataType.DOUBLE),
]
)
# ColumnSelection from a Delta table
latest_amount = Feature(
source=delta_source,
function=ColumnSelection("amount"),
entity=["user_id"],
timeseries_column="transaction_time",
name="latest_transaction_amount",
)
# ColumnSelection from a RequestSource
session_feature = Feature(
source=request_source,
function=ColumnSelection("session_duration"),
name="session_duration",
)
Exemplo: recurso de agregação e seleção de colunas
O exemplo a seguir mostra o recurso definido sobre a mesma fonte de dados.
from databricks.feature_engineering.entities import (
AggregationFunction, Feature, Sum, Avg, ApproxCountDistinct,
ColumnSelection, RollingWindow,
)
from datetime import timedelta
window = RollingWindow(window_duration=timedelta(days=7))
sum_feature = Feature(
source=source,
entity=["user_id"],
timeseries_column="event_time",
function=AggregationFunction(Sum(input="amount"), window),
)
avg_feature = Feature(
source=source,
entity=["user_id"],
timeseries_column="event_time",
function=AggregationFunction(Avg(input="amount"), window),
)
distinct_count = Feature(
source=source,
entity=["user_id"],
timeseries_column="event_time",
function=AggregationFunction(ApproxCountDistinct(input="product_id", relativeSD=0.01), window),
)
# Column selection (no aggregation, no time window)
latest_amount = Feature(
source=source,
function=ColumnSelection("amount"),
entity=["user_id"],
timeseries_column="event_time",
name="latest_amount",
)
recurso com condições de filtro
O parâmetro filter_condition permite filtrar linhas da tabela de origem antes de calcular as agregações. Esta função funciona como uma cláusula SQL WHERE que é aplicada antes do agrupamento e agregação de dados.
filter_condition filtra linhas antes da agregação, como uma cláusula SQL WHERE aplicada antes de GROUP BY. Isso não altera a granularidade, que é sempre definida por entity na definição do recurso.
Os filtros são úteis ao trabalhar com grandes tabelas de origem que incluem um superconjunto de dados necessários para o cálculo de recursos e minimizam a necessidade de criar visualizações separadas sobre essas tabelas.
from databricks.feature_engineering.entities import AggregationFunction, Sum, Count, RollingWindow, DeltaTableSource
from datetime import timedelta
# Source with filter applied at the source level
high_value_transactions = DeltaTableSource(
catalog_name="main",
schema_name="ecommerce",
table_name="transactions",
filter_condition="amount > 100", # Only transactions over $100
)
high_value_sales = Feature(
source=high_value_transactions,
entity=["user_id"],
timeseries_column="transaction_time",
function=AggregationFunction(Sum(input="amount"), RollingWindow(window_duration=timedelta(days=30))),
)
# Multiple conditions
completed_orders_source = DeltaTableSource(
catalog_name="main",
schema_name="ecommerce",
table_name="orders",
filter_condition="status = 'completed' AND payment_method = 'credit_card'",
)
completed_orders = Feature(
source=completed_orders_source,
entity=["user_id"],
timeseries_column="order_time",
function=AggregationFunction(Count(input="order_id"), RollingWindow(window_duration=timedelta(days=7))),
)
fonte de dados
DeltaTableSource
DeltaTableSource é um objeto Python efêmero usado para definir como os recursos são calculados a partir de uma tabela de origem. Não cria uma nova tabela — especifica a configuração para leitura de dados e agregação de recursos.
DeltaTableSource(
catalog_name: str, # Required: Catalog name
schema_name: str, # Required: Schema name
table_name: str, # Required: Table name
filter_condition: Optional[str] = None, # Optional: SQL WHERE clause to filter source data
transformation_sql: Optional[str] = None, # Optional: SQL SELECT expression for column transformations
schema_json: Optional[str] = None, # Required if transformation_sql is set: schema of the resulting DataFrame
)
Parâmetros:
catalog_name,schema_name,table_name: Identifica a tabela Delta de origem no Unity Catalog.filter_conditionCláusula SQLWHEREaplicada antes da agregação. Exemplo:"status = 'completed'".transformation_sql: Uma expressão SQLSELECTaplicada à tabela de origem. Use isso para renomear colunas, converter tipos ou compute colunas derivadas antes da agregação. Se omitido, todas as colunas são selecionadas (*). Exemplo:"user_id, CAST(amount AS DOUBLE) AS amount, event_time".schema_json: O esquema do DataFrame resultante após as transformações, no formato JSON do Spark StructType (dedf.schema.json()). Obrigatório setransformation_sqlfor fornecido. Isso informa ao sistema os nomes e tipos de coluna resultantes das suas transformações.
Quando ambos filter_condition e transformation_sql estão definidos, a consulta resultante é: SELECT {transformation_sql} FROM {table} WHERE {filter_condition}.
O timeseries_column (especificado na definição do recurso, não em DeltaTableSource) deve ser do tipo TimestampType ou DateType. Tipos inteiros podem funcionar, mas causam perda de precisão para agregações de janelas de tempo.
Exemplo: Usando transformation_sql para transformações de coluna
source = DeltaTableSource(
catalog_name="main",
schema_name="analytics",
table_name="raw_events",
transformation_sql="user_id, CAST(price_cents AS DOUBLE) / 100 AS price, event_time",
filter_condition="event_type = 'purchase'",
schema_json=spark.sql(
"SELECT user_id, CAST(price_cents AS DOUBLE) / 100 AS price, event_time FROM main.analytics.raw_events LIMIT 0"
).schema.json(),
)
Exemplo: Obtendo transformation_sql e schema_json de um DataFrame do PySpark
Você pode escrever suas transformações como uma consulta PySpark e, em seguida, extrair o esquema do DataFrame resultante:
df = spark.sql(f"""
SELECT user_id, CAST(amount AS DOUBLE) / 100 AS amount_dollars, event_time
FROM main.analytics.events
WHERE event_date >= date_sub(current_date(), 7)
LIMIT 0
""")
# Use df.schema.json() as the schema_json
source = DeltaTableSource(
catalog_name="main",
schema_name="analytics",
table_name="events",
transformation_sql="user_id, CAST(amount AS DOUBLE) / 100 AS amount_dollars, event_time",
filter_condition="event_date >= date_sub(current_date(), 7)",
schema_json=df.schema.json(),
)
transformation_sql Suporta apenas expressões por linha (renomeações de colunas, conversões de tipo, operações aritméticas). Funções de agregação como COUNT(*) ou SUM() não são suportadas — use AggregationFunction na definição do recurso.
DeltaTableSource.from_sql()
Para sua conveniência, você pode criar um DeltaTableSource a partir de uma consulta SQL. O método analisa a consulta para extrair automaticamente o nome da tabela, transformation_sql e filter_condition.
DeltaTableSource.from_sql(
sql: str, # Required: SQL SELECT query
spark_client, # Required: Spark client (for schema inference)
) -> DeltaTableSource
Apenas consultas simples SELECT ... FROM ... [WHERE ...] são suportadas. SQL complexas (junções, subconsultas, CTEs, UNIONs) são rejeitadas. Para consultas complexas, construa DeltaTableSource diretamente com transformation_sql e filter_condition.
from databricks.feature_engineering.entities import (
AggregationFunction,
DeltaTableSource,
Feature,
Sum,
TumblingWindow,
)
from databricks.ml_features._spark_client._spark_client import SparkClient
spark_client = SparkClient()
source = DeltaTableSource.from_sql(
spark_client=spark_client,
sql=f"SELECT customer_id, event_ts, amount * 2 AS doubled_amount, amount FROM {CATALOG}.{SCHEMA}.{TABLE}",
)
feature = Feature(
source=source,
function=AggregationFunction(Sum(input="doubled_amount"), time_window=TumblingWindow(window_duration=timedelta(days=7))),
entity=["customer_id"], timeseries_column="event_ts",
)
Iterar com to_dataframe()
Use source.to_dataframe() para pré-visualizar os dados que serão usados para o cálculo do recurso. Isso é útil para iterar em filter_condition e transformation_sql até que produzam os resultados esperados.
source = DeltaTableSource(
catalog_name="main",
schema_name="analytics",
table_name="events",
filter_condition="event_type = 'purchase'",
)
# Preview the filtered source data
source.to_dataframe().display()
Compreendendo entidades
As colunas de entidade definem o nível de agregação do seu recurso. Eles são especificados na definição Feature , não em DeltaTableSource. As entidades determinam:
- Como os dados são agrupados : os recursos são agregados por combinação única de valores de entidade (semelhante a
GROUP BYem SQL). - A estrutura key primária : Cada combinação única de entidades resulta em uma linha de recurso computacional.
Exemplo: recurso de nível de cliente
O código a seguir agrega recursos no nível do cliente (uma linha por cliente):
from databricks.feature_engineering.entities import DeltaTableSource
source = DeltaTableSource(
catalog_name="main",
schema_name="analytics",
table_name="user_events",
)
Feature(
source=source,
entity=["user_id"], # Features aggregated per user
timeseries_column="event_time", # Timestamp for time windows
function=AggregationFunction(Sum(input="amount"), RollingWindow(window_duration=timedelta(days=7))),
)
Exemplo: recurso em nível de loja do cliente
Para agregar recursos em um nível mais detalhado (uma linha por combinação cliente-loja), use várias colunas de entidade:
source = DeltaTableSource(
catalog_name="main",
schema_name="retail",
table_name="transactions",
)
Feature(
source=source,
entity=["user_id", "store_id"], # Features aggregated per user-store pair
timeseries_column="transaction_time",
function=AggregationFunction(Sum(input="amount"), RollingWindow(window_duration=timedelta(days=7))),
)
Quando você precisar de recursos em diferentes níveis de agregação (por exemplo, nível de cliente e nível de cliente-loja), use valores entity diferentes em suas definições de recursos. O mesmo DeltaTableSource pode ser compartilhado entre recursos com diferentes configurações de entidade.
RequestSource
RequestSource Define um esquema para dados que são fornecidos no momento da inferência na carga útil da solicitação, em vez de serem consultados em uma tabela pré-materializada. Durante o treinamento, essas colunas são extraídas do DataFrame de rótulo passado para create_training_set. Durante o serviço do modelo, o solicitante deve incluí-los na carga útil da solicitação HTTP.
RequestSource é usado com ColumnSelection (para passar um valor diretamente). Não suporta funções de agregação nem janelas de tempo.
Definindo o esquema
Defina o esquema como uma lista de objetos FieldDefinition , cada um especificando um nome de coluna e um ScalarDataType:
from databricks.feature_engineering.entities import (
FieldDefinition, RequestSource, ScalarDataType,
)
request_source = RequestSource(
schema=[
FieldDefinition(name="transaction_amount", data_type=ScalarDataType.DOUBLE),
FieldDefinition(name="vendor_id", data_type=ScalarDataType.STRING),
FieldDefinition(name="transaction_id", data_type=ScalarDataType.STRING),
FieldDefinition(name="transaction_time", data_type=ScalarDataType.DATE),
]
)
Tipos de dados suportados
RequestSource suporta os tipos escalares definidos em ScalarDataType: INTEGER, FLOAT, BOOLEAN, STRING, DOUBLE, LONG, TIMESTAMP, DATE, SHORT. Tipos complexos como arrays, mapas e structs não são suportados.
Como os dados da solicitação são hidratados
Contexto | Comportamento |
|---|---|
treinamento ( | As colunas são extraídas do DataFrame rótulo. Os tipos são validados em relação ao esquema declarado — incompatibilidades geram um erro (sem conversão implícita). |
Servindo ( endpoint do modelo) | As colunas são extraídas de |
Assinatura do modelo
Quando um modelo é registrado usando log_model com um conjunto de treinamento que inclui RequestSource recurso, as colunas RequestSource são adicionadas à assinatura do modelo MLflow como entradas obrigatórias. Isso significa que o esquema API do endpoint de serviço reflete quais campos os chamadores devem fornecer no momento da inferência.
API de treinamento e inferência
create_training_set()
Cria um dataset de treinamento com computação de recursos correta para um determinado ponto no tempo. Para detalhes, veja ensinar modelos com recurso declarativo.
FeatureEngineeringClient.create_training_set(
df: DataFrame, # DataFrame with training data
features: Optional[List[Feature]], # List of Feature objects
label: Union[str, List[str], None], # Label column name(s)
exclude_columns: Optional[List[str]] = None, # Optional: columns to exclude
) -> TrainingSet
log_model()
registra um modelo com metadados de recursos para acompanhamento de linhagem e busca automática de recursos durante a inferência. Para detalhes, veja ensinar modelos com recurso declarativo.
FeatureEngineeringClient.log_model(
model, # Trained model object
artifact_path: str, # Path to store model artifact
flavor: ModuleType, # MLflow flavor module (e.g., mlflow.sklearn)
training_set: TrainingSet, # TrainingSet used for training
registered_model_name: Optional[str], # Optional: register model in Unity Catalog
)
score_batch()
Realiza inferência de lotes offline com busca automática de recursos. Utiliza os metadados de recurso armazenados com o modelo para compute o recurso correto em um determinado momento, garantindo a consistência com o treinamento.
FeatureEngineeringClient.score_batch(
model_uri: str, # URI of logged model (e.g., "models:/catalog.schema.model/1")
df: DataFrame, # DataFrame with entity keys and timestamps
) -> DataFrame
O DataFrame de entrada deve conter as colunas de entidade e séries temporais usadas durante o treinamento. Os recursos são calculados automaticamente a partir dos dados de origem.
fe = FeatureEngineeringClient()
# Batch scoring with automatic feature lookup
predictions = fe.score_batch(
model_uri="models:/main.ecommerce.fraud_model/1",
df=inference_df,
)
predictions.display()
Janelas de tempo
APIs declarativas de engenharia de recursos suportam três tipos diferentes de janelas para controlar o comportamento de retrocesso em agregações baseadas em janelas de tempo: rolling, tumbling e sliding.
- As janelas deslizantes permitem observar o momento do evento. A duração e o atraso são definidos explicitamente.
- Janelas móveis são janelas de tempo fixas e não sobrepostas. Cada ponto de dados pertence a exatamente uma janela.
- As janelas deslizantes são janelas de tempo sobrepostas e rolantes com um intervalo de deslizamento configurável.
A ilustração a seguir mostra como eles funcionam.

Janela deslizante
RollingWindow anteriormente era chamado de ContinuousWindow. Se você estiver migrando de uma versão anterior do SDK, atualize suas importações de acordo.
As janelas deslizantes são agregações atualizadas e em tempo real, normalmente usadas em dados de transmissão. Em um pipeline de transmissão, a janela deslizante emite uma nova linha somente quando o conteúdo da janela de comprimento fixo muda, como quando um evento entra ou sai. Quando um recurso de janela deslizante é usado no pipeline de treinamento, um cálculo preciso do recurso em um ponto específico no tempo é realizado na fonte de dados, usando a duração da janela de comprimento fixo imediatamente anterior ao carimbo de data/hora de um evento específico. Isso ajuda a evitar distorções entre os modos online e offline ou vazamento de dados. recurso no horário T eventos agregados de [T − duração, T).
class RollingWindow(TimeWindow):
window_duration: datetime.timedelta
delay: Optional[datetime.timedelta] = None
A tabela a seguir lista os parâmetros para uma janela deslizante. Os horários de início e término da janela são baseados nos seguintes parâmetros:
- horário de início:
evaluation_time - window_duration - delay(inclusive) - Hora de término:
evaluation_time - delay(exclusivo)
Parâmetro | Restrições |
|---|---|
| Deve ser ≥ 0 (desloca a janela para trás no tempo a partir do carimbo de data/hora da avaliação). Use |
| Deve ser > 0 |
from databricks.feature_engineering.entities import RollingWindow
from datetime import timedelta
# Look back 7 days from evaluation time
window = RollingWindow(window_duration=timedelta(days=7))
Defina uma janela deslizante com atraso usando o código abaixo.
# Look back 7 days, offset by 1 minute to account for data ingestion delay
window = RollingWindow(
window_duration=timedelta(days=7),
delay=timedelta(minutes=1)
)
Exemplos de janelas deslizantes
-
window_duration=timedelta(days=7)Isso cria uma janela de análise retrospectiva de 7 dias, terminando no momento da avaliação atual. Para um evento às 14h do Dia 7, isso inclui todos os eventos desde as 14h do Dia 0 até (mas não incluindo) as 14h do Dia 7. -
window_duration=timedelta(hours=1), delay=timedelta(minutes=30)Isso cria uma janela de retrospectiva de 1 hora, terminando 30 minutos antes do horário de avaliação. Para um evento às 15h, isso inclui todos os eventos das 13h30 até (mas não incluindo) 14h30. Isso é útil para account atrasos na ingestão de dados.
Janela basculante
Para recursos definidos usando janelas deslizantes, as agregações são calculadas em uma janela de comprimento fixo predeterminado que avança por um intervalo de deslizamento, produzindo janelas não sobrepostas que particionam completamente o tempo. Consequentemente, cada evento na fonte contribui para exatamente uma janela. recurso no tempo t agrega dados de janelas que terminam em ou antes de t (exclusivo). Windows teve início na época do Unix.
class TumblingWindow(TimeWindow):
window_duration: datetime.timedelta
A tabela a seguir lista os parâmetros para uma janela basculante.
Parâmetro | Restrições |
|---|---|
| Deve ser > 0 |
from databricks.feature_engineering.entities import TumblingWindow
from datetime import timedelta
window = TumblingWindow(
window_duration=timedelta(days=7)
)
Exemplo de janela basculante
window_duration=timedelta(days=5)Isso cria janelas de tempo predefinidas com duração fixa de 5 dias cada. Exemplo: A janela nº 1 abrange do dia 0 ao dia 4, a janela nº 2 abrange do dia 5 ao dia 9, a janela nº 3 abrange do dia 10 ao dia 14 e assim por diante. Especificamente, a Janela #1 inclui todos os eventos com carimbos de data/hora começando em00:00:00.00no Dia 0 até (mas não incluindo) quaisquer eventos com carimbo de data/hora00:00:00.00no Dia 5. Cada evento pertence a exatamente uma janela.
Janela deslizante
Para recursos definidos usando janelas deslizantes, as agregações são calculadas em uma janela de comprimento fixo predeterminada que avança por um intervalo de deslizamento, produzindo janelas sobrepostas. Cada evento na fonte pode contribuir para a agregação de recursos em múltiplas janelas. recurso no tempo t agrega dados de janelas que terminam em ou antes de t (exclusivo). Windows teve início na época do Unix.
class SlidingWindow(TimeWindow):
window_duration: datetime.timedelta
slide_duration: datetime.timedelta
A tabela a seguir lista os parâmetros para uma janela deslizante.
Parâmetro | Restrições |
|---|---|
| Deve ser > 0 |
| Deve ser > 0 e < |
from databricks.feature_engineering.entities import SlidingWindow
from datetime import timedelta
window = SlidingWindow(
window_duration=timedelta(days=7),
slide_duration=timedelta(days=1)
)
Exemplo de janela deslizante
window_duration=timedelta(days=5), slide_duration=timedelta(days=1)Isso cria janelas sobrepostas de 5 dias que avançam 1 dia de cada vez. Exemplo: A janela nº 1 abrange do dia 0 ao dia 4, a janela nº 2 abrange do dia 1 ao dia 5, a janela nº 3 abrange do dia 2 ao dia 6 e assim por diante. Cada janela inclui eventos de00:00:00.00no dia de início até (mas não incluindo)00:00:00.00no dia de término. Como as janelas se sobrepõem, um único evento pode pertencer a várias janelas (neste exemplo, cada evento pertence a até 5 janelas diferentes).
Gatilhos de materialização
Aciona o controle quando um pipeline de materialização é executado. O tipo de gatilho depende do tipo de recurso.
CronSchedule
Use CronSchedule para recurso de agregação (AggregationFunction). A execução pipeline em um programar fixo definido por uma expressão cron do Quartz.
from databricks.feature_engineering.entities import CronSchedule
from databricks.sdk.service.ml import MaterializedFeaturePipelineScheduleState
trigger = CronSchedule(
quartz_cron_expression="0 0 * * * ?", # Hourly
timezone_id="UTC",
pipeline_schedule_state=MaterializedFeaturePipelineScheduleState.ACTIVE,
)
TableTrigger
Use TableTrigger para recurso ColumnSelection apoiado por um DeltaTableSource. A execução pipeline sempre que a tabela Delta upstream recebe um novo commit.
from databricks.feature_engineering.entities import TableTrigger
trigger = TableTrigger()
Escolhendo um gatilho
Tipo de recurso | Trigger | Quando se trata de execução |
|---|---|---|
Agregação ( |
| Em um programador cron fixo |
|
| Em cada commit da tabela de origem |
Não é possível misturar ColumnSelection e recurso de agregação em uma única chamada materialize_features porque eles exigem tipos de gatilho diferentes. Em vez disso, faça chamadas separadas.