Forecasting time series with GluonTS

This notebook demonstrates how to use GluonTS for probabilistic time series forecasting on Databricks serverless GPU compute. GluonTS is a Python library focused on deep learning-based approaches for time series modeling.

GluonTS provides a toolkit for forecasting and anomaly detection, with pre-built implementations of state-of-the-art models. It supports both PyTorch and MXNet implementations and includes essential components like neural network architectures, feature processing, and evaluation metrics.

The notebook covers:

  • Loading and preparing electricity consumption data
  • Creating train/test splits for backtesting
  • Training a DeepAR model for forecasting
  • Evaluating predictions with confidence intervals
  • Saving and loading model checkpoints

Connect to serverless GPU compute

Click the Connect dropdown and select Serverless GPU.

Install GluonTS and dependencies

Install the GluonTS library with PyTorch support and wget for downloading the dataset.

Python
# install gluonts package
%pip install -q --upgrade gluonts[torch] wget
Python
dbutils.library.restartPython()

Configure Unity Catalog storage for model checkpoints

Set up Unity Catalog parameters for storing model checkpoints. The checkpoint path uses a Unity Catalog volume to persist model state during training.

Python
# You must have `USE CATALOG` privileges on the catalog, and you must have `USE SCHEMA` privileges on the schema.
# If necessary, change the catalog and schema name here.
dbutils.widgets.text("uc_catalog", "main")
dbutils.widgets.text("uc_schema", "default")
dbutils.widgets.text("uc_model_name", "custom_transformer")
dbutils.widgets.text("uc_volume", "checkpoints")

UC_CATALOG = dbutils.widgets.get("uc_catalog")
UC_SCHEMA = dbutils.widgets.get("uc_schema")
UC_VOLUME = dbutils.widgets.get("uc_volume")
MODEL_NAME = dbutils.widgets.get("uc_model_name")
CHECKPOINT_PATH = f"/Volumes/{UC_CATALOG}/{UC_SCHEMA}/{UC_VOLUME}/{MODEL_NAME}"

print(f"UC_CATALOG: {UC_CATALOG}")
print(f"UC_SCHEMA: {UC_SCHEMA}")
print(f"UC_VOLUME: {UC_VOLUME}")
print(f"CHECKPOINT_PATH: {CHECKPOINT_PATH}")
Python
# show the installed gluonts version
%pip show gluonts

Verify GPU availability and resources

Check that GPU compute is available and display the hardware specifications.

Python
# show the GPU details
!nvidia-smi
Python
import torch
import psutil

# check that a GPU is available on the notebook compute
assert torch.cuda.is_available(), 'You need to use GPU compute for this notebook'
# show GPU, GPU RAM, number of CPUs and total RAM
print(f"""
Number of GPUs available: {torch.cuda.device_count()}
Total GPU RAM: {torch.cuda.get_device_properties(0).total_memory / (1024 ** 3):.2f} GB
Number of CPUs: {psutil.cpu_count()}
Total RAM: {psutil.virtual_memory().total / (1024 ** 3):.2f} GB
""")

Import required libraries

Import GluonTS components for dataset handling, model training, and evaluation, along with standard data science libraries.

Python
import os
import json
import zipfile
import random
import matplotlib.pyplot as plt
import wget
import tempfile
import numpy as np
import pandas as pd
import matplotlib.colors as mcolors
from itertools import islice, chain

# GluonTS
from gluonts.dataset.pandas import PandasDataset
from gluonts.dataset.split import split, OffsetSplitter, DateSplitter
from gluonts.dataset.util import to_pandas
from gluonts.dataset.common import ListDataset
from gluonts.dataset.jsonl import JsonLinesFile
from gluonts.evaluation import make_evaluation_predictions, Evaluator
from gluonts.model.predictor import Predictor
from gluonts.model.forecast import QuantileForecast
from gluonts.dataset.field_names import FieldName
from pathlib import Path
from gluonts.torch import (
    DeepAREstimator,  # RNN
    TemporalFusionTransformerEstimator,  # Transformer
    WaveNetEstimator,  # Dilated convolution
    SimpleFeedForwardEstimator,  # MLP
    DeepNPTSEstimator,  # MLP
)
from gluonts.model.seasonal_naive import SeasonalNaivePredictor
from gluonts.ext.prophet import ProphetPredictor
from gluonts.model.npts import NPTSPredictor

import lightning.pytorch as pl
from lightning.pytorch.callbacks import ModelCheckpoint

# setup plt environment
plt.rcParams["axes.grid"] = True
plt.rcParams["figure.figsize"] = (20, 3)
colors = list(mcolors.TABLEAU_COLORS)

Load electricity consumption dataset

This notebook uses the electricity consumption dataset from the University of California, Irvine repository. The dataset contains electricity consumption readings from 370 clients between 2011 and 2014, with values recorded every 15 minutes in kW.

Configure the dataset source URL and file name for downloading.

Python
data_file_name = 'LD2011_2014.txt'
dataset_url = 'https://archive.ics.uci.edu/static/public/321/electricityloaddiagrams20112014.zip'

The dataset file is approximately 800 MB when extracted, which exceeds the 500 MB workspace file limit for Databricks serverless notebooks. The following code uses a temporary directory to download and extract the data, then loads it into a Pandas DataFrame.

Python
# download and extract data
# the electricity dataset https://archive.ics.uci.edu/ml/datasets/ElectricityLoadDiagrams20112014 from the repository of the University of California, Irvine

with tempfile.TemporaryDirectory() as tmp_dir_name:
    temp_zip = f'{tmp_dir_name}/ts.zip'
    print(f'Downloading data zip file from: {dataset_url}')
    wget.download(dataset_url, out=temp_zip)

    with zipfile.ZipFile(temp_zip, 'r') as zip_ref:
        print(f'Extracting data to: {tmp_dir_name}')
        data_file_path = zip_ref.extract(data_file_name, tmp_dir_name)
        print(f'Zip extracted to: {data_file_path}')

    print('Loading data into Pandas DataFrame')
    df_raw = pd.read_csv(
        data_file_path,
        sep=';',
        index_col=0,
        decimal=',',
        parse_dates=True,
    )

Preview the raw electricity consumption data with 15-minute intervals.

Resample data to hourly intervals

Resample the data from 15-minute to 1-hour intervals to reduce the number of data points and speed up training.

Python
# see the data
df_raw
Python
# resample to 1h intervals to reduce the number of data points
freq = "1h"
div = 4  # 1 hour contains four 15-minute intervals, so divide the resampled sum by 4
num_timeseries = df_raw.shape[1]
data_kw = df_raw.resample(freq).sum() / div

data_kw
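The sum-then-divide step above is equivalent to taking the hourly mean of the four 15-minute kW readings. A minimal sketch on synthetic data (not the notebook's dataset) illustrates the equivalence:

```python
import numpy as np
import pandas as pd

# one day of synthetic 15-minute kW readings
idx = pd.date_range("2014-01-01", periods=96, freq="15min")
readings = pd.Series(np.random.default_rng(0).uniform(0, 5, size=96), index=idx)

# sum the four readings per hour and divide by 4 ...
hourly_sum_div = readings.resample("1h").sum() / 4
# ... which is the same as the hourly mean of the readings
hourly_mean = readings.resample("1h").mean()

assert np.allclose(hourly_sum_div, hourly_mean)
print(len(hourly_sum_div))  # 24 hourly values
```
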

Configure prediction parameters

Set the prediction horizon to 7 days (168 hours) and define the training date range using 2014 data.

Select a subset of time series for faster training. Set USE_FULL_DATASET = True to train on all 370 time series.

Python
# predict for 7 days
prediction_days = 7
# 24 hours per day
intervals_per_day = 24
prediction_length = prediction_days * intervals_per_day

# take the last year of data for a sample
start_training_date = pd.Timestamp('2014-01-01')
end_dataset_date = pd.Timestamp('2014-12-31')

print(f"Sampling frequency set to {freq}. Generate predictions for {prediction_length} intervals")
Python
USE_FULL_DATASET = False  # By default, use only a subset of the time series because training on the full dataset can take longer
SAMPLE_SIZE = 10 # set number of samples in the dataset if you don't use the full dataset
MAX_TS_TO_DISPLAY = 10

# get the full dataset or a random sample of SAMPLE_SIZE
# you can change the selection to include specific time series
# ts_sample = data_kw[['item_id1', 'item_id2']]
ts_sample = data_kw if USE_FULL_DATASET else data_kw[np.random.choice(data_kw.columns.to_list(), size=SAMPLE_SIZE, replace=False)]

Convert data to GluonTS format

Convert the Pandas DataFrame to GluonTS format and visualize the time series. See the GluonTS Quick start for more examples.

Python
# convert to GluonTS format, taking only the data between start_training_date and end_dataset_date
ts_dataset = PandasDataset(
    dict(ts_sample[(ts_sample.index > start_training_date) & (ts_sample.index <= end_dataset_date)].astype(np.float32))
)

# visualize time series in the GluonTS dataset
for i, entry in enumerate(islice(ts_dataset, MAX_TS_TO_DISPLAY)):
    to_pandas(entry).plot(label=entry[FieldName.ITEM_ID], color=colors[i % len(colors)])
plt.legend()
plt.tight_layout()
plt.show()

print(f'The GluonTS dataset contains {len(ts_dataset)} individual time series from {start_training_date} to {end_dataset_date}')
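Each item yielded by a GluonTS dataset is, conceptually, a dictionary with a start period and a target array. The hand-built entry below is an illustration of that shape (the names and values are hypothetical; real entries come from PandasDataset), together with the Series reconstruction that to_pandas performs:

```python
import numpy as np
import pandas as pd

# hand-built entry mimicking the shape GluonTS datasets yield
entry = {
    "start": pd.Period("2014-01-01 00:00", freq="h"),
    "target": np.array([3.1, 2.9, 3.4], dtype=np.float32),
    "item_id": "MT_001",
}

# rebuild a pandas Series from start + target, as to_pandas does
index = pd.period_range(entry["start"], periods=len(entry["target"]), freq="h")
series = pd.Series(entry["target"], index=index)
print(series.index[0], len(series))
```
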

Create train/test split for backtesting

Split the dataset into training and test sets using rolling windows. This creates 4 test windows for backtesting model performance.

Python
# set backtest parameters
NUM_WINDOWS = 4 # number of rolling windows for backtest
# distance between windows, set to:
# < prediction_length for overlapping windows
# = prediction_length for adjacent windows
# > prediction_length for non-overlapping, non-adjacent windows
DISTANCE = prediction_length

# set the training-testing split date
end_training_date = pd.Period(end_dataset_date, freq=freq) - NUM_WINDOWS*prediction_length

# split into train and test datasets using GluonTS's DateSplitter
train_ds, test_template = DateSplitter(date=end_training_date).split(ts_dataset)
test_pairs = test_template.generate_instances(
    prediction_length=prediction_length,
    windows=NUM_WINDOWS,
    distance=DISTANCE,
)

print(f"The dataset is split into {len(train_ds)} training series and {len(test_pairs)} test pairs. Training ends at {end_training_date}")
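The window arithmetic can be sketched with plain numbers. This is an illustration of how successive test windows tile the end of each series (the series length is hypothetical; generate_instances handles this internally):

```python
# Sketch of how the backtest windows tile the end of a series.
prediction_length = 168          # 7 days x 24 hours
NUM_WINDOWS = 4
DISTANCE = prediction_length     # adjacent windows

series_length = 8760             # e.g. one year of hourly data

# each window predicts `prediction_length` steps; successive windows are
# shifted by DISTANCE, so the k-th window starts at:
window_starts = [
    series_length - prediction_length - (NUM_WINDOWS - 1 - k) * DISTANCE
    for k in range(NUM_WINDOWS)
]
print(window_starts)  # [8088, 8256, 8424, 8592]
```

With DISTANCE equal to prediction_length, each window starts exactly where the previous one ends, so the four windows cover the last 28 days back to back.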

Train a DeepAR model

Train a DeepAR estimator, a recurrent neural network model for probabilistic forecasting. See Available Models in the GluonTS documentation for other algorithms.

Configure the DeepAR model hyperparameters and training settings. The model uses a context length of 4 times the prediction length and saves checkpoints after each epoch.

Python
NUM_EPOCHS = 10

os.makedirs(CHECKPOINT_PATH, exist_ok=True)

checkpoint_cb = ModelCheckpoint(
    dirpath=CHECKPOINT_PATH,
    filename="deepar-{epoch:02d}-{step}",
    save_top_k=-1,  # keep all checkpoints
    every_n_epochs=1,  # save after every epoch
    save_on_train_epoch_end=True,
)

# set required model hyperparameters. See GluonTS repository for the full list of hyperparameters
model_hyperparameters = {
    "freq": freq,
    "prediction_length": prediction_length,
    "context_length": 4 * prediction_length,
}

# set required trainer hyperparameters
trainer_hyperparameters = {
    "accelerator": "auto",
    "max_epochs": NUM_EPOCHS,
    "callbacks": [checkpoint_cb],
}

# create a DeepAR estimator
deepar_estimator = DeepAREstimator(
    **model_hyperparameters,
    trainer_kwargs=trainer_hyperparameters,
)

torch.set_float32_matmul_precision('medium')  # use 'high' for more precision at some speed cost

Train the DeepAR model on the training dataset. Training for 10 epochs takes approximately 60 seconds on a single GPU.

Python
# train the network
# training for 10 epochs takes about 60 seconds on a single GPU in this notebook
deepar_predictor = deepar_estimator.train(train_ds)

Generate and visualize predictions

Use the trained model to predict the next 7 days for each time series. The visualizations show predicted values with 90% confidence intervals and ground truth values.

Python
# predict
forecasts = deepar_predictor.predict(test_pairs.input, num_samples=20)

# ground truth
labels = [to_pandas(l) for l in test_pairs.label]

# visualize predictions
for i, forecast in enumerate(islice(forecasts, MAX_TS_TO_DISPLAY)):
    plt.plot(labels[i][-NUM_WINDOWS*prediction_length:].to_timestamp())
    forecast.plot(intervals=(0.9,), show_label=True)
    plt.legend([f"Ground truth: {forecast.item_id}", "predicted median", "90% confidence interval"])
    plt.show()
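Conceptually, the plotted median and interval are quantiles taken across the forecast samples. A minimal numpy sketch with a synthetic stand-in for a DeepAR sample forecast (20 samples, 168 steps, as in this notebook):

```python
import numpy as np

# synthetic stand-in for a sample forecast array of shape
# (num_samples, prediction_length)
rng = np.random.default_rng(42)
samples = rng.normal(loc=100.0, scale=10.0, size=(20, 168))

# quantiles across the sample axis give the plotted statistics
median = np.quantile(samples, 0.5, axis=0)   # predicted median
lower = np.quantile(samples, 0.05, axis=0)   # 90% interval, lower bound
upper = np.quantile(samples, 0.95, axis=0)   # 90% interval, upper bound

assert median.shape == (168,)
assert (lower <= median).all() and (median <= upper).all()
```
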

Evaluate model performance

Calculate evaluation metrics using the GluonTS Evaluator. Metrics include MASE, RMSE, and quantile losses.

Python
# calculate evaluation metrics
evaluator = Evaluator(quantiles=[0.1, 0.5, 0.9])
agg_metrics, item_metrics = evaluator(
    labels,
    deepar_predictor.predict(test_pairs.input, num_samples=20),
    num_series=len(test_pairs),
)

# metrics per time series
item_metrics.display()

# aggregated metrics
print(json.dumps(agg_metrics, indent=2))
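To make two of those metrics concrete, here is a toy-data sketch of RMSE and the quantile (pinball) loss at q=0.5, in the doubled form GluonTS uses; the numbers are made up for illustration:

```python
import numpy as np

# toy ground truth and a median (0.5-quantile) forecast
y_true = np.array([10.0, 12.0, 11.0, 13.0])
y_median = np.array([9.0, 12.5, 11.0, 14.0])

# root mean squared error of the point forecast
rmse = np.sqrt(np.mean((y_true - y_median) ** 2))

def quantile_loss(y, y_hat, q):
    # pinball loss, doubled as in GluonTS: penalizes under- and
    # over-prediction asymmetrically according to the quantile level q
    return 2 * np.sum(np.abs((y_hat - y) * ((y <= y_hat) - q)))

ql_50 = quantile_loss(y_true, y_median, 0.5)
print(rmse, ql_50)
```
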

Resume training from checkpoint

Load a saved checkpoint and continue training for additional epochs. This demonstrates how to resume training from a previously saved model state.

Configure the model to train for an additional 10 epochs starting from the saved checkpoint at epoch 9.

Python
# set required model hyperparameters. See GluonTS repository for the full list of hyperparameters
model_hyperparameters = {
    "freq": freq,
    "prediction_length": prediction_length,
    "context_length": 4 * prediction_length,
}

# set required trainer hyperparameters
trainer_hyperparameters = {
    "accelerator": "auto",
    "max_epochs": NUM_EPOCHS + 10,  # train for another 10 epochs
    "callbacks": [checkpoint_cb],
}

# create a DeepAR estimator using the model checkpoint
deepar_estimator = DeepAREstimator(
    **model_hyperparameters,
    trainer_kwargs=trainer_hyperparameters,
)

updated_predictor = deepar_estimator.train(
    training_data=train_ds,
    ckpt_path=f"{CHECKPOINT_PATH}/deepar-epoch=09-step=500.ckpt",
)

Next steps

This notebook demonstrated the basics of time series forecasting with GluonTS on Databricks serverless GPU compute. To learn more, see the GluonTS documentation for additional models, datasets, and evaluation tools.