Pular para o conteúdo principal

Previsão de séries temporais com GluonTS

Este notebook demonstra como usar o GluonTS para previsão probabilística de séries temporais em compute GPU serverless Databricks . GluonTS é uma biblioteca Python focada em abordagens baseadas em aprendizagem profunda para modelagem de séries temporais.

O GluonTS fornece um conjunto de ferramentas para previsão e detecção de anomalias, com implementações pré-construídas de modelos de última geração. Ele suporta implementações PyTorch e MXNet e inclui componentes essenciais como arquiteturas de redes neurais, processamento de recursos e métricas de avaliação.

O livro "The Notebook" aborda os seguintes temas:

  • Carregando e preparando dados de consumo de eletricidade
  • Criando divisões de ensino/teste para backtesting
  • treinamento um modelo DeepAR para previsão
  • Avaliando previsões com intervalos de confiança.
  • Salvando e carregando pontos de verificação do modelo

Conecte-se à computeGPU serverless

Clique no dropdown Conectar e selecione GPU sem servidor .

Instale o GluonTS e suas dependências.

Instale a biblioteca GluonTS com suporte PyTorch e use o wget para baixar o dataset.

Python
# install gluonts package
%pip install -q --upgrade gluonts[torch] wget
Python
dbutils.library.restartPython()

Configure o armazenamento Unity Catalog para pontos de verificação do modelo.

Configure os parâmetros do Unity Catalog para armazenar os pontos de verificação do modelo. O caminho de checkpoint utiliza um volume do Unity Catalog para persistir o estado do modelo durante o treinamento.

Python
# You must have `USE CATALOG` privileges on the catalog, and you must have `USE SCHEMA` privileges on the schema.
# If necessary, change the catalog and schema name here.
dbutils.widgets.text("uc_catalog", "main")
dbutils.widgets.text("uc_schema", "default")
dbutils.widgets.text("uc_model_name", "custom_transformer")
dbutils.widgets.text("uc_volume", "checkpoints")

UC_CATALOG = dbutils.widgets.get("uc_catalog")
UC_SCHEMA = dbutils.widgets.get("uc_schema")
UC_VOLUME = dbutils.widgets.get("uc_volume")
MODEL_NAME = dbutils.widgets.get("uc_model_name")
CHECKPOINT_PATH = f"/Volumes/{UC_CATALOG}/{UC_SCHEMA}/{UC_VOLUME}/{MODEL_NAME}"

print(f"UC_CATALOG: {UC_CATALOG}")
print(f"UC_SCHEMA: {UC_SCHEMA}")
print(f"UC_VOLUME: {UC_VOLUME}")
print(f"CHECKPOINT_PATH: {CHECKPOINT_PATH}")
Python
# show the installed gluonts version
%pip show gluonts

Verificar a disponibilidade e os recursos da GPU

Verifique se o compute da GPU está disponível e exiba as especificações de hardware.

Python
# show the GPU details
!nvidia-smi
Python
import torch
import psutil

# check that there GPU is available on the notebook compute
assert torch.cuda.is_available(), 'You need to use GPU compute for this notebook'
# show GPU, GPU RAM, number of CPUs and total RAM
print(f"""
Number of GPUs available: {torch.cuda.device_count()}
Total GPU RAM: {torch.cuda.get_device_properties(0).total_memory / (1024 ** 3):.2f} GB
Number of CPUs: {psutil.cpu_count()}
Total RAM: {psutil.virtual_memory().total / (1024 ** 3):.2f} GB
""")

Importar biblioteca necessária

Importe componentes GluonTS para manipulação de dataset , treinamento de modelo e avaliação, juntamente com biblioteca padrão de ciência de dados.

Python
import os
import json
import zipfile
import random
import matplotlib.pyplot as plt
import wget
import tempfile
import numpy as np
import pandas as pd
import matplotlib.colors as mcolors
from itertools import islice, chain

# GluonTS
from gluonts.dataset.pandas import PandasDataset
from gluonts.dataset.split import split, OffsetSplitter, DateSplitter
from gluonts.dataset.util import to_pandas
from gluonts.dataset.common import ListDataset
from gluonts.dataset.jsonl import JsonLinesFile
from gluonts.evaluation import make_evaluation_predictions, Evaluator
from gluonts.model.predictor import Predictor
from gluonts.model.forecast import QuantileForecast
from gluonts.dataset.field_names import FieldName
from pathlib import Path
from gluonts.torch import (
DeepAREstimator, # RNN
TemporalFusionTransformerEstimator, # LTSM
WaveNetEstimator, # Dilated convolution,
SimpleFeedForwardEstimator, # MLP
DeepNPTSEstimator, # MLP
)
from gluonts.model.seasonal_naive import SeasonalNaivePredictor
from gluonts.ext.prophet import ProphetPredictor
from gluonts.model.npts import NPTSPredictor

import lightning.pytorch as pl
from lightning.pytorch.callbacks import ModelCheckpoint

# setup plt environment
plt.rcParams["axes.grid"] = True
plt.rcParams["figure.figsize"] = (20, 3)
colors = list(mcolors.TABLEAU_COLORS)

datasetde consumo de eletricidade de carga

Este notebook utiliza o dataset de consumo de eletricidade do acervo da Universidade da Califórnia, Irvine. O dataset contém leituras de consumo de eletricidade de 370 clientes entre 2011 e 2014, com valores registrados a cada 15 minutos em kW.

Configure o URL da fonte dataset e o nome do arquivo para downloads.

Python
data_file_name = 'LD2011_2014.txt'
dataset_url = 'https://archive.ics.uci.edu/static/public/321/electricityloaddiagrams20112014.zip'

O arquivo dataset tem aproximadamente 800 MB quando extraído, o que excede o limite de 500 MB para arquivos de workspace no Databricks serverless Notebook. O código a seguir usa um diretório temporário para download e extrair os dados e, em seguida, os carrega em um Pandas DataFrame.

Python
# download and extract data
# the electricity dataset https://archive.ics.uci.edu/ml/datasets/ElectricityLoadDiagrams20112014 from the repository of the University of California, Irvine

with tempfile.TemporaryDirectory() as tmp_dir_name:
temp_zip = f'{tmp_dir_name}/ts.zip'
print(f'Downloading data zip file from: {dataset_url}')
wget.download(dataset_url, out=temp_zip)

with zipfile.ZipFile(temp_zip, 'r') as zip_ref:
print(f'Extracting data to: {tmp_dir_name}')
data_file_path = zip_ref.extract(data_file_name, tmp_dir_name)
print(f'Zip extracted to: {data_file_path}')

print('Loading data into Pandas DataFrame')
df_raw = pd.read_csv(
data_file_path,
sep=';',
index_col=0,
decimal=',',
parse_dates=True,
)

Visualize os dados brutos de consumo de eletricidade em intervalos de 15 minutos.

Reamostrar dados para intervalos de uma hora

Reamostre os dados de intervalos de 15 minutos para 1 hora para reduzir o número de pontos de dados e acelerar o treinamento.

Python
# see the data
df_raw
Python
# resample to 1h intervals to reduce the number of data points
freq = "1h"
div = 4 # 1 hour contain 4x 15 min intervals, you need to delete the resampled value by 4
num_timeseries = df_raw.shape[1]
data_kw = df_raw.resample(freq).sum() / div

data_kw

Configurar parâmetros de previsão

Defina o horizonte de previsão para 7 dias (168 horas) e defina o intervalo de datas de treinamento usando dados de 2014.

Selecione um subconjunto de séries temporais para um treinamento mais rápido. Defina USE_FULL_DATASET = True para ensinar em todas as 370 séries temporais.

Python
# predict for 7 days
prediction_days = 7
# 24 hours per day
intervals_per_day = 24
prediction_length = prediction_days * intervals_per_day

# take the last year of data for a sample
start_training_date = pd.Timestamp('2014-01-01')
end_dataset_date = pd.Timestamp('2014-12-31')

print(f"Sampling frequency set to {freq}. Generate predictions for {prediction_length} intervals")
Python
USE_FULL_DATASET = False # By default use only a subset of the time series because training of full dataset can take longer time
SAMPLE_SIZE = 10 # set number of samples in the dataset if you don't use the full dataset
MAX_TS_TO_DISPLAY = 10

# get the full dataset or a random sample of SAMPLE_SIZE
# you can change the selection to include specific time series
# ts_sample = data_kw[['item_id1', 'item_id2']]
ts_sample = data_kw if USE_FULL_DATASET else data_kw[np.random.choice(data_kw.columns.to_list(), size=SAMPLE_SIZE, replace=False)]

Converter dados para o formato GluonTS

Converta o Pandas DataFrame para o formato GluonTS e visualize a série temporal. Consulte o Guia Rápido do GluonTS para obter mais exemplos.

Python
# convert to GluonTS format, taking only the data between start_training_date and end_dataset_date
ts_dataset = PandasDataset(
dict(ts_sample[(ts_sample.index > start_training_date) & (ts_sample.index <= end_dataset_date)].astype(np.float32))
)

# visualize time series in the GluonTS dataset
for i, entry in enumerate(islice(ts_dataset, MAX_TS_TO_DISPLAY)):
to_pandas(entry).plot(label=entry[FieldName.ITEM_ID], color=colors[i % len(colors)])
plt.legend()
plt.tight_layout()
plt.show()

print(f'The GluonTS dataset contains {len(ts_dataset)} individual time series from {start_training_date} to {end_dataset_date}')

Criar divisão entre ensino e teste para backtesting

Divida o dataset em conjuntos de treinamento e teste usando janelas deslizantes. Isso cria 4 janelas de teste para avaliar o desempenho do modelo em retrospectiva.

Python
# set backtest parameters
NUM_WINDOWS = 4 # number of rolling windows for backtest
# distance between windows, set to:
# < prediction_length for overlapping windows
# = prediction length for adjucent windows
# > prediction_length for non overapping and non-adjucent windows
DISTANCE = prediction_length

# set the training-testing split date
end_training_date = pd.Period(end_dataset_date, freq=freq) - NUM_WINDOWS*prediction_length

# split into train and test datasets using GluonTS's DateSplitter
train_ds, test_template = DateSplitter(date=end_training_date).split(ts_dataset)
test_pairs = test_template.generate_instances(
prediction_length=prediction_length,
windows=NUM_WINDOWS,
distance=DISTANCE,
)

print(f"The dataset is splitted in {len(train_ds)} training datasets and {len(test_pairs)} test pairs. Training end is {end_training_date}")

ensinar um modelo DeepAR

Este artigo ensina o estimador DeepAR, um modelo de rede neural recorrente para previsão probabilística. Consulte a seção "Modelos Disponíveis" na documentação do GluonTS para obter informações sobre outros algoritmos.

Configure os hiperparâmetros e as configurações de treinamento do modelo DeepAR. O modelo utiliza um comprimento de contexto 4 vezes maior que o comprimento da previsão e salva pontos de verificação após cada época.

Python
NUM_EPOCHS = 10

os.makedirs(CHECKPOINT_PATH, exist_ok=True)

checkpoint_cb = ModelCheckpoint(
dirpath=CHECKPOINT_PATH,
filename="deepar-{epoch:02d}-{step}",
save_top_k=-1, # keep all checkpoints
every_n_epochs=1, # save after every epoch
save_on_train_epoch_end=True,
)

# set required model hyperparameters. See GluonTS repository for the full list of hyperparameters
model_hyperparameters = {
"freq":freq,
"prediction_length":prediction_length,
"context_length":4*prediction_length,
}

# set required trainer hyperparameters
trainer_hyperparameters = {
"accelerator":"auto",
"max_epochs":NUM_EPOCHS,
"callbacks":[checkpoint_cb]
}

# create a DeepAR estimator
deepar_estimator = DeepAREstimator(
**model_hyperparameters,
trainer_kwargs=trainer_hyperparameters,
)

torch.set_float32_matmul_precision('medium') # 'high'

ensinando o modelo DeepAR no dataset de treinamento. O treinamento por 10 épocas leva aproximadamente 60 segundos em uma única GPU.

Python
# train the network
# the training for 10 epochs takes about 60 second on a single GPU in this notebook
deepar_predictor = deepar_estimator.train(train_ds)

Gere e visualize previsões

Utilize o modelo treinado para prever os próximos 7 dias para cada série temporal. As visualizações mostram os valores previstos com intervalos de confiança de 90% e os valores reais.

Python
# predict
forecasts = deepar_predictor.predict(test_pairs.input, num_samples=20)

# ground truth
labels = [to_pandas(l) for l in test_pairs.label]

# visualize predictions
for i, forecast in enumerate(islice(forecasts, MAX_TS_TO_DISPLAY)):
plt.plot(labels[i][-NUM_WINDOWS*prediction_length:].to_timestamp())
forecast.plot(intervals=(0.9,), show_label=True)
plt.legend([f"Ground truth: {forecast.item_id}", "predicted median", "90% confidence interval"])
plt.show()

Avaliar o desempenho do modelo

Calcule as métricas de avaliação usando o GluonTS Evaluator. As métricas incluem MASE, RMSE e perdas quantílicas.

Python
# calculate evaluation metrics
evaluator = Evaluator(quantiles=[0.1, 0.5, 0.9])
agg_metrics, item_metrics = evaluator(
labels,
deepar_predictor.predict(test_pairs.input, num_samples=20),
num_series=len(test_pairs),
)

# metrics per time series
item_metrics.display()

# aggregated metrics
print(json.dumps(agg_metrics, indent=2))

Retomar o treinamento a partir do ponto de verificação.

Carregue um ponto de verificação salvo e continue o treinamento por mais épocas. Isso demonstra como retomar o treinamento a partir de um estado de modelo previamente salvo.

Configure o modelo para ensinar por mais 10 épocas, a partir do ponto de verificação salvo na época 9.

Python
# set required model hyperparameters. See GluonTS repository for the full list of hyperparameters
model_hyperparameters = {
"freq": freq,
"prediction_length": prediction_length,
"context_length": 4 * prediction_length,
}

# set required trainer hyperparameters
trainer_hyperparameters = {
"accelerator": "auto",
"max_epochs": NUM_EPOCHS + 10, # Train for another 10 epochs
"callbacks": [checkpoint_cb],
}

# create a DeepAR estimator using the model checkpoint
deepar_estimator = DeepAREstimator(
**model_hyperparameters,
trainer_kwargs=trainer_hyperparameters,
)

updated_predictor = deepar_estimator.train(
training_data=train_ds,
ckpt_path=f"{CHECKPOINT_PATH}/deepar-epoch=09-step=500.ckpt",
)

Próximos passos

Este notebook demonstrou os conceitos básicos de previsão de séries temporais com GluonTS em compute GPU serverless Databricks . Para saber mais:

Exemplo de caderno

Previsão de séries temporais com GluonTS

Abrir notebook em uma nova aba