%md # Identify long-running clusters and optionally restart them

This notebook queries the Databricks API for all live clusters. For each cluster, it determines the oldest living executor or driver instance, and if that age exceeds the configured maximum, it uses the Databricks API to restart the cluster.

**This will cause downtime.** Notebooks will be reset and any active jobs will fail when the cluster is restarted.

This requires admin rights.

## Configure your credentials

To use this notebook, you will need an admin-level personal access token. Following secure coding practices, the token should not be hardcoded in a notebook, but rather stored using the Databricks Secrets capability or a third-party secret manager. Configure the Databricks Secrets service using the Databricks CLI or the API. (Docs for [CLI](https://docs.databricks.com/security/secrets/secrets.html) and [API](https://docs.databricks.com/dev-tools/api/latest/secrets.html))

Example using the CLI, once it is configured:

    $ databricks secrets create-scope --scope YOUR_SCOPE_NAME
    $ databricks secrets put --scope YOUR_SCOPE_NAME --key YOUR_KEY_NAME

The scope name and key name can be whatever you wish; just provide them below in the configuration section.

After each `put`, the CLI will open the vi text editor. Press the letter 'i' to switch to "insert" mode and type in the secret value (here, your personal access token). To get out of "insert" mode, hit escape, and then save and quit by typing ':wq'. Databricks will automatically remove any leading or trailing whitespace. Confused by vi? You're not alone. [vi for beginners](https://www.howtogeek.com/102468/a-beginners-guide-to-editing-text-files-with-vi/)
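%md If you prefer to use the API instead of the CLI, the same scope and key can be created with the Secrets API linked above. A minimal sketch, run from anywhere with network access to your workspace; the endpoint paths are from the Secrets API 2.0 docs, and the workspace URL, scope, key, and token values are all placeholders to replace with your own:

import requests

# Placeholders -- substitute your workspace URL, an admin token, and your chosen names.
workspace_url = "https://mydeployment.cloud.databricks.com"
headers = {"Authorization": "Bearer YOUR_ADMIN_TOKEN"}

# Create the scope (POST /api/2.0/secrets/scopes/create).
requests.post(workspace_url + "/api/2.0/secrets/scopes/create",
              headers=headers, json={"scope": "YOUR_SCOPE_NAME"})

# Store the personal access token under the key (POST /api/2.0/secrets/put).
requests.post(workspace_url + "/api/2.0/secrets/put",
              headers=headers,
              json={"scope": "YOUR_SCOPE_NAME",
                    "key": "YOUR_KEY_NAME",
                    "string_value": "YOUR_PERSONAL_ACCESS_TOKEN"})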
max_age = 24  # In days
perform_restart = False
min_age_output = 1  # In days

secret_configuration = {
    "scope": "REPLACE_WITH_SCOPE",
    "key": "REPLACE_WITH_KEY"
}  # See note above

api_url = "YOUR_DEPLOYMENT_HERE"
# API URL Examples:
#
# AWS deployment URL format is similar to:   https://mydeployment.cloud.databricks.com
# Azure deployment URL format is similar to: https://adb-9999999999999999.58.azuredatabricks.net or https://westus.azuredatabricks.net
# GCP deployment URL format is similar to:   https://9999999999999999.9.gcp.databricks.net

import re
if not re.match(r"^https://[^/]+$", api_url):
    raise Exception("Invalid URL format. See examples")
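%md The check above only verifies that the URL uses https and has no path component or trailing slash. A quick sketch of what the pattern accepts and rejects, using the example deployment URLs from the comments:

import re

url_pattern = r"^https://[^/]+$"

# Well-formed deployment URLs pass the check.
assert re.match(url_pattern, "https://mydeployment.cloud.databricks.com")
assert re.match(url_pattern, "https://adb-9999999999999999.58.azuredatabricks.net")

# A trailing slash or a non-https scheme is rejected.
assert not re.match(url_pattern, "https://mydeployment.cloud.databricks.com/")
assert not re.match(url_pattern, "http://mydeployment.cloud.databricks.com")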
import json, requests, time

token = dbutils.secrets.get(scope=secret_configuration['scope'], key=secret_configuration['key'])


def renderURL(api_url, version, endpoint):
    return api_url + '/api/{0}'.format(version) + endpoint


def renderToken(token):
    return {'Authorization': 'Bearer {0}'.format(token)}


def get(endpoint, api_url, token, json_params=None, printJson=False, version='2.0', debug=False):
    # json_params is sent as query-string parameters; requests treats
    # params=None the same as omitting the argument.
    resp = requests.get(renderURL(api_url, version, endpoint),
                        headers=renderToken(token), params=json_params)
    if resp.status_code == 403:
        if debug:
            print("Invalid Token", renderToken(token))
        raise PermissionError("Authentication Failed")
    if debug:
        print("get DEBUG", resp.text)
    results = resp.json()
    if printJson:
        print(json.dumps(results, indent=4, sort_keys=True))
    return results


def post(endpoint, api_url, token, json_params=None, printJson=False, version='2.0'):
    if json_params:
        try:
            raw_results = requests.post(renderURL(api_url, version, endpoint),
                                        headers=renderToken(token), json=json_params)
            results = raw_results.json()
        except Exception:
            # Fall back to a manually serialized body.
            raw_results = requests.post(renderURL(api_url, version, endpoint),
                                        headers=renderToken(token), data=json.dumps(json_params))
            results = raw_results.json()
    else:
        print("Must have a payload in the json_params param.")
        return {}
    if printJson:
        print('""' + json.dumps(results, sort_keys=True) + '""')
    # If results are empty, return the HTTP status on its own
    if results:
        results['http_status_code'] = raw_results.status_code
        return results
    else:
        return {'http_status_code': raw_results.status_code}


clusters = get("/clusters/list", api_url, token)

queued_restart = None
list_of_clusters = []
# Guard against a workspace that reports no clusters.
for cluster in clusters.get('clusters', []):
    # Clusters without a driver entry are not running, so skip them.
    if "driver" in cluster:
        startTimes = [cluster['driver']['start_timestamp']]
        if "executors" in cluster:
            for executor in cluster['executors']:
                startTimes.append(executor['start_timestamp'])
        # Timestamps are epoch milliseconds; convert the oldest to an age in days.
        oldest_age = round((int(time.time()) - min(startTimes) / 1000.) / 3600 / 24, 4)
        if oldest_age > min_age_output:
            status = f"Cluster ID {cluster['cluster_id']}, name {cluster['cluster_name']}, {len(startTimes)} nodes, started {time.strftime('%m/%d/%Y %H:%M:%S', time.gmtime(min(startTimes)/1000.))} ({oldest_age} days old)"
            if perform_restart:
                print(status)
            else:
                list_of_clusters.append({"age": oldest_age, "status": status})
        if perform_restart:
            if (int(time.time()) - min(startTimes) / 1000.) / 3600 / 24 > max_age:
                if cluster['cluster_id'] == spark.conf.get("spark.databricks.clusterUsageTags.clusterId"):
                    # Restarting the cluster this notebook runs on would kill the loop,
                    # so defer it until all other clusters have been handled.
                    print("\tQueuing cluster restart as it is your currently running cluster")
                    queued_restart = cluster['cluster_id']
                else:
                    print("\tPerforming Restart...")
                    post("/clusters/restart", api_url, token, json_params={"cluster_id": cluster['cluster_id']})

if not perform_restart:
    # Dry-run mode: report clusters sorted oldest-first instead of restarting them.
    list_of_clusters_by_age = sorted(list_of_clusters, key=lambda x: x['age'], reverse=True)
    print("\n".join([c['status'] for c in list_of_clusters_by_age]))

if perform_restart and queued_restart:
    print("Restarting currently active cluster.")
    print("Starting in ten seconds...")
    time.sleep(5)
    print("... 5 seconds ...")
    time.sleep(5)
    print("Restarting")
    post("/clusters/restart", api_url, token, json_params={"cluster_id": queued_restart})
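%md After a restart request, a cluster transitions through states such as RESTARTING before returning to RUNNING. A quick sanity check is a sketch like the one below, reusing the `get` helper defined above against the Clusters API 2.0 `/clusters/get` endpoint; the cluster ID shown is a placeholder:

# Check the state of a cluster after requesting a restart.
cluster_info = get("/clusters/get", api_url, token,
                   json_params={"cluster_id": "REPLACE_WITH_CLUSTER_ID"})
print(cluster_info.get("state"))  # e.g. PENDING, RESTARTING, or RUNNING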