GluonTSを用いた時系列予測
このノートブックでは、 Databricksサーバレス GPU コンピュートで確率的時系列予測にGluonTSを使用する方法を示します。 GluonTS は、時系列モデリングのためのディープラーニング ベースのアプローチに焦点を当てたPythonライブラリです。
GluonTSは、最先端のモデルの事前構築済み実装を備えた、予測および異常検知のためのツールキットを提供します。PyTorchと MXNet の両方の実装をサポートしており、ニューラルネットワーク アーキテクチャ、機能処理、評価メトリクスなどの重要なコンポーネントが含まれています。
ノートブックの内容は次のとおりです。
- 電力消費データの読み込みと準備
- バックテスト用にトレーニングする/テスト分割を作成する
- 予測のためのDeepARモデルのトレーニング
- 信頼区間を用いた予測の評価
- モデルのチェックポイントの保存と読み込み
サーバレスGPUコンピュートに接続する
[接続 ] ドロップダウンをクリックし、 [サーバレス GPU] を選択します。
GluonTSと依存関係をインストールします。
PyTorchサポートを備えた GluonTS ライブラリとデータセットをダウンロードするための wget をインストールします。
# install gluonts package
%pip install -q --upgrade gluonts[torch] wget
dbutils.library.restartPython()
モデルのチェックポイント用にUnity Catalogストレージを設定する
モデルのチェックポイントを保存するためにUnity Catalogセットアップします。 チェックポイント パスはUnity Catalogボリュームを使用して、トレーニング中にモデルの状態を保持します。
# You must have `USE CATALOG` privileges on the catalog, and you must have `USE SCHEMA` privileges on the schema.
# If necessary, change the catalog and schema name here.
dbutils.widgets.text("uc_catalog", "main")
dbutils.widgets.text("uc_schema", "default")
dbutils.widgets.text("uc_model_name", "custom_transformer")
dbutils.widgets.text("uc_volume", "checkpoints")
UC_CATALOG = dbutils.widgets.get("uc_catalog")
UC_SCHEMA = dbutils.widgets.get("uc_schema")
UC_VOLUME = dbutils.widgets.get("uc_volume")
MODEL_NAME = dbutils.widgets.get("uc_model_name")
CHECKPOINT_PATH = f"/Volumes/{UC_CATALOG}/{UC_SCHEMA}/{UC_VOLUME}/{MODEL_NAME}"
print(f"UC_CATALOG: {UC_CATALOG}")
print(f"UC_SCHEMA: {UC_SCHEMA}")
print(f"UC_VOLUME: {UC_VOLUME}")
print(f"CHECKPOINT_PATH: {CHECKPOINT_PATH}")
# show the installed gluonts version
%pip show gluonts
GPUの可用性とリソースを確認する
GPUコンピュートが利用可能であることを確認し、ハードウェア仕様を表示します。
# show the GPU details
!nvidia-smi
import torch
import psutil
# check that there GPU is available on the notebook compute
assert torch.cuda.is_available(), 'You need to use GPU compute for this notebook'
# show GPU, GPU RAM, number of CPUs and total RAM
print(f"""
Number of GPUs available: {torch.cuda.device_count()}
Total GPU RAM: {torch.cuda.get_device_properties(0).total_memory / (1024 ** 3):.2f} GB
Number of CPUs: {psutil.cpu_count()}
Total RAM: {psutil.virtual_memory().total / (1024 ** 3):.2f} GB
""")
必要なライブラリをインポートします
データセットの処理、モデルのトレーニング、評価に必要なGluonTSコンポーネントと、標準的なデータサイエンスライブラリをインポートします。
import os
import json
import zipfile
import random
import matplotlib.pyplot as plt
import wget
import tempfile
import numpy as np
import pandas as pd
import matplotlib.colors as mcolors
from itertools import islice, chain
# GluonTS
from gluonts.dataset.pandas import PandasDataset
from gluonts.dataset.split import split, OffsetSplitter, DateSplitter
from gluonts.dataset.util import to_pandas
from gluonts.dataset.common import ListDataset
from gluonts.dataset.jsonl import JsonLinesFile
from gluonts.evaluation import make_evaluation_predictions, Evaluator
from gluonts.model.predictor import Predictor
from gluonts.model.forecast import QuantileForecast
from gluonts.dataset.field_names import FieldName
from pathlib import Path
from gluonts.torch import (
DeepAREstimator, # RNN
TemporalFusionTransformerEstimator, # LTSM
WaveNetEstimator, # Dilated convolution,
SimpleFeedForwardEstimator, # MLP
DeepNPTSEstimator, # MLP
)
from gluonts.model.seasonal_naive import SeasonalNaivePredictor
from gluonts.ext.prophet import ProphetPredictor
from gluonts.model.npts import NPTSPredictor
import lightning.pytorch as pl
from lightning.pytorch.callbacks import ModelCheckpoint
# setup plt environment
plt.rcParams["axes.grid"] = True
plt.rcParams["figure.figsize"] = (20, 3)
colors = list(mcolors.TABLEAU_COLORS)
電力消費データセットを読み込む
このノートブックは、カリフォルニア大学アーバイン校のリポジトリにある電力消費量データセットを使用しています。このデータセットには、2011年から2014年までの370の顧客からの電力消費量の測定値が含まれており、値は15分ごとにkW単位で記録されています。
ダウンロードするデータセット ソースの URL とファイル名を設定します。
data_file_name = 'LD2011_2014.txt'
dataset_url = 'https://archive.ics.uci.edu/static/public/321/electricityloaddiagrams20112014.zip'
データセット ファイルは抽出時に約 800 MB あり、 Databricksサーバーレス ノートブックのワークスペース ファイルの制限である 500 MB を超えています。 以下のコードは、一時ディレクトリを使用してデータをダウンロードおよび展開し、その後、Pandas DataFrameに読み込みます。
# download and extract data
# the electricity dataset https://archive.ics.uci.edu/ml/datasets/ElectricityLoadDiagrams20112014 from the repository of the University of California, Irvine
with tempfile.TemporaryDirectory() as tmp_dir_name:
temp_zip = f'{tmp_dir_name}/ts.zip'
print(f'Downloading data zip file from: {dataset_url}')
wget.download(dataset_url, out=temp_zip)
with zipfile.ZipFile(temp_zip, 'r') as zip_ref:
print(f'Extracting data to: {tmp_dir_name}')
data_file_path = zip_ref.extract(data_file_name, tmp_dir_name)
print(f'Zip extracted to: {data_file_path}')
print('Loading data into Pandas DataFrame')
df_raw = pd.read_csv(
data_file_path,
sep=';',
index_col=0,
decimal=',',
parse_dates=True,
)
15分間隔で生の電力消費データをプレビューできます。
データを1時間間隔にリサンプリングする
データポイントの数を減らし、トレーニングを高速化するために、データを15分間隔から1時間間隔にリサンプリングします。
# see the data
df_raw
# resample to 1h intervals to reduce the number of data points
freq = "1h"
div = 4 # 1 hour contain 4x 15 min intervals, you need to delete the resampled value by 4
num_timeseries = df_raw.shape[1]
data_kw = df_raw.resample(freq).sum() / div
data_kw
予測を構成する
予測期間を7日間(168時間)に設定し、2014年のデータを使用してトレーニング期間を定義します。
トレーニングを高速化するには、時系列のサブセットを選択します。 370 個の時系列すべてでUSE_FULL_DATASET = Trueトレーニングするに設定します。
# predict for 7 days
prediction_days = 7
# 24 hours per day
intervals_per_day = 24
prediction_length = prediction_days * intervals_per_day
# take the last year of data for a sample
start_training_date = pd.Timestamp('2014-01-01')
end_dataset_date = pd.Timestamp('2014-12-31')
print(f"Sampling frequency set to {freq}. Generate predictions for {prediction_length} intervals")
USE_FULL_DATASET = False # By default use only a subset of the time series because training of full dataset can take longer time
SAMPLE_SIZE = 10 # set number of samples in the dataset if you don't use the full dataset
MAX_TS_TO_DISPLAY = 10
# get the full dataset or a random sample of SAMPLE_SIZE
# you can change the selection to include specific time series
# ts_sample = data_kw[['item_id1', 'item_id2']]
ts_sample = data_kw if USE_FULL_DATASET else data_kw[np.random.choice(data_kw.columns.to_list(), size=SAMPLE_SIZE, replace=False)]
データをGluonTS形式に変換する
Pandas DataFrameをGluonTS形式に変換し、時系列データを可視化します。その他の例については、GluonTSクイックスタートガイドを参照してください。
# convert to GluonTS format, taking only the data between start_training_date and end_dataset_date
ts_dataset = PandasDataset(
dict(ts_sample[(ts_sample.index > start_training_date) & (ts_sample.index <= end_dataset_date)].astype(np.float32))
)
# visualize time series in the GluonTS dataset
for i, entry in enumerate(islice(ts_dataset, MAX_TS_TO_DISPLAY)):
to_pandas(entry).plot(label=entry[FieldName.ITEM_ID], color=colors[i % len(colors)])
plt.legend()
plt.tight_layout()
plt.show()
print(f'The GluonTS dataset contains {len(ts_dataset)} individual time series from {start_training_date} to {end_dataset_date}')
バックテスト用にトレーニングする/テスト分割を作成する
ローリングウィンドウを使用して、データセットをトレーニングセットとテストセットに分割します。これにより、モデルのパフォーマンスをバックテストするための4つのテストウィンドウが作成されます。
# set backtest parameters
NUM_WINDOWS = 4 # number of rolling windows for backtest
# distance between windows, set to:
# < prediction_length for overlapping windows
# = prediction length for adjucent windows
# > prediction_length for non overapping and non-adjucent windows
DISTANCE = prediction_length
# set the training-testing split date
end_training_date = pd.Period(end_dataset_date, freq=freq) - NUM_WINDOWS*prediction_length
# split into train and test datasets using GluonTS's DateSplitter
train_ds, test_template = DateSplitter(date=end_training_date).split(ts_dataset)
test_pairs = test_template.generate_instances(
prediction_length=prediction_length,
windows=NUM_WINDOWS,
distance=DISTANCE,
)
print(f"The dataset is splitted in {len(train_ds)} training datasets and {len(test_pairs)} test pairs. Training end is {end_training_date}")
DeepARモデルをトレーニングする
確率的予測のためのリカレント ニューラルネットワーク モデルである DeepAR 推定器をトレーニングします。 その他のアルゴリズムについては、GluonTSのドキュメントにある「利用可能なモデル」を参照してください。
DeepARモデルのハイパーパラメータとトレーニング設定を構成します。このモデルは、予測長の4倍のコンテキスト長を使用し、各エポック後にチェックポイントを保存します。
NUM_EPOCHS = 10
os.makedirs(CHECKPOINT_PATH, exist_ok=True)
checkpoint_cb = ModelCheckpoint(
dirpath=CHECKPOINT_PATH,
filename="deepar-{epoch:02d}-{step}",
save_top_k=-1, # keep all checkpoints
every_n_epochs=1, # save after every epoch
save_on_train_epoch_end=True,
)
# set required model hyperparameters. See GluonTS repository for the full list of hyperparameters
model_hyperparameters = {
"freq":freq,
"prediction_length":prediction_length,
"context_length":4*prediction_length,
}
# set required trainer hyperparameters
trainer_hyperparameters = {
"accelerator":"auto",
"max_epochs":NUM_EPOCHS,
"callbacks":[checkpoint_cb]
}
# create a DeepAR estimator
deepar_estimator = DeepAREstimator(
**model_hyperparameters,
trainer_kwargs=trainer_hyperparameters,
)
torch.set_float32_matmul_precision('medium') # 'high'
トレーニングデータセット上でDeepARモデルをトレーニングします。 10 エポックのトレーニングには、単一の GPU で約 60 秒かかります。
# train the network
# the training for 10 epochs takes about 60 second on a single GPU in this notebook
deepar_predictor = deepar_estimator.train(train_ds)
予測を生成し、視覚化する
学習済みモデルを使用して、各時系列データについて今後7日間の予測を行います。視覚化では、予測値と90%信頼区間、および真値が表示されます。
# predict
forecasts = deepar_predictor.predict(test_pairs.input, num_samples=20)
# ground truth
labels = [to_pandas(l) for l in test_pairs.label]
# visualize predictions
for i, forecast in enumerate(islice(forecasts, MAX_TS_TO_DISPLAY)):
plt.plot(labels[i][-NUM_WINDOWS*prediction_length:].to_timestamp())
forecast.plot(intervals=(0.9,), show_label=True)
plt.legend([f"Ground truth: {forecast.item_id}", "predicted median", "90% confidence interval"])
plt.show()
モデルのパフォーマンスを評価する
GluonTS Evaluatorを使用して評価メトリクスを計算します。 メトリクスには、MASE、RMSE、分位点損失が含まれます。
# calculate evaluation metrics
evaluator = Evaluator(quantiles=[0.1, 0.5, 0.9])
agg_metrics, item_metrics = evaluator(
labels,
deepar_predictor.predict(test_pairs.input, num_samples=20),
num_series=len(test_pairs),
)
# metrics per time series
item_metrics.display()
# aggregated metrics
print(json.dumps(agg_metrics, indent=2))
チェックポイントからトレーニングを再開
保存したチェックポイントを読み込み、追加のエポック分のトレーニングを続行します。これは、以前に保存したモデルの状態からトレーニングを再開する方法を示しています。
エポック 9 に保存されたチェックポイントから開始して、さらに 10 エポックの間トレーニングするようにモデルを構成します。
# set required model hyperparameters. See GluonTS repository for the full list of hyperparameters
model_hyperparameters = {
"freq": freq,
"prediction_length": prediction_length,
"context_length": 4 * prediction_length,
}
# set required trainer hyperparameters
trainer_hyperparameters = {
"accelerator": "auto",
"max_epochs": NUM_EPOCHS + 10, # Train for another 10 epochs
"callbacks": [checkpoint_cb],
}
# create a DeepAR estimator using the model checkpoint
deepar_estimator = DeepAREstimator(
**model_hyperparameters,
trainer_kwargs=trainer_hyperparameters,
)
updated_predictor = deepar_estimator.train(
training_data=train_ds,
ckpt_path=f"{CHECKPOINT_PATH}/deepar-epoch=09-step=500.ckpt",
)
次のステップ
このノートブックでは、 Databricksサーバレス GPU コンピュート上の GluonTS を使用した時系列予測の基本を説明しました。 詳細はこちらをご覧ください:
- GluonTS拡張チュートリアル- 高度な予測例
- GluonTSで利用可能なモデル- 事前構築済みモデルの完全なリスト
- Databricksサーバレス GPU コンピュートのベスト プラクティス- 最適化のヒント
- サーバレス GPU コンピュートのトラブルシューティング- よくある問題と解決策