decorators

databricks.feature_store.decorators.feature_table(func)

Note

Experimental: This decorator may change or be removed in a future release without warning.

The @feature_table decorator specifies that a function is used to generate feature data. Functions decorated with @feature_table must return a single DataFrame, which will be written to Feature Store. For example:

from databricks.feature_store import feature_table
from pyspark.sql.functions import count

@feature_table
def compute_customer_features(data):
    '''Feature computation function that takes raw
       data and returns a DataFrame of features.'''
    return (data.groupBy('cid')
      .agg(count('*').alias('num_purchases'))
    )

A function that is decorated with the @feature_table decorator will gain these function attributes:

databricks.feature_store.decorators.compute_and_write(input: Dict[str, Any], feature_table_name: str, mode: str = 'merge') → pyspark.sql.dataframe.DataFrame

Note

Experimental: This function may change or be removed in a future release without warning.

Calls the decorated function using the provided input, then writes the output DataFrame to the feature table specified by feature_table_name.

Example:
compute_customer_features.compute_and_write(
    input={
        'data': data,
    },
    feature_table_name='recommender_system.customer_features',
    mode='merge'
)
Parameters:
  • input – If input is not a dictionary, it is passed to the decorated function as the first positional argument. If input is a dictionary, the contents are unpacked and passed to the decorated function as keyword arguments.
  • feature_table_name – A feature table name of the form <database_name>.<table_name>, for example dev.user_features. An exception is raised if this feature table does not exist.
  • mode – Two supported write modes: "overwrite" replaces the entire table, while "merge" upserts the rows of the returned DataFrame into the feature table.
Returns:

The DataFrame of feature values returned by the decorated function.
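The input dispatch rule described above (a dictionary is unpacked into keyword arguments; anything else becomes the first positional argument) can be sketched in plain Python. The helper below is only an illustration of the documented behavior, not the actual Feature Store implementation:

```python
def call_with_input(func, input):
    """Illustrative helper: mimic how compute_and_write
    dispatches `input` to the decorated function."""
    if isinstance(input, dict):
        # Dictionary contents are unpacked as keyword arguments.
        return func(**input)
    # Anything else is passed as the first positional argument.
    return func(input)

def compute_features(data, scale=1):
    """Stand-in for a decorated feature computation function."""
    return [x * scale for x in data]

# Dict input: unpacked as keyword arguments.
call_with_input(compute_features, {'data': [1, 2], 'scale': 10})  # → [10, 20]

# Non-dict input: passed positionally; `scale` keeps its default.
call_with_input(compute_features, [1, 2])  # → [1, 2]
```

This is why the examples in this document wrap the raw data in a dictionary keyed by the decorated function's parameter name (`input={'data': data}`).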

databricks.feature_store.decorators.compute_and_write_streaming(input: Dict[str, Any], feature_table_name: str, checkpoint_location: Optional[str] = None, trigger: Dict[str, Any] = {'processingTime': '5 minutes'}) → pyspark.sql.streaming.StreamingQuery

Note

Experimental: This function may change or be removed in a future release without warning.

Calls the decorated function using the provided input, then streams the output DataFrame to the feature table specified by feature_table_name.

Example:
compute_customer_features.compute_and_write_streaming(
    input={
        'data': data,
    },
    feature_table_name='recommender_system.customer_features',
)
Parameters:
  • input – If input is not a dictionary, it is passed to the decorated function as the first positional argument. If input is a dictionary, the contents are unpacked and passed to the decorated function as keyword arguments.
  • feature_table_name – A feature table name of the form <database_name>.<table_name>, for example dev.user_features.
  • checkpoint_location – Sets the Structured Streaming checkpointLocation option. By setting a checkpoint_location, Spark Structured Streaming will store progress information and intermediate state, enabling recovery after failures. This parameter is only supported when the decorated function returns a streaming DataFrame.
  • trigger – Defines the timing of stream data processing; the dictionary is unpacked and passed to DataStreamWriter.trigger as arguments. For example, trigger={'once': True} will result in a call to DataStreamWriter.trigger(once=True).
Returns:

A PySpark StreamingQuery.
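The trigger dictionary is forwarded by keyword to DataStreamWriter.trigger. The stub below sketches that unpacking without requiring a Spark session; FakeStreamWriter is a hypothetical stand-in for pyspark.sql.streaming.DataStreamWriter, not the real class:

```python
class FakeStreamWriter:
    """Hypothetical stand-in for DataStreamWriter that records
    how a trigger dict is unpacked into keyword arguments."""
    def __init__(self):
        self.trigger_kwargs = None

    def trigger(self, **kwargs):
        self.trigger_kwargs = kwargs
        return self  # real DataStreamWriter also returns itself for chaining

# Default trigger: {'processingTime': '5 minutes'} becomes
# trigger(processingTime='5 minutes'), i.e. a micro-batch every 5 minutes.
default_writer = FakeStreamWriter()
default_writer.trigger(**{'processingTime': '5 minutes'})

# trigger={'once': True} becomes trigger(once=True), i.e. process
# all available data in a single batch, then stop.
once_writer = FakeStreamWriter()
once_writer.trigger(**{'once': True})
```

Passing `trigger={'once': True}` to compute_and_write_streaming is therefore a way to run a one-shot backfill through the streaming path.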