Decorators
databricks.feature_store.decorators.feature_table(func)

Note
Experimental: This decorator may change or be removed in a future release without warning.
The @feature_table decorator specifies that a function is used to generate feature data. Functions decorated with @feature_table must return a single DataFrame, which will be written to Feature Store. For example:

    from databricks.feature_store import feature_table
    from pyspark.sql.functions import count

    @feature_table
    def compute_customer_features(data):
        '''Feature computation function that takes raw data and returns a DataFrame of features.'''
        return (data.groupBy('cid')
                .agg(count('*').alias('num_purchases'))
                )
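To make the example concrete, here is a minimal, illustrative sketch of input data and a direct call to the decorated function. The column names, the toy values, and the assumption that the decorated function remains directly callable are not part of the documented API; in a Databricks notebook, spark is the preexisting SparkSession.

    # Illustrative toy input: a purchases DataFrame with the 'cid' column
    # that compute_customer_features groups by (column names are assumptions).
    data = spark.createDataFrame(
        [('c1', 'p100'), ('c1', 'p101'), ('c2', 'p100')],
        ['cid', 'product_id'],
    )

    # Assuming the decorator leaves the function directly callable (as an
    # ordinary Python decorator would), this returns the single DataFrame
    # of features described above.
    features = compute_customer_features(data)
    features.show()  # e.g. cid | num_purchases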
A function decorated with @feature_table gains these function attributes:
- databricks.feature_store.decorators.compute_and_write(input: Dict[str, Any], feature_table_name: str, mode: str = 'merge') → pyspark.sql.dataframe.DataFrame

Note
Experimental: This function may change or be removed in a future release without warning.
Calls the decorated function using the provided input, then writes the output DataFrame to the feature table specified by feature_table_name.

    compute_customer_features.compute_and_write(
        input={
            'data': data,
        },
        feature_table_name='recommender_system.customer_features',
        mode='merge'
    )
Parameters

- input – If input is not a dictionary, it is passed to the decorated function as the first positional argument. If input is a dictionary, the contents are unpacked and passed to the decorated function as keyword arguments.
- feature_table_name – A feature table name of the form <database_name>.<table_name>, for example dev.user_features. Raises an exception if this feature table does not exist.
- mode – Two supported write modes: "overwrite" updates the whole table, while "merge" will upsert the rows in df into the feature table (see the sketch below).
Returns

DataFrame (df) containing feature values.
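For illustration only, a sketch of the same call with a non-dictionary input and the "overwrite" mode, reusing compute_customer_features from above. It assumes a Databricks notebook where spark is predefined; the source table name is hypothetical.

    # Hypothetical source table; any Spark DataFrame of raw purchases would do.
    raw_purchases = spark.table('recommender_system.raw_purchases')

    # A non-dictionary `input` is passed to the decorated function as the
    # first positional argument, i.e. compute_customer_features(raw_purchases).
    features_df = compute_customer_features.compute_and_write(
        input=raw_purchases,
        feature_table_name='recommender_system.customer_features',
        mode='overwrite',  # rewrite the whole table instead of upserting rows
    )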
- databricks.feature_store.decorators.compute_and_write_streaming(input: Dict[str, Any], feature_table_name: str, checkpoint_location: Optional[str] = None, trigger: Dict[str, Any] = {'processingTime': '5 minutes'}) → pyspark.sql.streaming.StreamingQuery

Note
Experimental: This function may change or be removed in a future release without warning.
Calls the decorated function using the provided input, then streams the output DataFrame to the feature table specified by feature_table_name.

    compute_customer_features.compute_and_write_streaming(
        input={
            'data': data,
        },
        feature_table_name='recommender_system.customer_features',
    )
Parameters

- input – If input is not a dictionary, it is passed to the decorated function as the first positional argument. If input is a dictionary, the contents are unpacked and passed to the decorated function as keyword arguments.
- feature_table_name – A feature table name of the form <database_name>.<table_name>, for example dev.user_features.
- checkpoint_location – Sets the Structured Streaming checkpointLocation option. By setting a checkpoint_location, Spark Structured Streaming will store progress information and intermediate state, enabling recovery after failures. This parameter is only supported when the argument df is a streaming DataFrame.
- trigger – Defines the timing of stream data processing; the dictionary will be unpacked and passed to DataStreamWriter.trigger as arguments. For example, trigger={'once': True} will result in a call to DataStreamWriter.trigger(once=True) (see the sketch below).
Returns

A PySpark StreamingQuery.
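As an illustrative sketch of the trigger and checkpoint_location parameters, the call below processes the available data once and checkpoints to a hypothetical path; streaming_data is assumed to be an existing streaming DataFrame. The returned StreamingQuery can then be managed with the standard PySpark streaming API.

    query = compute_customer_features.compute_and_write_streaming(
        input={'data': streaming_data},  # assumed streaming DataFrame
        feature_table_name='recommender_system.customer_features',
        checkpoint_location='/tmp/checkpoints/customer_features',  # hypothetical path
        trigger={'once': True},  # becomes DataStreamWriter.trigger(once=True)
    )

    # Standard StreamingQuery management from pyspark.sql.streaming:
    # with a one-time trigger, this returns once the single batch completes.
    query.awaitTermination()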