Pular para o conteúdo principal

Referência API de recurso declarativo

info

Beta

Este recurso está em versão Beta. Os administradores do espaço de trabalho podem controlar o acesso a este recurso na página de Pré-visualizações . Veja as prévias do Gerenciador Databricks.

APIde recurso de engenharia declarativa

Feature construtor e register_feature()

A abordagem recomendada é construir um objeto Feature localmente e usar register_feature para persistir no Unity Catalog. Este fluxo de trabalho em duas etapas permite que você experimente recursos (incluindo create_training_set) antes de registrá-los.

Python
Feature(
source: DataSource, # Required: DeltaTableSource or RequestSource
function: Union[AggregationFunction, ColumnSelection], # Required: Aggregation or column selection
entity: Optional[List[str]] = None, # Required for aggregation: entity columns
timeseries_column: Optional[str] = None, # Required for aggregation: timestamp column
name: Optional[str] = None, # Optional: Feature name (auto-generated if omitted)
description: Optional[str] = None, # Optional: Feature description
)

FeatureEngineeringClient.register_feature() registra um Feature construído localmente no Unity Catalog.

Python
FeatureEngineeringClient.register_feature(
feature: Feature, # Required: A Feature instance (not already registered)
catalog_name: str, # Required: UC catalog name
schema_name: str, # Required: UC schema name
) -> Feature
Python
from databricks.feature_engineering.entities import Feature, DeltaTableSource, AggregationFunction, Sum, RollingWindow
from datetime import timedelta

# Step 1: Construct the feature locally
feature = Feature(
source=DeltaTableSource(catalog_name="main", schema_name="store", table_name="transactions"),
entity=["user_id"],
timeseries_column="transaction_time",
function=AggregationFunction(Sum(input="amount"), RollingWindow(window_duration=timedelta(days=7))),
)

# Step 2: Register in Unity Catalog
fe = FeatureEngineeringClient()
registered_feature = fe.register_feature(
feature=feature,
catalog_name="main",
schema_name="store",
)

create_feature()

FeatureEngineeringClient.create_feature() Valida, constrói e registra imediatamente um recurso no Unity Catalog em uma única etapa. Use esta opção quando não precisar testar o recurso localmente primeiro.

Python
FeatureEngineeringClient.create_feature(
source: DataSource, # Required: DeltaTableSource or RequestSource
function: Union[AggregationFunction, ColumnSelection], # Required: Aggregation or column selection
catalog_name: str, # Required: The catalog name for the feature
schema_name: str, # Required: The schema name for the feature
entity: Optional[List[str]] = None, # Required for aggregation: entity columns
timeseries_column: Optional[str] = None, # Required for aggregation: timestamp column
name: Optional[str] = None, # Optional: Feature name (auto-generated if omitted)
description: Optional[str] = None, # Optional: Feature description
) -> Feature

Parâmetros:

  • source: A fonte de dados usada na computação de recursos (DeltaTableSource ou RequestSource).
  • function: Um AggregationFunction que agrupa o operador (por exemplo, Sum(input="amount")), a coluna de entrada e a janela de tempo. Ou ColumnSelection("column_name") para recurso pass-through.
  • catalog_name: O nome do recurso no catálogo Unity Catalog .
  • schema_name: O nome do esquema do Unity Catalog para o recurso.
  • entityLista de nomes de colunas que definem o nível de agregação (chave primária). Requerido para recurso de agregação. Por exemplo, ["user_id"] agrega por usuário.
  • timeseries_column: A coluna de carimbo de data/hora usada para agregação de janelas de tempo. Requerido para recurso de agregação.
  • name: Nome do recurso (opcional). Se omitido, será gerado automaticamente a partir da coluna de entrada, função e janela (por exemplo, amount_avg_rolling_7d).
  • descriptionDescrição opcional do recurso.

Retorna: Uma instância de recurso validada

Gera um erro ValueError se alguma validação falhar.

delete_feature()

Exclui um recurso do Unity Catalog pelo seu nome totalmente qualificado.

Python
FeatureEngineeringClient.delete_feature(
full_name: str, # Required: '<catalog>.<schema>.<feature_name>'
) -> None
Python
fe.delete_feature(full_name="main.store.amount_sum_rolling_7d")

Antes de excluir um recurso, remova ou atualize quaisquer modelos ou especificações de recursos que o referenciem. Se o recurso já tiver sido materializado, exclua primeiro o recurso materializado. Consulte Como excluir um recurso materializado.

Nomes gerados automaticamente

Quando name é omitido, um nome é gerado automaticamente. Os nomes gerados seguem o padrão: {column}_{function}_{window}. Por exemplo:

  • price_avg_rolling_1h (Preço médio por hora)
  • transaction_count_rolling_30d_1d (Contagem de transações dos últimos 30 dias com atraso de 1 dia a partir do registro de data e hora do evento)

Funções suportadas

Funções de agregação

nota

As funções de agregação são envolvidas em um AggregationFunction juntamente com uma janela de tempo, conforme descrito em janelas de tempo. Cada função recebe um parâmetro input que especifica a coluna de origem a ser agregada.

Função

Descrição

Exemplo de caso de uso

Sum(input="column")

Total de valores

Uso diário do aplicativo por usuário em minutos

Avg(input="column")

Média dos valores

Valor médio da transação

Count(input="column")

Número de registros

Número de logins por usuário

Min(input="column")

Valor mínimo

Frequência cardíaca mais baixa registrada por um dispositivo vestível

Max(input="column")

Valor máximo

Maior valor de transação por sessão

StddevPop(input="column")

desvio padrão da população

Variação diária do valor das transações entre todos os clientes

StddevSamp(input="column")

desvio padrão da amostra

Variabilidade das taxas de cliques em campanhas publicitárias

VarPop(input="column")

Variância populacional

Distribuição das leituras dos sensores de dispositivos IoT em uma fábrica

VarSamp(input="column")

Variância da amostra

Distribuição das classificações de filmes em um grupo amostrado

ApproxCountDistinct(input="column", relativeSD=0.05)

Contagem aproximada de únicos

Contagem específica de itens comprados

ApproxPercentile(input="column", percentile=0.95, accuracy=100)

Percentil aproximado

latência de resposta p95

First(input="column")

Primeiro valor

Registro de data e hora do primeiro login

Last(input="column")

Último valor

Valor da compra mais recente

ColumnSelection (passagem direta)

ColumnSelection Seleciona uma única coluna de uma fonte sem aplicar qualquer agregação. Ele é envolvido diretamente no parâmetro function (não dentro de AggregationFunction). O tipo de retorno é inferido a partir do esquema de origem.

Função

Descrição

Exemplo de caso de uso

ColumnSelection("col")

Último valor de uma coluna (sem agregação)

Categoria de fornecedor mais recente, passagem de um campo de solicitação

ColumnSelection pode ser usado com qualquer fonte de dados:

  • DeltaTableSource : Retorna o valor mais recente por key de entidade por meio de uma join pontual (sem agregação de janela de retrospectiva).
  • RequestSource : Passa o valor fornecido no momento da inferência (ou extraído do DataFrame de rótulo no momento do treinamento).
Python
from databricks.feature_engineering.entities import (
ColumnSelection, DeltaTableSource, Feature, FieldDefinition,
RequestSource, ScalarDataType,
)

delta_source = DeltaTableSource(
catalog_name="main", schema_name="feature_store", table_name="transactions",
)

request_source = RequestSource(
schema=[
FieldDefinition(name="session_duration", data_type=ScalarDataType.DOUBLE),
]
)

# ColumnSelection from a Delta table
latest_amount = Feature(
source=delta_source,
function=ColumnSelection("amount"),
entity=["user_id"],
timeseries_column="transaction_time",
name="latest_transaction_amount",
)

# ColumnSelection from a RequestSource
session_feature = Feature(
source=request_source,
function=ColumnSelection("session_duration"),
name="session_duration",
)

Exemplo: recurso de agregação e seleção de colunas

O exemplo a seguir mostra o recurso definido sobre a mesma fonte de dados.

Python
from databricks.feature_engineering.entities import (
AggregationFunction, Feature, Sum, Avg, ApproxCountDistinct,
ColumnSelection, RollingWindow,
)
from datetime import timedelta

window = RollingWindow(window_duration=timedelta(days=7))

sum_feature = Feature(
source=source,
entity=["user_id"],
timeseries_column="event_time",
function=AggregationFunction(Sum(input="amount"), window),
)

avg_feature = Feature(
source=source,
entity=["user_id"],
timeseries_column="event_time",
function=AggregationFunction(Avg(input="amount"), window),
)

distinct_count = Feature(
source=source,
entity=["user_id"],
timeseries_column="event_time",
function=AggregationFunction(ApproxCountDistinct(input="product_id", relativeSD=0.01), window),
)

# Column selection (no aggregation, no time window)
latest_amount = Feature(
source=source,
function=ColumnSelection("amount"),
entity=["user_id"],
timeseries_column="event_time",
name="latest_amount",
)

recurso com condições de filtro

O parâmetro filter_condition permite filtrar linhas da tabela de origem antes de calcular as agregações. Esta função funciona como uma cláusula SQL WHERE que é aplicada antes do agrupamento e agregação de dados.

nota

filter_condition filtra linhas antes da agregação, como uma cláusula SQL WHERE aplicada antes de GROUP BY. Isso não altera a granularidade, que é sempre definida por entity na definição do recurso.

Os filtros são úteis ao trabalhar com grandes tabelas de origem que incluem um superconjunto de dados necessários para o cálculo de recursos e minimizam a necessidade de criar visualizações separadas sobre essas tabelas.

Python
from databricks.feature_engineering.entities import AggregationFunction, Sum, Count, RollingWindow, DeltaTableSource
from datetime import timedelta

# Source with filter applied at the source level
high_value_transactions = DeltaTableSource(
catalog_name="main",
schema_name="ecommerce",
table_name="transactions",
filter_condition="amount > 100", # Only transactions over $100
)

high_value_sales = Feature(
source=high_value_transactions,
entity=["user_id"],
timeseries_column="transaction_time",
function=AggregationFunction(Sum(input="amount"), RollingWindow(window_duration=timedelta(days=30))),
)

# Multiple conditions
completed_orders_source = DeltaTableSource(
catalog_name="main",
schema_name="ecommerce",
table_name="orders",
filter_condition="status = 'completed' AND payment_method = 'credit_card'",
)

completed_orders = Feature(
source=completed_orders_source,
entity=["user_id"],
timeseries_column="order_time",
function=AggregationFunction(Count(input="order_id"), RollingWindow(window_duration=timedelta(days=7))),
)

fonte de dados

DeltaTableSource

DeltaTableSource é um objeto Python efêmero usado para definir como os recursos são calculados a partir de uma tabela de origem. Não cria uma nova tabela — especifica a configuração para leitura de dados e agregação de recursos.

Python
DeltaTableSource(
catalog_name: str, # Required: Catalog name
schema_name: str, # Required: Schema name
table_name: str, # Required: Table name
filter_condition: Optional[str] = None, # Optional: SQL WHERE clause to filter source data
transformation_sql: Optional[str] = None, # Optional: SQL SELECT expression for column transformations
schema_json: Optional[str] = None, # Required if transformation_sql is set: schema of the resulting DataFrame
)

Parâmetros:

  • catalog_name, schema_name, table_name: Identifica a tabela Delta de origem no Unity Catalog.
  • filter_conditionCláusula SQL WHERE aplicada antes da agregação. Exemplo: "status = 'completed'".
  • transformation_sql: Uma expressão SQL SELECT aplicada à tabela de origem. Use isso para renomear colunas, converter tipos ou compute colunas derivadas antes da agregação. Se omitido, todas as colunas são selecionadas (*). Exemplo: "user_id, CAST(amount AS DOUBLE) AS amount, event_time".
  • schema_json: O esquema do DataFrame resultante após as transformações, no formato JSON do Spark StructType (de df.schema.json()). Obrigatório se transformation_sql for fornecido. Isso informa ao sistema os nomes e tipos de coluna resultantes das suas transformações.

Quando ambos filter_condition e transformation_sql estão definidos, a consulta resultante é: SELECT {transformation_sql} FROM {table} WHERE {filter_condition}.

nota

O timeseries_column (especificado na definição do recurso, não em DeltaTableSource) deve ser do tipo TimestampType ou DateType. Tipos inteiros podem funcionar, mas causam perda de precisão para agregações de janelas de tempo.

Exemplo: Usando transformation_sql para transformações de coluna

Python
source = DeltaTableSource(
catalog_name="main",
schema_name="analytics",
table_name="raw_events",
transformation_sql="user_id, CAST(price_cents AS DOUBLE) / 100 AS price, event_time",
filter_condition="event_type = 'purchase'",
schema_json=spark.sql(
"SELECT user_id, CAST(price_cents AS DOUBLE) / 100 AS price, event_time FROM main.analytics.raw_events LIMIT 0"
).schema.json(),
)

Exemplo: Obtendo transformation_sql e schema_json de um DataFrame do PySpark

Você pode escrever suas transformações como uma consulta PySpark e, em seguida, extrair o esquema do DataFrame resultante:

Python
df = spark.sql(f"""
SELECT user_id, CAST(amount AS DOUBLE) / 100 AS amount_dollars, event_time
FROM main.analytics.events
WHERE event_date >= date_sub(current_date(), 7)
LIMIT 0
""")

# Use df.schema.json() as the schema_json
source = DeltaTableSource(
catalog_name="main",
schema_name="analytics",
table_name="events",
transformation_sql="user_id, CAST(amount AS DOUBLE) / 100 AS amount_dollars, event_time",
filter_condition="event_date >= date_sub(current_date(), 7)",
schema_json=df.schema.json(),
)
nota

transformation_sql Suporta apenas expressões por linha (renomeações de colunas, conversões de tipo, operações aritméticas). Funções de agregação como COUNT(*) ou SUM() não são suportadas — use AggregationFunction na definição do recurso.

DeltaTableSource.from_sql()

Para sua conveniência, você pode criar um DeltaTableSource a partir de uma consulta SQL. O método analisa a consulta para extrair automaticamente o nome da tabela, transformation_sql e filter_condition.

Python
DeltaTableSource.from_sql(
sql: str, # Required: SQL SELECT query
spark_client, # Required: Spark client (for schema inference)
) -> DeltaTableSource

Apenas consultas simples SELECT ... FROM ... [WHERE ...] são suportadas. SQL complexas (junções, subconsultas, CTEs, UNIONs) são rejeitadas. Para consultas complexas, construa DeltaTableSource diretamente com transformation_sql e filter_condition.

Python
from databricks.feature_engineering.entities import (
AggregationFunction,
DeltaTableSource,
Feature,
Sum,
TumblingWindow,
)
from databricks.ml_features._spark_client._spark_client import SparkClient

spark_client = SparkClient()
source = DeltaTableSource.from_sql(
spark_client=spark_client,
sql=f"SELECT customer_id, event_ts, amount * 2 AS doubled_amount, amount FROM {CATALOG}.{SCHEMA}.{TABLE}",
)

feature = Feature(
source=source,
function=AggregationFunction(Sum(input="doubled_amount"), time_window=TumblingWindow(window_duration=timedelta(days=7))),
entity=["customer_id"], timeseries_column="event_ts",
)

Iterar com to_dataframe()

Use source.to_dataframe() para pré-visualizar os dados que serão usados para o cálculo do recurso. Isso é útil para iterar em filter_condition e transformation_sql até que produzam os resultados esperados.

Python
source = DeltaTableSource(
catalog_name="main",
schema_name="analytics",
table_name="events",
filter_condition="event_type = 'purchase'",
)

# Preview the filtered source data
source.to_dataframe().display()

Compreendendo entidades

As colunas de entidade definem o nível de agregação do seu recurso. Eles são especificados na definição Feature , não em DeltaTableSource. As entidades determinam:

  • Como os dados são agrupados : os recursos são agregados por combinação única de valores de entidade (semelhante a GROUP BY em SQL).
  • A estrutura key primária : Cada combinação única de entidades resulta em uma linha de recurso computacional.

Exemplo: recurso de nível de cliente

O código a seguir agrega recursos no nível do cliente (uma linha por cliente):

Python
from databricks.feature_engineering.entities import DeltaTableSource

source = DeltaTableSource(
catalog_name="main",
schema_name="analytics",
table_name="user_events",
)

Feature(
source=source,
entity=["user_id"], # Features aggregated per user
timeseries_column="event_time", # Timestamp for time windows
function=AggregationFunction(Sum(input="amount"), RollingWindow(window_duration=timedelta(days=7))),
)

Exemplo: recurso em nível de loja do cliente

Para agregar recursos em um nível mais detalhado (uma linha por combinação cliente-loja), use várias colunas de entidade:

Python
source = DeltaTableSource(
catalog_name="main",
schema_name="retail",
table_name="transactions",
)

Feature(
source=source,
entity=["user_id", "store_id"], # Features aggregated per user-store pair
timeseries_column="transaction_time",
function=AggregationFunction(Sum(input="amount"), RollingWindow(window_duration=timedelta(days=7))),
)

Quando você precisar de recursos em diferentes níveis de agregação (por exemplo, nível de cliente e nível de cliente-loja), use valores entity diferentes em suas definições de recursos. O mesmo DeltaTableSource pode ser compartilhado entre recursos com diferentes configurações de entidade.

RequestSource

RequestSource Define um esquema para dados que são fornecidos no momento da inferência na carga útil da solicitação, em vez de serem consultados em uma tabela pré-materializada. Durante o treinamento, essas colunas são extraídas do DataFrame de rótulo passado para create_training_set. Durante o serviço do modelo, o solicitante deve incluí-los na carga útil da solicitação HTTP.

RequestSource é usado com ColumnSelection (para passar um valor diretamente). Não suporta funções de agregação nem janelas de tempo.

Definindo o esquema

Defina o esquema como uma lista de objetos FieldDefinition , cada um especificando um nome de coluna e um ScalarDataType:

Python
from databricks.feature_engineering.entities import (
FieldDefinition, RequestSource, ScalarDataType,
)

request_source = RequestSource(
schema=[
FieldDefinition(name="transaction_amount", data_type=ScalarDataType.DOUBLE),
FieldDefinition(name="vendor_id", data_type=ScalarDataType.STRING),
FieldDefinition(name="transaction_id", data_type=ScalarDataType.STRING),
FieldDefinition(name="transaction_time", data_type=ScalarDataType.DATE),
]
)

Tipos de dados suportados

RequestSource suporta os tipos escalares definidos em ScalarDataType: INTEGER, FLOAT, BOOLEAN, STRING, DOUBLE, LONG, TIMESTAMP, DATE, SHORT. Tipos complexos como arrays, mapas e structs não são suportados.

Como os dados da solicitação são hidratados

Contexto

Comportamento

treinamento (create_training_set)

As colunas são extraídas do DataFrame rótulo. Os tipos são validados em relação ao esquema declarado — incompatibilidades geram um erro (sem conversão implícita).

Servindo ( endpoint do modelo)

As colunas são extraídas de dataframe_records ou dataframe_split na solicitação HTTP. Os valores JSON são convertidos para os tipos declarados (por exemplo, número JSON → DOUBLE).

Assinatura do modelo

Quando um modelo é registrado usando log_model com um conjunto de treinamento que inclui RequestSource recurso, as colunas RequestSource são adicionadas à assinatura do modelo MLflow como entradas obrigatórias. Isso significa que o esquema API do endpoint de serviço reflete quais campos os chamadores devem fornecer no momento da inferência.

API de treinamento e inferência

create_training_set()

Cria um dataset de treinamento com computação de recursos correta para um determinado ponto no tempo. Para detalhes, veja ensinar modelos com recurso declarativo.

Python
FeatureEngineeringClient.create_training_set(
df: DataFrame, # DataFrame with training data
features: Optional[List[Feature]], # List of Feature objects
label: Union[str, List[str], None], # Label column name(s)
exclude_columns: Optional[List[str]] = None, # Optional: columns to exclude
) -> TrainingSet

log_model()

registra um modelo com metadados de recursos para acompanhamento de linhagem e busca automática de recursos durante a inferência. Para detalhes, veja ensinar modelos com recurso declarativo.

Python
FeatureEngineeringClient.log_model(
model, # Trained model object
artifact_path: str, # Path to store model artifact
flavor: ModuleType, # MLflow flavor module (e.g., mlflow.sklearn)
training_set: TrainingSet, # TrainingSet used for training
registered_model_name: Optional[str], # Optional: register model in Unity Catalog
)

score_batch()

Realiza inferência de lotes offline com busca automática de recursos. Utiliza os metadados de recurso armazenados com o modelo para compute o recurso correto em um determinado momento, garantindo a consistência com o treinamento.

Python
FeatureEngineeringClient.score_batch(
model_uri: str, # URI of logged model (e.g., "models:/catalog.schema.model/1")
df: DataFrame, # DataFrame with entity keys and timestamps
) -> DataFrame

O DataFrame de entrada deve conter as colunas de entidade e séries temporais usadas durante o treinamento. Os recursos são calculados automaticamente a partir dos dados de origem.

Python
fe = FeatureEngineeringClient()

# Batch scoring with automatic feature lookup
predictions = fe.score_batch(
model_uri="models:/main.ecommerce.fraud_model/1",
df=inference_df,
)
predictions.display()

Janelas de tempo

APIs declarativas de engenharia de recursos suportam três tipos diferentes de janelas para controlar o comportamento de retrocesso em agregações baseadas em janelas de tempo: rolling, tumbling e sliding.

  • As janelas deslizantes permitem observar o momento do evento. A duração e o atraso são definidos explicitamente.
  • Janelas móveis são janelas de tempo fixas e não sobrepostas. Cada ponto de dados pertence a exatamente uma janela.
  • As janelas deslizantes são janelas de tempo sobrepostas e rolantes com um intervalo de deslizamento configurável.

A ilustração a seguir mostra como eles funcionam.

Janelas basculantes, giratórias e deslizantes para observação.

Janela deslizante

nota

RollingWindow anteriormente era chamado de ContinuousWindow. Se você estiver migrando de uma versão anterior do SDK, atualize suas importações de acordo.

As janelas deslizantes são agregações atualizadas e em tempo real, normalmente usadas em dados de transmissão. Em um pipeline de transmissão, a janela deslizante emite uma nova linha somente quando o conteúdo da janela de comprimento fixo muda, como quando um evento entra ou sai. Quando um recurso de janela deslizante é usado no pipeline de treinamento, um cálculo preciso do recurso em um ponto específico no tempo é realizado na fonte de dados, usando a duração da janela de comprimento fixo imediatamente anterior ao carimbo de data/hora de um evento específico. Isso ajuda a evitar distorções entre os modos online e offline ou vazamento de dados. recurso no horário T eventos agregados de [T − duração, T).

Python
class RollingWindow(TimeWindow):
window_duration: datetime.timedelta
delay: Optional[datetime.timedelta] = None

A tabela a seguir lista os parâmetros para uma janela deslizante. Os horários de início e término da janela são baseados nos seguintes parâmetros:

  • horário de início: evaluation_time - window_duration - delay (inclusive)
  • Hora de término: evaluation_time - delay (exclusivo)

Parâmetro

Restrições

delay (opcional)

Deve ser ≥ 0 (desloca a janela para trás no tempo a partir do carimbo de data/hora da avaliação). Use delay para account qualquer atraso do sistema entre o momento em que o evento é criado e o carimbo de data/hora do evento para evitar vazamento futuro de eventos no conjunto de dados de treinamento. Por exemplo, se houver um atraso de um minuto entre o momento em que os eventos são criados e esses eventos são finalmente inseridos em uma tabela de origem onde recebem um carimbo de data/hora, então o atraso seria timedelta(minutes=1).

window_duration

Deve ser > 0

Python
from databricks.feature_engineering.entities import RollingWindow
from datetime import timedelta

# Look back 7 days from evaluation time
window = RollingWindow(window_duration=timedelta(days=7))

Defina uma janela deslizante com atraso usando o código abaixo.

Python
# Look back 7 days, offset by 1 minute to account for data ingestion delay
window = RollingWindow(
window_duration=timedelta(days=7),
delay=timedelta(minutes=1)
)

Exemplos de janelas deslizantes

  • window_duration=timedelta(days=7)Isso cria uma janela de análise retrospectiva de 7 dias, terminando no momento da avaliação atual. Para um evento às 14h do Dia 7, isso inclui todos os eventos desde as 14h do Dia 0 até (mas não incluindo) as 14h do Dia 7.

  • window_duration=timedelta(hours=1), delay=timedelta(minutes=30)Isso cria uma janela de retrospectiva de 1 hora, terminando 30 minutos antes do horário de avaliação. Para um evento às 15h, isso inclui todos os eventos das 13h30 até (mas não incluindo) 14h30. Isso é útil para account atrasos na ingestão de dados.

Janela basculante

Para recursos definidos usando janelas deslizantes, as agregações são calculadas em uma janela de comprimento fixo predeterminado que avança por um intervalo de deslizamento, produzindo janelas não sobrepostas que particionam completamente o tempo. Consequentemente, cada evento na fonte contribui para exatamente uma janela. recurso no tempo t agrega dados de janelas que terminam em ou antes de t (exclusivo). Windows teve início na época do Unix.

Python
class TumblingWindow(TimeWindow):
window_duration: datetime.timedelta

A tabela a seguir lista os parâmetros para uma janela basculante.

Parâmetro

Restrições

window_duration

Deve ser > 0

Python
from databricks.feature_engineering.entities import TumblingWindow
from datetime import timedelta

window = TumblingWindow(
window_duration=timedelta(days=7)
)

Exemplo de janela basculante

  • window_duration=timedelta(days=5)Isso cria janelas de tempo predefinidas com duração fixa de 5 dias cada. Exemplo: A janela nº 1 abrange do dia 0 ao dia 4, a janela nº 2 abrange do dia 5 ao dia 9, a janela nº 3 abrange do dia 10 ao dia 14 e assim por diante. Especificamente, a Janela #1 inclui todos os eventos com carimbos de data/hora começando em 00:00:00.00 no Dia 0 até (mas não incluindo) quaisquer eventos com carimbo de data/hora 00:00:00.00 no Dia 5. Cada evento pertence a exatamente uma janela.

Janela deslizante

Para recursos definidos usando janelas deslizantes, as agregações são calculadas em uma janela de comprimento fixo predeterminada que avança por um intervalo de deslizamento, produzindo janelas sobrepostas. Cada evento na fonte pode contribuir para a agregação de recursos em múltiplas janelas. recurso no tempo t agrega dados de janelas que terminam em ou antes de t (exclusivo). Windows teve início na época do Unix.

Python
class SlidingWindow(TimeWindow):
window_duration: datetime.timedelta
slide_duration: datetime.timedelta

A tabela a seguir lista os parâmetros para uma janela deslizante.

Parâmetro

Restrições

window_duration

Deve ser > 0

slide_duration

Deve ser > 0 e < window_duration

Python
from databricks.feature_engineering.entities import SlidingWindow
from datetime import timedelta

window = SlidingWindow(
window_duration=timedelta(days=7),
slide_duration=timedelta(days=1)
)

Exemplo de janela deslizante

  • window_duration=timedelta(days=5), slide_duration=timedelta(days=1)Isso cria janelas sobrepostas de 5 dias que avançam 1 dia de cada vez. Exemplo: A janela nº 1 abrange do dia 0 ao dia 4, a janela nº 2 abrange do dia 1 ao dia 5, a janela nº 3 abrange do dia 2 ao dia 6 e assim por diante. Cada janela inclui eventos de 00:00:00.00 no dia de início até (mas não incluindo) 00:00:00.00 no dia de término. Como as janelas se sobrepõem, um único evento pode pertencer a várias janelas (neste exemplo, cada evento pertence a até 5 janelas diferentes).

Gatilhos de materialização

Aciona o controle quando um pipeline de materialização é executado. O tipo de gatilho depende do tipo de recurso.

CronSchedule

Use CronSchedule para recurso de agregação (AggregationFunction). A execução pipeline em um programar fixo definido por uma expressão cron do Quartz.

Python
from databricks.feature_engineering.entities import CronSchedule
from databricks.sdk.service.ml import MaterializedFeaturePipelineScheduleState

trigger = CronSchedule(
quartz_cron_expression="0 0 * * * ?", # Hourly
timezone_id="UTC",
pipeline_schedule_state=MaterializedFeaturePipelineScheduleState.ACTIVE,
)

TableTrigger

Use TableTrigger para recurso ColumnSelection apoiado por um DeltaTableSource. A execução pipeline sempre que a tabela Delta upstream recebe um novo commit.

Python
from databricks.feature_engineering.entities import TableTrigger

trigger = TableTrigger()

Escolhendo um gatilho

Tipo de recurso

Trigger

Quando se trata de execução

Agregação (AggregationFunction)

CronSchedule

Em um programador cron fixo

ColumnSelection (de DeltaTableSource)

TableTrigger

Em cada commit da tabela de origem

Não é possível misturar ColumnSelection e recurso de agregação em uma única chamada materialize_features porque eles exigem tipos de gatilho diferentes. Em vez disso, faça chamadas separadas.