メインコンテンツまでスキップ

GluonTS による時系列予測

このノートブックでは、 Databricksサーバレス GPU コンピュートで確率的時系列予測にGluonTSを使用する方法を示します。 GluonTS は、時系列モデリングのためのディープラーニングベースのアプローチに重点を置いた Python ライブラリです。

GluonTS は、最先端のモデルの実装が事前に構築された、予測と異常検出のためのツールキットを提供します。PyTorchと MXNet の両方の実装をサポートしており、ニューラルネットワーク アーキテクチャ、機能処理、評価メトリクスなどの重要なコンポーネントが含まれています。

ノートブックには次の内容が記載されています。

  • 電力消費データの読み込みと準備
  • バックテスト用にトレーニングする/テスト分割を作成する
  • 予測のためのDeepARモデルのトレーニング
  • 信頼区間による予測の評価
  • モデルチェックポイントの保存と読み込み

サーバレスGPUコンピュートに接続する

[接続 ] ドロップダウンをクリックし、 [サーバレス GPU] を選択します。

GluonTSと依存関係をインストールする

PyTorch サポート付きの GluonTS ライブラリと、データセットをダウンロードするための wget をインストールします。

Python
# install gluonts package
%pip install -q --upgrade gluonts[torch] wget
Python
dbutils.library.restartPython()

モデルチェックポイント用のUnity Catalogストレージを構成する

モデルのチェックポイントを保存するためにUnity Catalogセットアップします。 チェックポイント パスはUnity Catalogボリュームを使用して、トレーニング中にモデルの状態を保持します。

Python
# You must have `USE CATALOG` privileges on the catalog, and you must have `USE SCHEMA` privileges on the schema.
# If necessary, change the catalog and schema name here.
dbutils.widgets.text("uc_catalog", "main")
dbutils.widgets.text("uc_schema", "default")
dbutils.widgets.text("uc_model_name", "custom_transformer")
dbutils.widgets.text("uc_volume", "checkpoints")

UC_CATALOG = dbutils.widgets.get("uc_catalog")
UC_SCHEMA = dbutils.widgets.get("uc_schema")
UC_VOLUME = dbutils.widgets.get("uc_volume")
MODEL_NAME = dbutils.widgets.get("uc_model_name")
CHECKPOINT_PATH = f"/Volumes/{UC_CATALOG}/{UC_SCHEMA}/{UC_VOLUME}/{MODEL_NAME}"

print(f"UC_CATALOG: {UC_CATALOG}")
print(f"UC_SCHEMA: {UC_SCHEMA}")
print(f"UC_VOLUME: {UC_VOLUME}")
print(f"CHECKPOINT_PATH: {CHECKPOINT_PATH}")
Python
# show the installed gluonts version
%pip show gluonts

GPUの可用性とリソースを確認する

GPUコンピュートが利用可能であることを確認し、ハードウェア仕様を表示します。

Python
# show the GPU details
!nvidia-smi
Python
import torch
import psutil

# check that there GPU is available on the notebook compute
assert torch.cuda.is_available(), 'You need to use GPU compute for this notebook'
# show GPU, GPU RAM, number of CPUs and total RAM
print(f"""
Number of GPUs available: {torch.cuda.device_count()}
Total GPU RAM: {torch.cuda.get_device_properties(0).total_memory / (1024 ** 3):.2f} GB
Number of CPUs: {psutil.cpu_count()}
Total RAM: {psutil.virtual_memory().total / (1024 ** 3):.2f} GB
""")

必要なライブラリをインポートする

データセットの処理、モデルのトレーニング、評価のために、標準のデータサイエンス ライブラリとともに GluonTS コンポーネントをインポートします。

Python
import os
import json
import zipfile
import random
import matplotlib.pyplot as plt
import wget
import tempfile
import numpy as np
import pandas as pd
import matplotlib.colors as mcolors
from itertools import islice, chain

# GluonTS
from gluonts.dataset.pandas import PandasDataset
from gluonts.dataset.split import split, OffsetSplitter, DateSplitter
from gluonts.dataset.util import to_pandas
from gluonts.dataset.common import ListDataset
from gluonts.dataset.jsonl import JsonLinesFile
from gluonts.evaluation import make_evaluation_predictions, Evaluator
from gluonts.model.predictor import Predictor
from gluonts.model.forecast import QuantileForecast
from gluonts.dataset.field_names import FieldName
from pathlib import Path
from gluonts.torch import (
DeepAREstimator, # RNN
TemporalFusionTransformerEstimator, # LTSM
WaveNetEstimator, # Dilated convolution,
SimpleFeedForwardEstimator, # MLP
DeepNPTSEstimator, # MLP
)
from gluonts.model.seasonal_naive import SeasonalNaivePredictor
from gluonts.ext.prophet import ProphetPredictor
from gluonts.model.npts import NPTSPredictor

import lightning.pytorch as pl
from lightning.pytorch.callbacks import ModelCheckpoint

# setup plt environment
plt.rcParams["axes.grid"] = True
plt.rcParams["figure.figsize"] = (20, 3)
colors = list(mcolors.TABLEAU_COLORS)

電力消費データセットを読み込む

このノートブックでは、カリフォルニア大学アーバイン校のリポジトリの電力消費データセットを使用します。このデータセットには、2011 年から 2014 年までの 370 社のクライアントからの電力消費量の測定値が含まれており、値は 15 分ごとに kW 単位で記録されています。

ダウンロードするデータセットのソース URL とファイル名を構成します。

Python
data_file_name = 'LD2011_2014.txt'
dataset_url = 'https://archive.ics.uci.edu/static/public/321/electricityloaddiagrams20112014.zip'

データセット ファイルは抽出すると約 800 MB となり、 Databricksサーバーレス ノートブックのワークスペース ファイルの制限である 500 MB を超えます。 次のコードは、一時ディレクトリを使用してデータをダウンロードおよび抽出し、それを Pandas DataFrame に読み込みます。

Python
# download and extract data
# the electricity dataset https://archive.ics.uci.edu/ml/datasets/ElectricityLoadDiagrams20112014 from the repository of the University of California, Irvine

with tempfile.TemporaryDirectory() as tmp_dir_name:
temp_zip = f'{tmp_dir_name}/ts.zip'
print(f'Downloading data zip file from: {dataset_url}')
wget.download(dataset_url, out=temp_zip)

with zipfile.ZipFile(temp_zip, 'r') as zip_ref:
print(f'Extracting data to: {tmp_dir_name}')
data_file_path = zip_ref.extract(data_file_name, tmp_dir_name)
print(f'Zip extracted to: {data_file_path}')

print('Loading data into Pandas DataFrame')
df_raw = pd.read_csv(
data_file_path,
sep=';',
index_col=0,
decimal=',',
parse_dates=True,
)

生の電力消費データを 15 分間隔でプレビューします。

データを1時間間隔で再サンプリングする

データポイントの数を減らし、トレーニングを高速化するために、15 分から 1 時間の間隔でデータを再サンプリングします。

Python
# see the data
df_raw
Python
# resample to 1h intervals to reduce the number of data points
freq = "1h"
div = 4 # 1 hour contain 4x 15 min intervals, you need to delete the resampled value by 4
num_timeseries = df_raw.shape[1]
data_kw = df_raw.resample(freq).sum() / div

data_kw

予測を構成する

予測期間を 7 日間 (168 時間) に設定し、2014 年のデータを使用してトレーニングの日付範囲を定義します。

トレーニングを高速化するには、時系列のサブセットを選択します。370 個の時系列すべてをトレーニングするには、 USE_FULL_DATASET = Trueを設定します。

Python
# predict for 7 days
prediction_days = 7
# 24 hours per day
intervals_per_day = 24
prediction_length = prediction_days * intervals_per_day

# take the last year of data for a sample
start_training_date = pd.Timestamp('2014-01-01')
end_dataset_date = pd.Timestamp('2014-12-31')

print(f"Sampling frequency set to {freq}. Generate predictions for {prediction_length} intervals")
Python
USE_FULL_DATASET = False # By default use only a subset of the time series because training of full dataset can take longer time
SAMPLE_SIZE = 10 # set number of samples in the dataset if you don't use the full dataset
MAX_TS_TO_DISPLAY = 10

# get the full dataset or a random sample of SAMPLE_SIZE
# you can change the selection to include specific time series
# ts_sample = data_kw[['item_id1', 'item_id2']]
ts_sample = data_kw if USE_FULL_DATASET else data_kw[np.random.choice(data_kw.columns.to_list(), size=SAMPLE_SIZE, replace=False)]

データをGluonTS形式に変換する

Pandas DataFrame を GluonTS 形式に変換し、時系列を視覚化します。詳細な例については、GluonTSクイック スタートを参照してください。

Python
# convert to GluonTS format, taking only the data between start_training_date and end_dataset_date
ts_dataset = PandasDataset(
dict(ts_sample[(ts_sample.index > start_training_date) & (ts_sample.index <= end_dataset_date)].astype(np.float32))
)

# visualize time series in the GluonTS dataset
for i, entry in enumerate(islice(ts_dataset, MAX_TS_TO_DISPLAY)):
to_pandas(entry).plot(label=entry[FieldName.ITEM_ID], color=colors[i % len(colors)])
plt.legend()
plt.tight_layout()
plt.show()

print(f'The GluonTS dataset contains {len(ts_dataset)} individual time series from {start_training_date} to {end_dataset_date}')

バックテスト用にトレーニングする/テスト分割を作成する

ローリング ウィンドウを使用して、データセットをトレーニング セットとテスト セットに分割します。これにより、モデルのパフォーマンスをバックテストするための 4 つのテスト ウィンドウが作成されます。

Python
# set backtest parameters
NUM_WINDOWS = 4 # number of rolling windows for backtest
# distance between windows, set to:
# < prediction_length for overlapping windows
# = prediction length for adjucent windows
# > prediction_length for non overapping and non-adjucent windows
DISTANCE = prediction_length

# set the training-testing split date
end_training_date = pd.Period(end_dataset_date, freq=freq) - NUM_WINDOWS*prediction_length

# split into train and test datasets using GluonTS's DateSplitter
train_ds, test_template = DateSplitter(date=end_training_date).split(ts_dataset)
test_pairs = test_template.generate_instances(
prediction_length=prediction_length,
windows=NUM_WINDOWS,
distance=DISTANCE,
)

print(f"The dataset is splitted in {len(train_ds)} training datasets and {len(test_pairs)} test pairs. Training end is {end_training_date}")

DeepARモデルをトレーニングする

確率的予測のためのリカレント ニューラルネットワーク モデルである DeepAR 推定器をトレーニングします。 他のアルゴリズムについては、GluonTS ドキュメントの「利用可能なモデル」を参照してください。

DeepAR モデルのハイパーパラメータとトレーニング設定を構成します。モデルは予測長の 4 倍のコンテキスト長を使用し、各エポックの後にチェックポイントを保存します。

Python
NUM_EPOCHS = 10

os.makedirs(CHECKPOINT_PATH, exist_ok=True)

checkpoint_cb = ModelCheckpoint(
dirpath=CHECKPOINT_PATH,
filename="deepar-{epoch:02d}-{step}",
save_top_k=-1, # keep all checkpoints
every_n_epochs=1, # save after every epoch
save_on_train_epoch_end=True,
)

# set required model hyperparameters. See GluonTS repository for the full list of hyperparameters
model_hyperparameters = {
"freq":freq,
"prediction_length":prediction_length,
"context_length":4*prediction_length,
}

# set required trainer hyperparameters
trainer_hyperparameters = {
"accelerator":"auto",
"max_epochs":NUM_EPOCHS,
"callbacks":[checkpoint_cb]
}

# create a DeepAR estimator
deepar_estimator = DeepAREstimator(
**model_hyperparameters,
trainer_kwargs=trainer_hyperparameters,
)

torch.set_float32_matmul_precision('medium') # 'high'

トレーニングデータセット上でDeepARモデルをトレーニングします。 1 つの GPU で 10 エポックのトレーニングを行うと、約 60 秒かかります。

Python
# train the network
# the training for 10 epochs takes about 60 second on a single GPU in this notebook
deepar_predictor = deepar_estimator.train(train_ds)

予測を生成して視覚化する

トレーニング済みのモデルを使用して、各時系列の今後 7 日間を予測します。視覚化では、90% の信頼区間と実際の値を含む予測値が表示されます。

Python
# predict
forecasts = deepar_predictor.predict(test_pairs.input, num_samples=20)

# ground truth
labels = [to_pandas(l) for l in test_pairs.label]

# visualize predictions
for i, forecast in enumerate(islice(forecasts, MAX_TS_TO_DISPLAY)):
plt.plot(labels[i][-NUM_WINDOWS*prediction_length:].to_timestamp())
forecast.plot(intervals=(0.9,), show_label=True)
plt.legend([f"Ground truth: {forecast.item_id}", "predicted median", "90% confidence interval"])
plt.show()

モデルのパフォーマンスを評価する

GluonTS Evaluatorを使用して評価メトリクスを計算します。 メトリクスには、MASE、RMSE、分位点損失が含まれます。

Python
# calculate evaluation metrics
evaluator = Evaluator(quantiles=[0.1, 0.5, 0.9])
agg_metrics, item_metrics = evaluator(
labels,
deepar_predictor.predict(test_pairs.input, num_samples=20),
num_series=len(test_pairs),
)

# metrics per time series
item_metrics.display()

# aggregated metrics
print(json.dumps(agg_metrics, indent=2))

チェックポイントからトレーニングを再開

保存したチェックポイントをロードし、追加のエポックのトレーニングを続行します。これは、以前に保存したモデル状態からトレーニングを再開する方法を示しています。

エポック 9 で保存されたチェックポイントからさらに 10 エポックをトレーニングするようにモデルを構成します。

Python
# set required model hyperparameters. See GluonTS repository for the full list of hyperparameters
model_hyperparameters = {
"freq": freq,
"prediction_length": prediction_length,
"context_length": 4 * prediction_length,
}

# set required trainer hyperparameters
trainer_hyperparameters = {
"accelerator": "auto",
"max_epochs": NUM_EPOCHS + 10, # Train for another 10 epochs
"callbacks": [checkpoint_cb],
}

# create a DeepAR estimator using the model checkpoint
deepar_estimator = DeepAREstimator(
**model_hyperparameters,
trainer_kwargs=trainer_hyperparameters,
)

updated_predictor = deepar_estimator.train(
training_data=train_ds,
ckpt_path=f"{CHECKPOINT_PATH}/deepar-epoch=09-step=500.ckpt",
)

次のステップ

このノートブックでは、 Databricksサーバレス GPU コンピュート上の GluonTS を使用した時系列予測の基本を説明しました。 詳細については、以下をご覧ください。

サンプルノートブック

GluonTS による時系列予測

ノートブックを新しいタブで開く