
    Feature Serving example notebook

    Feature Serving lets you serve pre-materialized features and run on-demand computation for features.

    This notebook illustrates how to:

    1. Create a FeatureSpec. A FeatureSpec defines a set of features (pre-materialized and on-demand) that are served together.
    2. Create an Online Table from a Delta Table.
    3. Serve the features. To serve features, you create a Feature Serving endpoint with the FeatureSpec.

    Requirements

    This notebook requires Databricks Runtime for Machine Learning 14.2 or above.

    Set up the Feature Table

    %pip install databricks-sdk --upgrade
    %pip install "mlflow>=2.9.0"
    dbutils.library.restartPython()
    # Specify the catalog and schema to use. You must have USE_CATALOG privilege on the catalog and USE_SCHEMA and CREATE_TABLE privileges on the schema.
    # Change the catalog and schema here if necessary.
    
    catalog_name = "main"
    schema_name = "default"
    from databricks import feature_engineering
    
    fe = feature_engineering.FeatureEngineeringClient()
    feature_table_name = f"{catalog_name}.{schema_name}.location_features"
    online_table_name = f"{catalog_name}.{schema_name}.location_features_online"
    function_name = f"{catalog_name}.{schema_name}.distance"

    To access the feature table from Feature Serving, you must create an Online Table from the feature table.

    The feature table is used for offline model training; the online table is used for online inference.

    # Read in the dataset
    destination_location_df = (
      spark.read
        .format("csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .load("/databricks-datasets/travel_recommendations_realtime/raw_travel_data/fs-demo_destination-locations/")
    )
    
    # Create the feature table
    fe.create_table(
      name = feature_table_name,
      primary_keys="destination_id",
      df = destination_location_df,
      description = "Destination location features."
    )
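
    To confirm that the feature table was created and populated, you can preview a few rows; this optional check only uses the table name defined above.

    # Optional: preview the newly created feature table
    display(spark.table(feature_table_name).limit(5))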

    Set up a Databricks Online Table

    You can create an online table from the Catalog Explorer UI, the Databricks SDK, or the REST API. The steps for the Databricks Python SDK are described below. For more details, see the Databricks documentation (AWS|Azure). For information about required permissions, see Permissions (AWS|Azure).

    from pprint import pprint
    from databricks.sdk import WorkspaceClient
    from databricks.sdk.service.catalog import OnlineTable, OnlineTableSpec, OnlineTableSpecTriggeredSchedulingPolicy
    import mlflow
    
    workspace = WorkspaceClient()
    
    # Create an online table
    spec = OnlineTableSpec(
      primary_key_columns = ["destination_id"],
      source_table_full_name = feature_table_name,
      run_triggered=OnlineTableSpecTriggeredSchedulingPolicy.from_dict({'triggered': 'true'}),
      perform_full_copy=True)
    
    try:
      online_table_pipeline = workspace.online_tables.create(OnlineTable(name=online_table_name, spec=spec))
    except Exception as e:
      if "already exists" in str(e):
        pass
      else:
        raise e
    
    pprint(workspace.online_tables.get(online_table_name))
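
    Provisioning the online table can take a few minutes. The optional polling sketch below waits for the table to come online; it assumes the OnlineTable object returned by online_tables.get exposes a status.detailed_state field, which may differ across databricks-sdk versions.

    import time

    # Optional sketch: poll until the online table finishes provisioning.
    # The status / detailed_state attribute names are assumptions and may vary by SDK version.
    for _ in range(60):
      online_table = workspace.online_tables.get(online_table_name)
      state = getattr(online_table.status, "detailed_state", None) if online_table.status else None
      print(f"Online table state: {state}")
      if state is not None and "ONLINE" in str(state):
        break
      time.sleep(30)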

    Create the function

    The next cell defines a function that calculates the distance between the destination and the user's current location.

    # Define the function. This function calculates the distance between two locations. 
    spark.sql(f"""
    CREATE OR REPLACE FUNCTION {function_name}(latitude DOUBLE, longitude DOUBLE, user_latitude DOUBLE, user_longitude DOUBLE)
    RETURNS DOUBLE
    LANGUAGE PYTHON AS
    $$
    import math
    lat1 = math.radians(latitude)
    lon1 = math.radians(longitude)
    lat2 = math.radians(user_latitude)
    lon2 = math.radians(user_longitude)
    
    # Earth's radius in kilometers
    radius = 6371
    
    # Haversine formula
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
    distance = radius * c
    
    return distance
    $$""")
    from databricks.feature_engineering import FeatureLookup, FeatureFunction
    
    features=[
      FeatureLookup(
        table_name=feature_table_name,
        lookup_key="destination_id"
      ),
      FeatureFunction(
        udf_name=function_name, 
        output_name="distance",
        input_bindings={
          "latitude": "latitude", 
          "longitude": "longitude", 
          "user_latitude": "user_latitude", 
          "user_longitude": "user_longitude"
        },
      ),
    ]
    
    feature_spec_name = f"{catalog_name}.{schema_name}.travel_spec"
    try: 
      fe.create_feature_spec(name=feature_spec_name, features=features, exclude_columns=None)
    except Exception as e:
      if "already exists" in str(e):
        pass
      else:
        raise e

    You can now view the FeatureSpec (travel_spec) and the distance function (distance) in Catalog Explorer. Click Catalog in the sidebar. In the Catalog Explorer, navigate to your schema in the main catalog. The FeatureSpec and the function appear under Functions.
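
    You can also verify this programmatically. The optional sketch below lists the functions in the schema through the Databricks SDK's Unity Catalog functions API; the FeatureSpec and the distance UDF should both appear in the output.

    # Optional sketch: list functions in the schema (the FeatureSpec and the distance UDF show up here)
    for fn in workspace.functions.list(catalog_name=catalog_name, schema_name=schema_name):
      print(fn.full_name)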

    Create a Feature Serving endpoint

    from databricks.sdk import WorkspaceClient
    from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput
    
    # Create endpoint
    endpoint_name = "fse-location"
    
    try:
      status = workspace.serving_endpoints.create_and_wait(
        name=endpoint_name,
        config=EndpointCoreConfigInput(
          served_entities=[
            ServedEntityInput(
              entity_name=feature_spec_name,
              scale_to_zero_enabled=True,
              workload_size="Small"
            )
          ]
        )
      )
      print(status)
    except Exception as e:
      if "already exists" in str(e):
        pass
      else:
        raise e
    # Get the status of the endpoint
    status = workspace.serving_endpoints.get(name=endpoint_name)
    print(status)

    You can now view the status of the Feature Serving Endpoint in the table on the Serving endpoints page. Click Serving in the sidebar to display the page.

    Query

    import mlflow.deployments
    
    client = mlflow.deployments.get_deploy_client("databricks")
    response = client.predict(
        endpoint=endpoint_name,
        inputs={
            "dataframe_records": [
                {"destination_id": 1, "user_latitude": 37, "user_longitude": -122},
                {"destination_id": 2, "user_latitude": 37, "user_longitude": -122},
            ]
        },
    )
    
    pprint(response)
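
    The endpoint can also be queried without MLflow. The sketch below uses the Databricks SDK's serving_endpoints.query method, assuming a databricks-sdk version that accepts dataframe_records; an equivalent raw REST call would POST the same JSON payload to the endpoint's /invocations URL.

    # Alternative sketch: query the Feature Serving endpoint through the Databricks SDK.
    # Assumes workspace.serving_endpoints.query(..., dataframe_records=...) is available in your SDK version.
    sdk_response = workspace.serving_endpoints.query(
      name=endpoint_name,
      dataframe_records=[
        {"destination_id": 1, "user_latitude": 37, "user_longitude": -122},
      ],
    )
    pprint(sdk_response)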

    Clean up

    When you are finished, delete the FeatureSpec, the Feature Serving endpoint, and the online table.

    # fe.delete_feature_spec(name=feature_spec_name)
    # workspace.serving_endpoints.delete(name=endpoint_name)
    # workspace.online_tables.delete(name=online_table_name)