#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
"""This module contains the class TemporalHierarchicalModel class.
"""
import logging
from math import gcd
from typing import Callable, Dict, List, Optional, Type
import numpy as np
import pandas as pd
from kats.consts import TimeSeriesData
from kats.models import (
arima,
holtwinters,
linear_model,
model,
prophet,
quadratic_model,
sarima,
theta,
)
from kats.models.model import Model
from kats.models.reconciliation.base_models import (
BaseTHModel,
calc_mape,
calc_mae,
GetAggregateTS,
)
from sklearn.covariance import MinCovDet
BASE_MODELS: Dict[str, Type[Model]] = {
"arima": arima.ARIMAModel,
"holtwinters": holtwinters.HoltWintersModel,
"sarima": sarima.SARIMAModel,
"prophet": prophet.ProphetModel,
"linear": linear_model.LinearModel,
"quadratic": quadratic_model.QuadraticModel,
"theta": theta.ThetaModel,
}
def _log_error(msg: str) -> ValueError:
logging.error(msg)
return ValueError(msg)
class TemporalHierarchicalModel:
"""Temporal hierarchical model class.
This framework combines the base models of different temporal aggregation
levels to generate reconciled forecasts.
This class provides fit, get_S, get_W, predict and median_validation.
Attributes:
data: A TimeSeriesData object storing the time series data for level 1
(i.e., the most disaggregate level).
        baseModels: A list of BaseTHModel objects representing the base models for
different levels.
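    Sample Usage (a minimal, illustrative sketch; assumes ``BaseTHModel``
    accepts the arguments shown, that ``ts`` is the level-1 TimeSeriesData,
    and that ``res2``/``fcst2`` are precomputed np.ndarrays):
        >>> bm1 = BaseTHModel(level=1, model_name="theta", model_params=theta.ThetaParams())
        >>> bm2 = BaseTHModel(level=2, residuals=res2, fcsts=fcst2)
        >>> thm = TemporalHierarchicalModel(data=ts, baseModels=[bm1, bm2])
        >>> thm.fit()
        >>> fcsts = thm.predict(steps=30, method="struc")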
"""
    models: Optional[Dict[int, Optional[model.Model]]] = None
residuals: Optional[Dict[int, np.ndarray]] = None
res_matrix: Optional[np.ndarray] = None
def __init__(self, data: TimeSeriesData, baseModels: List[BaseTHModel]) -> None:
if not data.is_univariate():
msg = f"Only univariate time series supported, but got {type(data.value)}."
raise _log_error(msg)
self.data = data
for basemodel in baseModels:
if not isinstance(basemodel, BaseTHModel):
msg = (
"Base model should be a BaseTHModel object but is "
f"{type(basemodel)}."
)
raise _log_error(msg)
levels = [bm.level for bm in baseModels]
if 1 not in levels:
raise _log_error("Model of level 1 is missing.")
if len(levels) != len(set(levels)):
raise _log_error("One level cannot receive multiple models.")
self.levels = sorted(levels, reverse=True)
m = self._get_m(levels)
self.m = m
self.freq = {k: int(m / k) for k in self.levels}
self.baseModels = baseModels
self.info_fcsts = {}
self.info_residuals = {}
def _get_m(self, ks: List[int]) -> int:
"""Calculate m.
        m is the least common multiple of all levels.
Args:
ks: the list of integers representing all the levels.
Returns:
            An integer representing the least common multiple.
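            For example (illustrative), ks = [1, 2, 3] returns 6.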
"""
base = 1
for c in ks:
base = base * c // gcd(base, c)
return base
    def fit(self) -> None:
"""Fit all base models.
        If a base model only provides residuals and forecasts (no model name), store that information directly.
"""
levels = self.levels
TSs = GetAggregateTS(self.data).aggregate(levels)
models = {}
residuals = {}
fcsts = {}
for bm in self.baseModels:
model_name = bm.model_name
if model_name is None: # only residuals and fcsts are provided
models[bm.level] = None
residuals[bm.level] = bm.residuals
fcsts[bm.level] = bm.fcsts
else:
m = BASE_MODELS[model_name](
data=TSs[bm.level],
params=bm.model_params,
)
m.fit()
models[bm.level] = m
self.models = models
self.info_fcsts = fcsts
self.info_residuals = residuals
    def get_S(self) -> np.ndarray:
"""Calculate S matrix.
Returns:
A np.array representing the S matrix.
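            For example (illustrative), with levels [2, 1] (so m = 2) the rows
            are [1, 1] for level 2, followed by [1, 0] and [0, 1] for level 1.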
"""
ans = []
levels = self.levels
m = self.m
for k in levels:
for i in range(self.freq[k]):
tem = np.zeros(m)
tem[(i * k) : (i * k + k)] = 1.0
ans.append(tem)
return np.row_stack(ans)
def _aggregate_data(self, data: np.ndarray, k: int) -> np.ndarray:
"""Aggregate data according to level k."""
if k == 1:
return data
n = len(data)
h = n // k
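        # e.g. (illustrative): k=2 and data=[1, 2, 3, 4] yields [3, 7]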
return (data[: int(h * k)]).reshape(-1, k).sum(axis=1)
def _get_residuals(self, model: model.Model) -> np.ndarray:
"""Calculate residuals of each base model.
Args:
            model: a fitted base model object.
Returns:
A np.ndarray of residuals.
"""
try:
# pyre-ignore[16]: `Model` has no attribute `model`.
return model.model.resid.values
except Exception:
fcst = model.predict(steps=1, freq="D", include_history=True)
merge = fcst.merge(model.data.to_dataframe(), on="time")
for col in merge.columns:
if col != "time" and ("fcst" not in col):
lab = col
return merge[lab].values - merge["fcst"].values
raise ValueError("Couldn't find residual or forecast values in model")
def _get_all_residuals(self) -> Dict[int, np.ndarray]:
"""
Calculate residuals for all base models.
Returns:
Dictionary for residuals, whose key is level and value is residual
array.
"""
residuals = getattr(self, "residuals", None)
# if residuals have not been calculated yet
if residuals is None:
levels = self.levels
models = self.models
residuals = {}
for k in levels:
assert models is not None
if models[k] is not None:
try:
vals = self._get_residuals(models[k])
except Exception as e:
msg = (
f"Failed to get residuals for level {k} with error "
f"message {e}."
)
raise _log_error(msg)
residuals[k] = vals
else:
residuals[k] = self.info_residuals[k]
self.residuals = residuals
return residuals
def _get_residual_matrix(self) -> np.ndarray:
"""
Reshape residuals into matrix format.
Returns:
Residual matrix.
"""
res_matrix = self.res_matrix
if res_matrix is None:
residuals = self._get_all_residuals()
ks = self.levels
freq = self.freq
h = np.min([len(residuals[k]) // freq[k] for k in ks])
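            # h is the number of complete m-step blocks common to all levels;
            # each level k then contributes freq[k] rows of h residuals each.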
res_matrix = []
for k in ks:
n = h * freq[k]
res_matrix.append(residuals[k][-n:].reshape(h, -1).T)
res_matrix = np.row_stack(res_matrix)
self.res_matrix = res_matrix
return res_matrix
    def get_W(self, method: str = "struc", eps: float = 1e-5) -> np.ndarray:
"""
Calculate W matrix.
Args:
method: Reconciliation method for temporal hierarchical model. Valid
methods include 'struc', 'svar', 'hvar', 'mint_sample', and
'mint_shrink'.
            eps: Epsilon added to W for numerical stability.
Returns:
            The W matrix. (If W is a diagonal matrix, only its diagonal elements are returned.)
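            For example (illustrative), with levels [2, 1] the 'struc' method
            returns the diagonal vector [2, 1, 1] (each level k repeated
            freq[k] times).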
"""
levels = self.levels
freq = self.freq
if method == "struc":
ans = []
for k in levels:
ans.extend([k] * freq[k])
return np.array(ans)
elif method == "svar":
residuals = self._get_all_residuals()
ans = []
for k in levels:
ans.extend([np.nanmean(np.square(residuals[k]))] * freq[k])
return np.array(ans) + eps
elif method == "hvar":
res_matrix = self._get_residual_matrix()
return np.nanvar(res_matrix, axis=1) + eps
elif method == "mint_shrink":
cov = np.cov(self._get_residual_matrix())
# get correlation matrix
sqrt = np.sqrt(np.diag(cov))
cor = (
(cov / sqrt).T
) / sqrt # due to symmetry, no need to transpose the matrix again.
mask = ~np.eye(cor.shape[0], dtype=bool)
cor = cor[mask]
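            # shrinkage intensity: how strongly the off-diagonal covariances are
            # shrunk toward zero (lam=0 keeps the sample covariance, lam=1 keeps
            # only the diagonal)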
lam = np.var(cor) / np.sum(cor ** 2)
lam = np.max([0, lam])
cov = np.diag(np.diag(cov)) * lam + (1.0 - lam) * cov
cov += np.eye(len(cov)) * eps
return cov
elif method == "mint_sample":
cov = np.cov(self._get_residual_matrix())
cov += np.eye(len(cov)) * eps
return cov
else:
raise _log_error(f"{method} is invalid for get_W() method.")
    def _predict_origin(self, steps: int, method: str = "struc") -> Dict[int, np.ndarray]:
"""
Generate original forecasts from each base model (without time index).
Args:
steps: Number of forecasts for level 1.
            method: Reconciliation method.
Returns:
Dictionary of forecasts of each level, whose key is level and value
is forecast array.
"""
m = self.m
levels = self.levels
freq = self.freq
h = int(np.ceil(steps / m))
hf = steps // m
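        # h: number of m-step blocks needed (rounded up); hf: number of m-step
        # blocks fully covered by `steps`.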
orig_fcst = {}
models = self.models
# generate forecasts for each level
for k in levels:
assert models is not None
num = int(freq[k] * h)
if models[k] is not None:
orig_fcst[k] = models[k].predict(steps=num, freq="D")["fcst"].values
else:
fcst_num = len(self.info_fcsts[k])
if fcst_num < num:
if fcst_num >= hf * freq[k]:
# since the final output only needs hf*freq[k] forecasts
# for level k, we pad the forecast array to desirable
# length. (note that the padding values would be ignored
# in the final output.)
orig_fcst[k] = np.concatenate(
[
self.info_fcsts[k],
[self.info_fcsts[k][-1]] * (num - fcst_num),
]
)
elif method == "bu" and k != 1:
# for 'bu' only level 1 is needed.
orig_fcst[k] = self.info_fcsts[k]
else:
msg = (
f"{hf*freq[k]} steps of forecasts for level {k} are"
f" needed, but only receive {fcst_num} steps (and "
"forecast model is None)."
)
raise _log_error(msg)
else:
orig_fcst[k] = self.info_fcsts[k][:num]
return orig_fcst
def _predict(
self,
steps: int,
method="struc",
origin_fcst: bool = False,
fcst_levels: Optional[List[int]] = None,
) -> Dict[str, Dict[int, np.ndarray]]:
"""Generate forecasts for each level (without time index).
Args:
steps: Number of forecasts for level 1.
            method: Reconciliation method.
            origin_fcst: Whether to return the forecasts of the base models.
            fcst_levels: Levels that one wants to generate forecasts for.
                If None, forecasts are generated for all levels of the base
                models.
Returns:
            A dictionary whose key "fcst" (and "origin_fcst" when requested)
            maps to a dictionary of forecast arrays keyed by level.
"""
if not hasattr(self, "models"):
raise _log_error("Please fit base models via .fit() first.")
m = self.m
levels = self.levels
freq = self.freq
h = int(np.ceil(steps / m))
if fcst_levels is None:
fcst_levels = list(levels)
fcst = {}
orig_fcst = self._predict_origin(steps, method)
if method == "bu":
# bottom_up method
yhat = orig_fcst[1]
elif method == "median":
# median method
tem = []
for k in levels:
tem.append(np.repeat(orig_fcst[k] / k, k))
tem = np.row_stack(tem)
yhat = np.median(tem, axis=0)
elif method in {"struc", "svar", "hvar", "mint_shrink", "mint_sample"}:
# transform fcsts into matrix
yh = []
for k in levels:
yh.append(orig_fcst[k].reshape(h, -1).T)
yh = np.row_stack(yh)
S = self.get_S()
W = self.get_W(method)
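            # GLS-type reconciliation: with T = S' W^{-1}, the reconciled
            # forecasts are yhat = S (T S)^{-1} T yh.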
            # when W is a vector, i.e., a compact representation of a diagonal matrix
if len(W.shape) == 1:
T = (S.T) / W
else:
T = np.linalg.solve(W, S).T
yhat = np.dot(S, np.linalg.solve(T.dot(S), T)).dot(yh)
# extract forecasts for level 1
yhat = (yhat[(-freq[1]) :, :].T).flatten()[:steps]
else:
raise _log_error(f"Reconciliation method {method} is invalid.")
# aggregate fcsts
for k in fcst_levels:
fcst[k] = self._aggregate_data(yhat, k)[: (steps // k)]
ans = {"fcst": fcst}
if origin_fcst:
for elm in orig_fcst:
orig_fcst[elm] = orig_fcst[elm][: (steps // elm)]
ans["origin_fcst"] = orig_fcst
return ans
    def predict(
self,
steps: int,
method="struc",
freq: Optional[str] = None,
origin_fcst: bool = False,
fcst_levels: Optional[List[int]] = None,
last_timestamp: Optional[pd.Timestamp] = None,
) -> Dict[str, Dict[int, pd.DataFrame]]:
"""Generate reconciled forecasts (with time index).
Args:
steps: The number of forecasts needed for level 1.
            method: The name of the reconciliation method. Can be 'bu'
                (bottom-up), 'median', 'struc' (structural scaling), 'svar',
                'hvar', 'mint_shrink' or 'mint_sample'.
            freq: The frequency (a pd.Timedelta) of the time series at level 1.
                If None, the frequency is inferred via
                self.data.infer_freq_robust().
            origin_fcst: Whether or not to return the forecasts of the base models.
            fcst_levels: The levels to generate forecasts for. Default is None,
                which generates forecasts for all the levels of the base models.
            last_timestamp: The last timestamp of the level-1 training data. If
                None, it is taken from the input data.
        Returns:
            A dictionary of dictionaries of forecasts. The outer key is "fcst"
            (and "origin_fcst" when requested); the inner key is the level, and
            the corresponding value is a pd.DataFrame with columns "time" and
            "fcst".
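            For example (illustrative, assuming a level-2 base model was
            provided), predict(steps=4)["fcst"][2] is a DataFrame holding the
            two reconciled level-2 forecasts.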
"""
        if freq is None:
            freq = self.data.infer_freq_robust()
        if last_timestamp is None:
            last_timestamp = self.data.time.max()
fcsts = self._predict(
steps, method=method, origin_fcst=origin_fcst, fcst_levels=fcst_levels
)
ans = {}
for elm in fcsts:
tmp = {}
for k in fcsts[elm]:
fcst_num = len(fcsts[elm][k])
time = pd.date_range(
last_timestamp + freq * k,
last_timestamp + freq * k * fcst_num,
periods=fcst_num,
)
tmp[k] = pd.DataFrame({"time": time, "fcst": fcsts[elm][k]})
ans[elm] = tmp
return ans