# Source code for kats.models.reconciliation.base_models
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
"""
This module contains 1) helper functions for evaluating forecasting models (i.e., calc_mape and calc_mae); 2) the BaseTHModel class for storing information of base models;
and 3) the GetAggregateTS class for aggregating base time series to higher levels.
"""
from typing import List, Dict, Optional
from kats.consts import TimeSeriesData
from kats.models import (
arima,
holtwinters,
linear_model,
prophet,
quadratic_model,
sarima,
theta,
)
# Registry mapping each supported base-model name to the model class that
# implements it in kats.models. BaseTHModel rejects any model_name that is
# not a key of this dictionary.
BASE_MODELS = {
    "arima": arima.ARIMAModel,
    "holtwinters": holtwinters.HoltWintersModel,
    "sarima": sarima.SARIMAModel,
    "prophet": prophet.ProphetModel,
    "linear": linear_model.LinearModel,
    "quadratic": quadratic_model.QuadraticModel,
    "theta": theta.ThetaModel,
}
import logging
import numpy as np
import pandas as pd
def calc_mape(predictions: np.ndarray, truth: np.ndarray) -> float:
    """Calculate the mean absolute percentage error (MAPE).

    MAPE = average(abs((truth - predictions) / truth))

    Entries whose ratio is infinite or NaN (for example where the true value
    is zero) are excluded from the average.

    Args:
        predictions: a np.array (or array-like) storing predictions.
        truth: a np.array (or array-like) storing true values.

    Returns:
        A float representing the MAPE. NaN is returned when no entry with a
        finite ratio remains (e.g., empty input or all true values are zero).
    """
    # Accept plain sequences as well as arrays; arrays pass through unchanged.
    predictions = np.asarray(predictions)
    truth = np.asarray(truth)
    # Division by zero is expected here and handled by the filter below, so
    # the corresponding NumPy warnings are suppressed rather than leaked.
    with np.errstate(divide="ignore", invalid="ignore"):
        base = np.abs((truth - predictions) / truth)
    # Keep finite ratios only: inf fails the comparison, and so does NaN.
    base = base[base < np.inf]
    if base.size == 0:
        # Averaging an empty array would warn and yield NaN; return it directly.
        return float("nan")
    return float(np.mean(base))
def calc_mae(predictions: np.ndarray, truth: np.ndarray) -> float:
    """Calculate the mean absolute error (MAE).

    MAE = average(abs(truth - predictions))

    Args:
        predictions: a np.array storing predictions.
        truth: a np.array storing true values.

    Returns:
        A float representing the MAE.
    """
    # Element-wise magnitude of the forecast errors.
    abs_errors = np.abs(truth - predictions)
    return np.average(abs_errors)
class BaseTHModel:
    """Container describing one base model of a temporal hierarchical model.

    A base model can be specified in two ways: either by model info
    (model_name together with model_params), or by supplying the residuals
    and forecasts of an already trained model directly.

    Attributes:
        level: An integer representing the level of the base model, should be a positive integer.
        model_name: Optional; A string representing the name of forecast model; Default is None.
        model_params: Optional; A parameter object storing the parameters of forecasting model; Default is None.
        residuals: Optional; A np.array of residuals of forecasting model, which is necessary if both
            model_name and model_params are None. Default is None.
        fcsts: Optional; A np.array of forecasts generated by the forecasting model, which is necessary if
            both model_name and model_params are None. Default is None.

    Raises:
        ValueError: If level is not a positive integer; if residuals or fcsts
            is missing/empty while model_name or model_params is not given;
            or if, in that same situation, model_name is not a key of
            BASE_MODELS.
    """

    def __init__(
        self,
        level: int,
        model_name: Optional[str] = None,
        model_params: Optional[object] = None,
        residuals: Optional[np.ndarray] = None,
        fcsts: Optional[np.ndarray] = None,
    ) -> None:
        if not isinstance(level, int) or level < 1:
            msg = f"Level should be a positive integer but receive {level}."
            logging.error(msg)
            raise ValueError(msg)

        # True only when both residuals and forecasts are present and non-empty.
        has_trained_output = (
            residuals is not None
            and residuals.size != 0
            and fcsts is not None
            and fcsts.size != 0
        )
        if not has_trained_output:
            # Without trained output, the model itself must be fully specified.
            if not (model_name and model_params):
                msg = "model_name and model_params are needed when residuals or fcsts is missing."
                logging.error(msg)
                raise ValueError(msg)
            if model_name not in BASE_MODELS:
                msg = f"model_name {model_name} is not supported!"
                logging.error(msg)
                raise ValueError(msg)

        self.level = level
        self.model_name = model_name
        self.model_params = model_params
        self.residuals = residuals
        self.fcsts = fcsts

    def __str__(self):
        return "BaseTHModel"
class GetAggregateTS:
    """Aggregate a univariate time series to different levels.

    For a level k, every k consecutive values are summed into one value,
    using the most recent observations that form whole groups of k.

    Attributes:
        data: A TimeSeriesData object representing the time series to be aggregated.
    """

    def __init__(self, data: TimeSeriesData) -> None:
        if not data.is_univariate():
            msg = f"Only support univariate time series, but get {type(data.value)}."
            logging.error(msg)
            raise ValueError(msg)
        # Store a new TimeSeriesData built from a copy of the input's dataframe.
        frame_copy = data.to_dataframe().copy()
        self.data = TimeSeriesData(frame_copy)

    def _aggregate_single(self, ts, k):
        """Aggregate ts to level k by summing blocks of k consecutive values.

        Level 1 returns ts itself. Otherwise leading observations that do not
        fill a whole block are dropped, and each block is stamped with the
        time of its last observation.

        Raises:
            ValueError: If k is greater than the length of ts.
        """
        if k == 1:
            return ts
        if k > len(ts):
            msg = f"Level {k} should be less than the length of training time series (len(TS)={len(ts)})!"
            logging.error(msg)
            raise ValueError(msg)

        total = len(ts)
        usable = (total // k) * k
        # Index of the first observation that belongs to a complete block.
        start = total - usable
        block_sums = ts.value.values[start:].reshape(-1, k).sum(axis=1)
        # Last timestamp of every block of k observations.
        block_ends = ts.time.values[start:][k - 1 :: k]
        return TimeSeriesData(
            time=pd.Series(block_ends), value=pd.Series(block_sums)
        )

    def aggregate(self, levels: List[int]) -> Dict[int, TimeSeriesData]:
        """Aggregate the stored time series to each requested level.

        Args:
            levels: A list of integers representing the levels which the time series to be aggregated for.

        Returns:
            A dictionary of aggregated time series for each level.

        Raises:
            ValueError: If levels is not a list, if any level is not a
                positive int, or if a level exceeds the series length.
        """
        if not isinstance(levels, list):
            msg = f"Parameter 'levels' should be a list but receive {type(levels)}."
            logging.error(msg)
            raise ValueError(msg)

        # Validate every level up front so nothing is aggregated on bad input.
        for k in levels:
            if isinstance(k, int) and k >= 1:
                continue
            msg = f"Level should be a positive int, but receive {k}."
            logging.error(msg)
            raise ValueError(msg)

        aggregated = {}
        for k in levels:
            aggregated[k] = self._aggregate_single(self.data, k)
        return aggregated

    def __str__(self):
        return "GetAggregateTS"