# Fetch the current user's API token from the notebook context.
# getOrElse(None) yields None when no token is available.
# NOTE(review): in that case the header below becomes "Bearer None" and the REST
# calls made with it will presumably be rejected.
token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().getOrElse(None)

# Authorization header shared by every subsequent REST call in this notebook.
headers = {
    "Authorization": f"Bearer {token}",
    "Content-Type": "application/json",
}

# The workspace host to send requests to is read from the notebook context's tags.
java_tags = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags()

# tags() comes back from the JVM; mapAsJavaMap converts the Scala map into a
# java.util.Map. py4j presumably exposes that object with dict-style item access,
# which is how it is read below.
tags = sc._jvm.scala.collection.JavaConversions.mapAsJavaMap(java_tags)

# Databricks instance (domain name) used to build the REST endpoint URLs.
instance = tags["browserHostName"]

from mlflow.tracking.client import MlflowClient


def get_latest_model_version(model_name: str):
    """Return the version of the latest registered model version in stage "None".

    Args:
        model_name: Name of the model in the MLflow Model Registry.

    Returns:
        The ``version`` attribute of the last entry that
        ``MlflowClient.get_latest_versions`` reports for stage "None".

    Raises:
        ValueError: if the registry reports no versions in stage "None".
            Previously this case crashed with an UnboundLocalError.
    """
    client = MlflowClient()
    models = client.get_latest_versions(model_name, stages=["None"])
    if not models:
        raise ValueError(f"No versions of model '{model_name}' found in stage 'None'")
    # Same result as iterating and keeping the last element.
    return models[-1].version
import time


def _wait_for_config_update(endpoint_info_url, *, poll_seconds=10, max_wait_seconds=6000):
    """Poll an endpoint's info until it no longer reports a ``pending_config``.

    This is a best-effort wait. Once ``max_wait_seconds`` of sleep has
    accumulated, it prints a message and returns without raising.

    Args:
        endpoint_info_url: ``https://<instance>/api/2.0/serving-endpoints/<name>``.
        poll_seconds: Seconds to sleep between polls.
        max_wait_seconds: Upper bound on accumulated sleep time. The default,
            6000 s (100 minutes), preserves the original limit.

    Raises:
        Exception: if a status request does not return HTTP 200.
    """
    import requests

    total_wait = 0
    while True:
        response = requests.get(endpoint_info_url, headers=headers)
        if response.status_code != 200:
            raise Exception(
                "Expected an HTTP 200 response when accessing endpoint info, "
                f"received {response.status_code}"
            )
        if "pending_config" not in response.json():
            print("New config in place now!")
            return
        print("New config still pending")
        if total_wait >= max_wait_seconds:
            print(f"Stopping, waited for {total_wait} seconds")
            return
        print(f"Wait for {poll_seconds} seconds")
        print(f"Total waiting time so far: {total_wait} seconds")
        time.sleep(poll_seconds)
        total_wait += poll_seconds


def func_create_endpoint(model_serving_endpoint_name, endpoint_json=None, *, poll_seconds=10, max_wait_seconds=6000):
    """Create a serving endpoint, or update its config if it already exists, then wait.

    Args:
        model_serving_endpoint_name: Name of the serving endpoint.
        endpoint_json: Body for the create request. Its ``"config"`` entry is the
            body for the update request, and
            ``["config"]["served_models"][0]["model_version"]`` is reported when
            updating. Defaults to the notebook-level ``my_json``.
        poll_seconds: Seconds between polls while a new config is pending.
        max_wait_seconds: Maximum accumulated wait for the pending config.

    Raises:
        Exception: if reading the endpoint fails with anything other than
            RESOURCE_DOES_NOT_EXIST, if the create/update request does not
            return HTTP 200, or if polling the endpoint info fails.
    """
    import requests

    if endpoint_json is None:
        endpoint_json = my_json  # presumably defined in another notebook cell

    endpoint_url = f"https://{instance}/api/2.0/serving-endpoints"
    endpoint_info_url = f"{endpoint_url}/{model_serving_endpoint_name}"

    current = requests.get(endpoint_info_url, headers=headers)
    if "RESOURCE_DOES_NOT_EXIST" in current.text:
        print("Creating this new endpoint: ", f"https://{instance}/serving-endpoints/{model_serving_endpoint_name}/invocations")
        response = requests.post(endpoint_url, headers=headers, json=endpoint_json)
    elif current.status_code != 200:
        # Previously any other error (e.g. auth failure) was treated as "endpoint exists".
        raise Exception(f"Request failed with status {current.status_code}, {current.text}")
    else:
        new_model_version = endpoint_json["config"]["served_models"][0]["model_version"]
        print("This endpoint existed previously! We are updating it to a new config with new model version: ", new_model_version)
        response = requests.put(f"{endpoint_info_url}/config", headers=headers, json=endpoint_json["config"])

    # Fail fast. Before this check, a rejected create/update only surfaced after
    # the polling loop, usually as a misleading error about endpoint info.
    if response.status_code != 200:
        raise Exception(f"Expected an HTTP 200 response, received {response.status_code}, {response.text}")

    _wait_for_config_update(endpoint_info_url, poll_seconds=poll_seconds, max_wait_seconds=max_wait_seconds)


def func_delete_model_serving_endpoint(model_serving_endpoint_name):
    """Delete the named serving endpoint.

    Raises:
        Exception: if the DELETE request does not return HTTP 200.
    """
    import requests

    url = f"https://{instance}/api/2.0/serving-endpoints/{model_serving_endpoint_name}"
    response = requests.delete(url, headers=headers)
    if response.status_code != 200:
        raise Exception(f"Request failed with status {response.status_code}, {response.text}")
    print(model_serving_endpoint_name, "endpoint is deleted!")
Command skipped
# GET /api/2.0/serving-endpoints/{name}
import time
import mlflow


def wait_for_endpoint(endpoint_name=None, *, poll_seconds=10, timeout_seconds=None):
    """Block until the serving endpoint reports ``state.ready == "READY"``.

    Args:
        endpoint_name: Endpoint to poll. Defaults to the notebook-level
            ``model_serving_endpoint_name``.
        poll_seconds: Seconds to sleep between polls.
        timeout_seconds: Maximum accumulated wait. None (the default) waits
            indefinitely, as before.

    Raises:
        Exception: if a status request does not return HTTP 200, or if the
            endpoint is not ready and ``state.config_update`` is
            ``UPDATE_FAILED``. Without this check the loop would never finish.
        TimeoutError: if ``timeout_seconds`` is set and elapses first.
    """
    import requests

    name = model_serving_endpoint_name if endpoint_name is None else endpoint_name
    url = f"https://{instance}/api/2.0/serving-endpoints/{name}"
    waited = 0
    while True:
        response = requests.get(url, headers=headers)
        if response.status_code != 200:
            raise Exception(f"Expected an HTTP 200 response, received {response.status_code}\n{response.text}")
        state = response.json().get("state", {})
        status = state.get("ready", {})
        if status == "READY":
            print(status)
            print("-" * 80)
            return
        if state.get("config_update") == "UPDATE_FAILED":
            raise Exception(f"Endpoint {name} is not ready and its config update failed: {state}")
        if timeout_seconds is not None and waited >= timeout_seconds:
            raise TimeoutError(f"Endpoint {name} still not ready ({status}) after {waited} seconds")
        print(f"Endpoint not ready ({status}), waiting {poll_seconds} seconds")
        time.sleep(poll_seconds)
        waited += poll_seconds


# NOTE(review): api_url is not used in this cell; kept in case another cell reads it.
api_url = mlflow.utils.databricks_utils.get_webapp_url()

wait_for_endpoint()
# Give the system just a couple extra seconds to transition
time.sleep(5)
Command skipped
import requests


def score_model(data_json: dict):
    """Send ``data_json`` to the serving endpoint's invocations URL.

    Returns the decoded JSON response body. Raises ``Exception``, carrying
    the status code and response text, for any status other than 200.
    """
    invocations_url = (
        f"https://{instance}/serving-endpoints/"
        f"{model_serving_endpoint_name}/invocations"
    )
    reply = requests.request(
        method="POST",
        url=invocations_url,
        headers=headers,
        json=data_json,
    )
    if reply.status_code == 200:
        return reply.json()
    raise Exception(f"Request failed with status {reply.status_code}, {reply.text}")


# Two sample bookings in the "dataframe_records" request format.
payload_json = {
    "dataframe_records": [
        # A user in New York; Florida destinations should score highly.
        {
            "user_id": 4,
            "booking_date": "2022-12-22",
            "destination_id": 16,
            "user_latitude": 40.71277,
            "user_longitude": -74.005974,
        },
        # A user in California; Hawaii destinations should score highly.
        {
            "user_id": 39,
            "booking_date": "2022-12-22",
            "destination_id": 1,
            "user_latitude": 37.77493,
            "user_longitude": -122.41942,
        },
    ]
}
Command skipped
Create a model serving endpoint with Python
This notebook shows how to wrap the model serving REST API in Python functions that create a model serving endpoint, update the endpoint's configuration to serve a new model version, and delete the endpoint, so you can drive your model serving workflows from Python.
Learn more about model serving on Databricks (AWS | Azure).
Requirements
Databricks Runtime ML 12.0 or above