GluonTSを用いた時系列予測
このノートブックでは、 Databricksサーバレス GPU コンピュートで確率的時系列予測にGluonTSを使用する方法を示します。 GluonTS は、時系列モデリングのためのディープラーニング ベースのアプローチに焦点を当てたPythonライブラリです。
GluonTSは、最先端のモデルの事前構築済み実装を備えた、予測および異常検知のためのツールキットを提供します。PyTorchと MXNet の両方の実装をサポートしており、ニューラルネットワーク アーキテクチャ、機能処理、評価メトリクスなどの重要なコンポーネントが含まれています。
ノートブックの内容は次のとおりです。
- 電力消費データの読み込みと準備
- バックテスト用にトレーニングする/テスト分割を作成する
- 予測のためのDeepARモデルのトレーニング
- 信頼区間を用いた予測の評価
- モデルのチェックポイントの保存と読み込み
サーバレスGPUコンピュートに接続する
Connect ドロップダウンをクリックし、 サーバレスGPU を選択します。 [環境]サイドパネルを開いて、アクセラレータを1xA10に設定し、AI v5を選択します。
GluonTSと依存関係をインストールします。
PyTorchサポートを備えた GluonTS ライブラリとデータセットをダウンロードするための wget をインストールします。
# install gluonts package
%pip install -q --upgrade gluonts[torch] wget
dbutils.library.restartPython()
モデルのチェックポイント用にUnity Catalogストレージを設定する
モデルのチェックポイントを保存するためにUnity Catalogセットアップします。 チェックポイント パスはUnity Catalogボリュームを使用して、トレーニング中にモデルの状態を保持します。
# You must have `USE CATALOG` privileges on the catalog, and you must have `USE SCHEMA` privileges on the schema.
# If necessary, change the catalog and schema name here.
dbutils.widgets.text("uc_catalog", "main")
dbutils.widgets.text("uc_schema", "default")
dbutils.widgets.text("uc_model_name", "custom_transformer")
dbutils.widgets.text("uc_volume", "checkpoints")
UC_CATALOG = dbutils.widgets.get("uc_catalog")
UC_SCHEMA = dbutils.widgets.get("uc_schema")
UC_VOLUME = dbutils.widgets.get("uc_volume")
MODEL_NAME = dbutils.widgets.get("uc_model_name")
CHECKPOINT_PATH = f"/Volumes/{UC_CATALOG}/{UC_SCHEMA}/{UC_VOLUME}/{MODEL_NAME}"
print(f"UC_CATALOG: {UC_CATALOG}")
print(f"UC_SCHEMA: {UC_SCHEMA}")
print(f"UC_VOLUME: {UC_VOLUME}")
print(f"CHECKPOINT_PATH: {CHECKPOINT_PATH}")
# show the installed gluonts version
%pip show gluonts
GPUの可用性とリソースを確認する
GPUコンピュートが利用可能であることを確認し、ハードウェア仕様を表示します。
# show the GPU details
!nvidia-smi
import torch
import psutil
# check that GPU is available on the notebook compute
assert torch.cuda.is_available(), 'You need to use GPU compute for this notebook'
# show GPU, GPU RAM, number of CPUs and total RAM
print(f"""
Number of GPUs available: {torch.cuda.device_count()}
Total GPU RAM: {torch.cuda.get_device_properties(0).total_memory / (1024 ** 3):.2f} GB
Number of CPUs: {psutil.cpu_count()}
Total RAM: {psutil.virtual_memory().total / (1024 ** 3):.2f} GB
""")
必要なライブラリをインポートします
データセットの処理、モデルのトレーニング、評価に必要なGluonTSコンポーネントと、標準的なデータサイエンスライブラリをインポートします。
import os
import json
import zipfile
import matplotlib.pyplot as plt
import wget
import tempfile
import numpy as np
import pandas as pd
import matplotlib.colors as mcolors
from itertools import islice
# GluonTS
from gluonts.dataset.pandas import PandasDataset
from gluonts.dataset.split import DateSplitter
from gluonts.dataset.util import to_pandas
from gluonts.evaluation import Evaluator
from gluonts.dataset.field_names import FieldName
from gluonts.torch import DeepAREstimator
from lightning.pytorch.callbacks import ModelCheckpoint
# setup plt environment
plt.rcParams["axes.grid"] = True
plt.rcParams["figure.figsize"] = (20, 3)
colors = list(mcolors.TABLEAU_COLORS)
電力消費データセットを読み込む
このノートブックは、カリフォルニア大学アーバイン校のリポジトリにある電力消費量データセットを使用しています。このデータセットには、2011年から2014年までの370の顧客からの電力消費量の測定値が含まれており、値は15分ごとにkW単位で記録されています。
ダウンロードするデータセット ソースの URL とファイル名を設定します。
data_file_name = 'LD2011_2014.txt'
dataset_url = 'https://archive.ics.uci.edu/static/public/321/electricityloaddiagrams20112014.zip'
データセット ファイルは抽出時に約 800 MB あり、 Databricksサーバーレス ノートブックのワークスペース ファイルの制限である 500 MB を超えています。 以下のコードは、一時ディレクトリを使用してデータをダウンロードおよび展開し、その後、Pandas DataFrameに読み込みます。
# download and extract data
# the electricity dataset https://archive.ics.uci.edu/ml/datasets/ElectricityLoadDiagrams20112014 from the repository of the University of California, Irvine
with tempfile.TemporaryDirectory() as tmp_dir_name:
temp_zip = f'{tmp_dir_name}/ts.zip'
print(f'Downloading data zip file from: {dataset_url}')
wget.download(dataset_url, out=temp_zip)
with zipfile.ZipFile(temp_zip, 'r') as zip_ref:
print(f'Extracting data to: {tmp_dir_name}')
data_file_path = zip_ref.extract(data_file_name, tmp_dir_name)
print(f'Zip extracted to: {data_file_path}')
print('Loading data into Pandas DataFrame')
df_raw = pd.read_csv(
data_file_path,
sep=';',
index_col=0,
decimal=',',
parse_dates=True,
)
15分間隔で生の電力消費データをプレビューできます。
データを1時間間隔にリサンプリングする
データポイントの数を減らし、トレーニングを高速化するために、データを15分間隔から1時間間隔にリサンプリングします。
# see the data
df_raw
# resample to 1h intervals to reduce the number of data points
freq = "1h"
div = 4 # 1 hour contain 4x 15 min intervals, you need to delete the resampled value by 4
data_kw = df_raw.resample(freq).sum() / div
data_kw
予測を構成する
予測期間を7日間(168時間)に設定し、2014年のデータを使用してトレーニング期間を定義します。
トレーニングを高速化するには、時系列のサブセットを選択します。 370 個の時系列すべてでUSE_FULL_DATASET = Trueトレーニングするに設定します。
# predict for 7 days
prediction_days = 7
# 24 hours per day
intervals_per_day = 24
prediction_length = prediction_days * intervals_per_day
# take the last year of data for a sample
start_training_date = pd.Timestamp('2014-01-01')
end_dataset_date = pd.Timestamp('2014-12-31')
print(f"Sampling frequency set to {freq}. Generate predictions for {prediction_length} intervals")
USE_FULL_DATASET = False # By default use only a subset of the time series because training of full dataset can take longer time
SAMPLE_SIZE = 10 # set number of samples in the dataset if you don't use the full dataset
MAX_TS_TO_DISPLAY = 10
# get the full dataset or a random sample of SAMPLE_SIZE
# you can change the selection to include specific time series
# ts_sample = data_kw[['item_id1', 'item_id2']]
ts_sample = data_kw if USE_FULL_DATASET else data_kw[np.random.choice(data_kw.columns.to_list(), size=SAMPLE_SIZE, replace=False)]
データをGluonTS形式に変換する
Pandas DataFrameをGluonTS形式に変換し、時系列データを可視化します。その他の例については、GluonTSクイックスタートガイドを参照してください。
# convert to GluonTS format, taking only the data between start_training_date and end_dataset_date
ts_dataset = PandasDataset(
dict(ts_sample[(ts_sample.index > start_training_date) & (ts_sample.index <= end_dataset_date)].astype(np.float32))
)
# visualize time series in the GluonTS dataset
for i, entry in enumerate(islice(ts_dataset, MAX_TS_TO_DISPLAY)):
to_pandas(entry).plot(label=entry[FieldName.ITEM_ID], color=colors[i % len(colors)])
plt.legend()
plt.tight_layout()
plt.show()
print(f'The GluonTS dataset contains {len(ts_dataset)} individual time series from {start_training_date} to {end_dataset_date}')
バックテスト用にトレーニングする/テスト分割を作成する
ローリングウィンドウを使用して、データセットをトレーニングセットとテストセットに分割します。これにより、モデルのパフォーマンスをバックテストするための4つのテストウィンドウが作成されます。
# set backtest parameters
NUM_WINDOWS = 4 # number of rolling windows for backtest
# distance between windows, set to:
# < prediction_length for overlapping windows
# = prediction length for adjucent windows
# > prediction_length for non overapping and non-adjucent windows
DISTANCE = prediction_length
# set the training-testing split date
end_training_date = pd.Period(end_dataset_date, freq=freq) - NUM_WINDOWS*prediction_length
# split into train and test datasets using GluonTS's DateSplitter
train_ds, test_template = DateSplitter(date=end_training_date).split(ts_dataset)
test_pairs = test_template.generate_instances(
prediction_length=prediction_length,
windows=NUM_WINDOWS,
distance=DISTANCE,
)
print(f"The dataset is splitted in {len(train_ds)} training datasets and {len(test_pairs)} test pairs. Training end is {end_training_date}")
DeepARモデルをトレーニングする
確率的予測のためのリカレント ニューラルネットワーク モデルである DeepAR 推定器をトレーニングします。 その他のアルゴリズムについては、GluonTSのドキュメントにある「利用可能なモデル」を参照してください。
DeepARモデルのハイパーパラメータとトレーニング設定を構成します。このモデルは、予測長の4倍のコンテキスト長を使用し、各エポック後にチェックポイントを保存します。
NUM_EPOCHS = 10
os.makedirs(CHECKPOINT_PATH, exist_ok=True)
checkpoint_cb = ModelCheckpoint(
dirpath=CHECKPOINT_PATH,
filename="deepar-{epoch:02d}-{step}",
save_top_k=-1, # keep all checkpoints
every_n_epochs=1, # save after every epoch
save_on_train_epoch_end=True,
)
# set required model hyperparameters. See GluonTS repository for the full list of hyperparameters
model_hyperparameters = {
"freq":freq,
"prediction_length":prediction_length,
"context_length":4*prediction_length,
}
# set required trainer hyperparameters
trainer_hyperparameters = {
"accelerator":"auto",
"max_epochs":NUM_EPOCHS,
"callbacks":[checkpoint_cb]
}
# create a DeepAR estimator
deepar_estimator = DeepAREstimator(
**model_hyperparameters,
trainer_kwargs=trainer_hyperparameters,
)
トレーニングデータセット上でDeepARモデルをトレーニングします。 10 エポックのトレーニングには、単一の GPU で約 60 秒かかります。
# Suppress known compatibility warnings
import warnings
warnings.filterwarnings("ignore", message="Using a non-tuple sequence for multidimensional indexing")
torch.set_float32_matmul_precision('high')
# train the network
# the training for 10 epochs takes about 60 second on a single GPU in this notebook
deepar_predictor = deepar_estimator.train(train_ds)
予測を生成し、視覚化する
学習済みモデルを使用して、各時系列データについて今後7日間の予測を行います。視覚化では、予測値と90%信頼区間、および真値が表示されます。
# predict
forecasts = deepar_predictor.predict(test_pairs.input, num_samples=20)
# ground truth
labels = [to_pandas(l) for l in test_pairs.label]
# visualize predictions
for i, forecast in enumerate(islice(forecasts, MAX_TS_TO_DISPLAY)):
plt.plot(labels[i][-NUM_WINDOWS*prediction_length:].to_timestamp())
forecast.plot(intervals=(0.9,), show_label=True)
plt.legend([f"Ground truth: {forecast.item_id}", "predicted median", "90% confidence interval"])
plt.show()
モデルのパフォーマンスを評価する
GluonTS Evaluatorを使用して評価メトリクスを計算します。 メトリクスには、MASE、RMSE、分位点損失が含まれます。
# calculate evaluation metrics
evaluator = Evaluator(quantiles=[0.1, 0.5, 0.9])
agg_metrics, item_metrics = evaluator(
labels,
deepar_predictor.predict(test_pairs.input, num_samples=20),
num_series=len(test_pairs),
)
# metrics per time series
item_metrics.display()
# aggregated metrics
print(json.dumps(agg_metrics, indent=2))
チェックポイントからトレーニングを再開
保存したチェックポイントを読み込み、追加のエポック分のトレーニングを続行します。これは、以前に保存したモデルの状態からトレーニングを再開する方法を示しています。
エポック 9 に保存されたチェックポイントから開始して、さらに 10 エポックの間トレーニングするようにモデルを構成します。
# set required model hyperparameters. See GluonTS repository for the full list of hyperparameters
model_hyperparameters = {
"freq": freq,
"prediction_length": prediction_length,
"context_length": 4 * prediction_length,
}
# set required trainer hyperparameters
trainer_hyperparameters = {
"accelerator": "auto",
"max_epochs": NUM_EPOCHS + 10, # Train for another 10 epochs
"callbacks": [checkpoint_cb],
}
# create a DeepAR estimator using the model checkpoint
deepar_estimator = DeepAREstimator(
**model_hyperparameters,
trainer_kwargs=trainer_hyperparameters,
)
updated_predictor = deepar_estimator.train(
training_data=train_ds,
ckpt_path=f"{CHECKPOINT_PATH}/deepar-epoch=09-step=500.ckpt",
)
次のステップ
このノートブックでは、 Databricksサーバレス GPU コンピュート上の GluonTS を使用した時系列予測の基本を説明しました。 詳細はこちらをご覧ください:
- GluonTS拡張チュートリアル- 高度な予測例
- GluonTSで利用可能なモデル- 事前構築済みモデルの完全なリスト
- Databricksサーバレス GPU コンピュートのベスト プラクティス- 最適化のヒント
- サーバレス GPU コンピュートのトラブルシューティング- よくある問題と解決策