Source code for kats.models.ensemble.kats_ensemble

#!/usr/bin/env python3

# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

"""Kats ensemble model

Implementation of the Kats ensemble model. It starts from seasonality detection, if seasonality detected, it
continues to perform STL decomposition, then fit forecasting models on de-seasonalized components and aggregate;
otherwise it simiply leverage individual forecasting models and ensembling. We provided two ensembling methods,
weighted average and median ensembling.
"""

from __future__ import annotations
from copy import copy
import logging
import sys
import math
import pandas as pd
import numpy as np
import multiprocessing
from typing import Any, Callable, Dict, Optional, Sequence, Tuple, Union
from multiprocessing import cpu_count

import kats.models.model as mm
from kats.consts import TimeSeriesData, Params
from kats.models.ensemble.ensemble import EnsembleParams
from kats.models.model import Model
from kats.models import (
    arima,
    holtwinters,
    linear_model,
    prophet,
    quadratic_model,
    sarima,
    theta,
)
from kats.utils.backtesters import BackTesterSimple

# Seasonality detector
from kats.detectors.seasonality import ACFDetector

# STL decomposition
from kats.utils.decomposition import TimeSeriesDecomposition

# from numpy.typing import ArrayLike
ArrayLike = Union[Sequence[float], np.ndarray]

# models that can fit de_seasonal component
MODELS = {
    "arima": arima.ARIMAModel,
    "holtwinters": holtwinters.HoltWintersModel,
    "sarima": sarima.SARIMAModel,
    "prophet": prophet.ProphetModel,
    "linear": linear_model.LinearModel,
    "quadratic": quadratic_model.QuadraticModel,
    "theta": theta.ThetaModel,
}

# models that can fit seasonal time series data
SMODELS = {
    "prophet": prophet.ProphetModel,
    "theta": theta.ThetaModel,
    # "sarima": sarima.SARIMAModel,
}


def _logged_error(msg: str) -> ValueError:
    """Log and raise an error."""
    logging.error(msg)
    return ValueError(msg)


[docs]class KatsEnsemble: """Decomposition based ensemble model in Kats This is the holistic ensembling class based on decomposition when seasonality presents """ seasonality: bool = False sea_data: Optional[TimeSeriesData] = None desea_data: Optional[TimeSeriesData] = None steps: int = -1 decomposition_method: str = "" model_params: Optional[EnsembleParams] = None fitted: Optional[Dict[Any, Any]] = None weights: Optional[Dict[str, float]] = None predicted: Optional[Dict[str, pd.DataFrame]] = None err: Optional[Dict[str, float]] = None dates: Optional[pd.DatetimeIndex] = None fcst_dates: Optional[ArrayLike] = None fcst_df: Optional[pd.DataFrame] = None errors: Optional[Dict[Any, Any]] = None def __init__( self, data: TimeSeriesData, params: Dict[str, Any], ) -> None: self.data = data self.freq = pd.infer_freq(data.time) self.params = params self.validate_params() def validate_params(self): # validate aggregation method if self.params["aggregation"] not in ("median", "weightedavg"): method = self.params["aggregation"] msg = f"Only support `median` or `weightedavg` ensemble, but got {method}." raise _logged_error(msg) # validate decomposition method if self.params["decomposition_method"] in ("additive", "multiplicative"): self.decomposition_method = self.params["decomposition_method"] else: logging.info("Invalid decomposition method setting specified") logging.info("Defaulting to Additive Decomposition") self.decomposition_method = "additive" # validate m if (self.params["seasonality_length"] is not None) and\ (self.params["seasonality_length"] > int(len(self.data.time) // 2)): msg = "seasonality_length value cannot be larger than" "1/2 of the length of give time series" raise _logged_error(msg) # check customized forecastExecutor if ("forecastExecutor" in self.params.keys()) and\ (self.params["forecastExecutor"] is not None): msg = "Using customized forecastExecutor from given parameters" logging.info(msg) self.forecastExecutor = self.params["forecastExecutor"] # check customized fitExecutor if ("fitExecutor" in self.params.keys()) and\ (self.params["fitExecutor"] is not None): msg = "Using customized fitExecutor from given parameters" logging.info(msg) self.fitExecutor = self.params["fitExecutor"]
[docs] @staticmethod def seasonality_detector(data) -> bool: """Detect seasonalities from given TimeSeriesData Args: data: :class:`kats.consts.TimeSeriesData`, the input `TimeSeriesData` Returns: Flag for the presence of seasonality """ detector = ACFDetector(data) detector.detector() seasonality = detector.seasonality_detected return seasonality
[docs] @staticmethod def deseasonalize(data: TimeSeriesData, decomposition_method: str) -> Tuple[TimeSeriesData, TimeSeriesData]: """STL decomposition to given TimeSeriesData Static method to perform decomposition on the input data Args: data: :class:`kats.consts.TimeSeriesData`, input time series data decomposition_method: the specific method for decomposition Returns: Tuple of seasonal data and de-seasonalized data """ # create decomposer for time series decomposition decomposer = TimeSeriesDecomposition(data, decomposition_method) decomp = decomposer.decomposer() sea_data = copy(decomp["seasonal"]) desea_data = copy(data) if decomposition_method == "additive": desea_data.value = desea_data.value\ - decomp["seasonal"].value else: desea_data.value = desea_data.value\ / decomp["seasonal"].value return sea_data, desea_data
[docs] @staticmethod def reseasonalize( sea_data: TimeSeriesData, desea_predict: Dict[str, pd.DataFrame], decomposition_method: str, seasonality_length: int, steps: int, ) -> Dict[str, pd.DataFrame]: """Re-seasonalize the time series data Static method to re-seasonalize the input data Args: sea_data: :class:`kats.consts.TimeSeriesData`, the seasonal data from deseasonalize method desea_predict: dict of forecasted results for the deseasonalized data for each individual forecasting method decomposition_method: the specific method for decomposition seasonality_lenth: the length of seasonality steps: the length of forecasting horizon Returns: Dict of re-seasonalized data for each individual forecasting model """ rep = math.trunc(1 + steps / seasonality_length) seasonality_unit = sea_data.value[-seasonality_length:] predicted = {} for model_name, desea_pred in desea_predict.items(): if decomposition_method == "additive": if ( "fcst_lower" in desea_pred.columns and "fcst_upper" in desea_pred.columns ): # check consistency of time being index if "time" in desea_pred.columns: msg = "Setting time column as index" logging.info(msg) desea_pred.set_index("time", inplace=True) # native C.I calculated from individual model predicted[model_name] = desea_pred + \ np.tile( np.tile(seasonality_unit, rep)[:steps], [3, 1] ).transpose() else: # no C.I from individual model tmp_fcst = desea_pred.fcst\ + np.tile(seasonality_unit, rep)[:steps] predicted[model_name] = pd.DataFrame({ "time": desea_pred.index, "fcst": tmp_fcst, "fcst_lower": np.nan, "fcst_upper": np.nan, }).set_index("time") else: # multiplicative, element-wise multiply if ( "fcst_lower" in desea_pred.columns and "fcst_upper" in desea_pred.columns ): # check consistency of time being index if "time" in desea_pred.columns: msg = "Setting time column as index" logging.info(msg) desea_pred.set_index("time", inplace=True) # native C.I calculated from individual model predicted[model_name] = desea_pred * \ np.tile( np.tile(seasonality_unit, rep)[:steps], [3, 1] ).transpose() else: # no C.I from individual model tmp_fcst = desea_pred.fcst * \ np.tile(seasonality_unit, rep)[:steps] predicted[model_name] = pd.DataFrame({ "time": desea_pred.index, "fcst": tmp_fcst, "fcst_lower": 0, "fcst_upper": 0, }).set_index("time") return predicted
[docs] def fitExecutor( self, data : TimeSeriesData, models : EnsembleParams, should_auto_backtest: bool = False, ) -> Tuple[Dict[Any, Any], Optional[Dict[str, float]]]: """callable forecast executor This is native implementation with Python's multiprocessing fit individual model in `models` with given `data`. Services who use KatsEnsemble need to implement their own executor for better performance, if no executor function is given, the native version will be used. Attributes: data: :class:`kats.consts.TimeSeriesData`, given TimeSeriesData, could be original or de-seasonalized models: EnsembleParams object containing model params in BaseModelParams should_auto_backtest: boolean flag for additional back testing runs Returns: Tuple of fitted individual model and weights """ # Fit individual model with given data num_process = min(len(MODELS), (cpu_count() - 1) // 2) if num_process < 1: num_process = 1 # pyre-fixme[16]: `SyncManager` has no attribute `Pool`. pool = multiprocessing.Manager().Pool(processes=(num_process), maxtasksperchild=1000) fitted_models = {} for model in models.models: fitted_models[model.model_name] = pool.apply_async( self._fit_single, args=( data, MODELS[model.model_name.split("_")[0].lower()], model.model_params), ) pool.close() pool.join() fitted = {model: res.get() for model, res in fitted_models.items()} # if auto back testing weights = self.backTestExecutor() if should_auto_backtest else None return fitted, weights
[docs] def fit(self) -> KatsEnsemble: """Fit individual forecasting models via calling fitExecutor This is the fit methdo to fit individual forecasting model """ self.seasonality = KatsEnsemble.seasonality_detector(self.data) # check if self.params["seasonality_length"] is given if self.seasonality and self.params["seasonality_length"] is None: msg = "The given time series contains seasonality,\ a `seasonality_length` must be given in params." raise _logged_error(msg) # set up auto backtesting flag auto_backtesting = False if self.params["aggregation"] == "median" else True # check fitExecutor if "fitExecutor" not in self.params.keys(): fitExecutor = self.fitExecutor if self.seasonality: # STL decomposition sea_data, desea_data = KatsEnsemble.deseasonalize( self.data, self.decomposition_method ) self.sea_data = sea_data self.desea_data = desea_data # we created extra models given_models = copy(self.params["models"].models) for m in self.params["models"].models: if m.model_name.lower() in SMODELS.keys(): tmp = copy(m) tmp.model_name = m.model_name + "_smodel" given_models.append(tmp) self.model_params = model_params = EnsembleParams(given_models) self.fitted, self.weights = fitExecutor( data=desea_data, models=model_params, should_auto_backtest=auto_backtesting, ) else: # fit models on the original data self.model_params = model_params = EnsembleParams(self.params["models"].models) self.fitted, self.weights = fitExecutor( data=self.data, models=model_params, should_auto_backtest=auto_backtesting, ) return self
[docs] def predict(self, steps: int) -> KatsEnsemble: """Predit future for each individual model Args: steps : number of steps ahead to forecast Returns: None """ fitted = self.fitted if fitted is None: raise _logged_error("fit must be called before predict.") self.steps = steps if self.seasonality: sea_data = self.sea_data assert sea_data is not None # we should pred two types of model desea_fitted = {k: v for k, v in fitted.items() if "_smodel" not in k} desea_predict = { k: v.predict(self.steps).set_index("time") for k, v in desea_fitted.items() } # re-seasonalize predicted = KatsEnsemble.reseasonalize( sea_data=sea_data, desea_predict=desea_predict, decomposition_method=self.decomposition_method, seasonality_length=self.params["seasonality_length"], steps=self.steps, ) # add extra model prediction results from smodels fitted_smodel = {k: v for k, v in fitted.items() if "_smodel" in k} extra_predict = { k: v.predict(self.steps).set_index("time") for k, v in fitted_smodel.items() } predicted.update(extra_predict) self.predicted = predicted else: predicted = { k: v.predict(self.steps).set_index("time") for k, v in fitted.items() } # add dummy C.I if the model doesn't have native C.I # this is a hack for median ensemble; everyone model needs to have # its native C.I if user choose weighted average ensemble. for k, v in predicted.items(): # if predicted df doesn't have fcst_lower and fcst_upper if "fcst_lower" not in v.columns or "fcst_upper" not in v.columns: # add dummy C.I tmp_v = copy(v) tmp_v["fcst_lower"] = np.nan tmp_v["fcst_upper"] = np.nan predicted[k] = tmp_v self.predicted = predicted return self
[docs] def forecast(self, steps: int) -> Tuple[Dict[str, pd.DataFrame], Optional[Dict[str, float]]]: """Holistic forecast method in Kats ensemble combine fit and predict methods to produce forecasted results this is especially useful for services which prefer to produce final forecasts without saving the fitted model Args: steps: the length of forecasting horizon Returns: Tuple of predicted values and weights """ self.steps = steps self.seasonality = KatsEnsemble.seasonality_detector(self.data) # check if self.params["seasonality_length"] is given if (self.seasonality) and (self.params["seasonality_length"] is None): msg = "The given time series contains seasonality,\ a `seasonality_length` must be given in params." raise _logged_error(msg) # set up auto backtesting flag auto_backtesting = False if self.params["aggregation"] == "median" else True if self.seasonality: # call forecastExecutor and move to next steps sea_data, desea_data = KatsEnsemble.deseasonalize( self.data, self.decomposition_method ) self.sea_data = sea_data self.desea_data = desea_data # call forecasterExecutor with self.desea_data desea_predict, desea_err = self.forecastExecutor( data=desea_data, models=self.params["models"], steps=steps, should_auto_backtest=auto_backtesting, ) # update the desea_predict with adding seasonality component # re-seasonalize predicted = KatsEnsemble.reseasonalize( sea_data=sea_data, desea_predict=desea_predict, decomposition_method=self.decomposition_method, seasonality_length=self.params["seasonality_length"], steps=self.steps, ) # call forecasterExecutor with self.data # create new models # we created extra models extra_models = [] for m in self.params["models"].models: if m.model_name.lower() in SMODELS.keys(): tmp = copy(m) tmp.model_name = m.model_name + "_smodel" extra_models.append(tmp) model_params = EnsembleParams(extra_models) extra_predict, extra_error = self.forecastExecutor( data=self.data, models=model_params, steps=self.steps, should_auto_backtest=auto_backtesting, ) # combine with predict predicted.update(extra_predict) self.predicted = predicted if self.params["aggregation"] == "weightedavg": if desea_err is None: desea_err = extra_error elif extra_error is not None: desea_err.update(extra_error) self.err = forecast_error = desea_err else: # no seasonality detected predicted, forecast_error = self.forecastExecutor( data=self.data, models=self.params["models"], steps=self.steps, should_auto_backtest=auto_backtesting, ) self.err = forecast_error # same as in predict method above # add dummy C.I if the model doesn't have native C.I # this is a hack for median ensemble; everyone model needs to have # its native C.I if user choose weighted average ensemble. for k, v in predicted.items(): # if predicted df doesn't have fcst_lower and fcst_upper if "fcst_lower" not in v.columns or "fcst_upper" not in v.columns: # add dummy C.I tmp_v = copy(v) tmp_v["fcst_lower"] = np.nan tmp_v["fcst_upper"] = np.nan predicted[k] = tmp_v self.predicted = predicted # we need to transform err to weights if it's weighted avg if self.params["aggregation"] == "weightedavg": assert forecast_error is not None original_weights = { model: 1 / (err + sys.float_info.epsilon) for model, err in forecast_error.items() } self.weights = { model: err / sum(original_weights.values()) for model, err in original_weights.items() } else: self.weights = None return predicted, self.weights
[docs] def forecastExecutor(self, data : TimeSeriesData, models : EnsembleParams, steps: int, should_auto_backtest: bool = False, ) -> Tuple[Dict[str, pd.DataFrame], Optional[Dict[str, float]]]: """Forecast Executor This is a callable execution function to (1). fit model (2). predict with a given steps (3). back testing (optional) Args: data: :class:`kats.consts.TimeSeriesData`, the input time series data as in :class:`kats.consts.TimeSeriesData` models: the ensemble parameters as in `EnsembleParams` steps: the length of forecasting horizon should_auto_backtest: flag to automatically perform back test, default as False Returns: The predicted values from each individual model and weights """ # Fit individual model with given data num_process = min(len(MODELS), (cpu_count() - 1) // 2) if num_process < 1: num_process = 1 # pyre-fixme[16]: `SyncManager` has no attribute `Pool`. pool = multiprocessing.Manager().Pool(processes=(num_process), maxtasksperchild=1000) fitted_models = {} for model in models.models: fitted_models[model.model_name] = pool.apply_async( self._fit_single, args=( data, MODELS[model.model_name.split("_")[0].lower()], model.model_params), ) pool.close() pool.join() fitted = {model: res.get() for model, res in fitted_models.items()} # simply predict with given steps predicted = {} for model_name, model_fitted in fitted.items(): predicted[model_name] = model_fitted.predict(steps).set_index("time") # if auto back testing self.model_params = models # used by _backtester_all if should_auto_backtest: _, errors = self._backtester_all() else: errors = None return predicted, errors
[docs] def aggregate(self) -> pd.DataFrame: """Aggregate the results from predict method Args: None Returns: final results in pd.DataFrame """ predicted = self.predicted if predicted is None: raise _logged_error("predict must be called before aggregate.") # create future dates last_date = self.data.time.max() dates = pd.date_range(start=last_date, periods=self.steps + 1, freq=self.freq) self.dates = dates = dates[dates != last_date] self.fcst_dates = dates.to_pydatetime() # collect the fcst, fcst_lower, and fcst_upper into dataframes fcsts = {} for col in ["fcst", "fcst_lower", "fcst_upper"]: fcsts[col] = pd.concat( [x[col].reset_index(drop=True) for x in predicted.values()], axis=1 ) fcsts[col].columns = predicted.keys() if self.params["aggregation"].lower() == "median": # clean up dataframes with C.I as np.nan or zero fcsts = self.clean_dummy_CI(fcsts, use_zero=False) self.fcst_df = fcst_df = pd.DataFrame({ "time": dates, "fcst": fcsts["fcst"].median(axis=1), "fcst_lower": fcsts["fcst_lower"].median(axis=1), "fcst_upper": fcsts["fcst_upper"].median(axis=1), }) else: if fcsts["fcst_lower"].isnull().values.any() or\ fcsts["fcst_upper"].isnull().values.any(): msg = "Conf. interval contains NaN, please check individual model." raise _logged_error(msg) weights = self.weights assert weights is not None weights = np.array(list(weights.values())) self.fcst_df = fcst_df = pd.DataFrame({ "time": dates, "fcst": fcsts["fcst"].dot(weights), "fcst_lower": fcsts["fcst_lower"].dot(weights), "fcst_upper": fcsts["fcst_upper"].dot(weights), }) logging.debug("Return forecast data: {fcst_df}".format(fcst_df=fcst_df)) return fcst_df
[docs] @staticmethod def clean_dummy_CI( fcsts: Dict[str, pd.DataFrame], use_zero: bool = True, ) -> Dict[str, pd.DataFrame]: """Helper method to clean dummy prediction interval Args: fcsts: the dict of forecasting results from individual models use_zero: flag to use zero to fill nan, default as True Returns: the cleaned results in a dict """ if use_zero: fcsts["fcst_lower"] = fcsts["fcst_lower"].fillna(0) fcsts["fcst_upper"] = fcsts["fcst_upper"].fillna(0) else: fcsts["fcst_lower"] = fcsts["fcst_lower"].replace(0, np.nan) fcsts["fcst_upper"] = fcsts["fcst_upper"].replace(0, np.nan) return fcsts
[docs] def backTestExecutor(self) -> Dict[str, float]: """wrapper for back test executor services which use KatsEnsemble need to write their own backtest wrapper Args: None Returns: The dict of backtesting results """ weights, _ = self._backtester_all() return weights
def _fit_single(self, data: TimeSeriesData, model_func: Callable, model_param: Params ) -> Model: """Private method to fit individual model Args: data: the input time series data model_func: the callable func to fit models model_param: the corresponding model parameter class Returns: Fitted Kats model """ # get the model function call m = model_func(params=model_param, data=data) m.fit() return m def _backtester_single( self, params: Params, model_class, train_percentage: int = 80, test_percentage: int = 20, err_method: str = "mape", ) -> float: """Private method to run single back testing process Args: params: Kats model parameters model_class: Untyped. Defines type of model train_percentage: float. Percentage of data used for training test_percentage: float. Percentage of data used for testing error_method: list of strings indicating which errors to calculate we currently support "mape", "smape", "mae", "mase", "mse", "rmse" Returns: float, the backtesting error """ bt = BackTesterSimple( [err_method], self.data, params, train_percentage, test_percentage, model_class ) bt.run_backtest() return bt.get_error_value(err_method) def _backtester_all( self, err_method: str = "mape", ) -> Tuple[Dict[str, float], Dict[str, float]]: """Private method to run all backtesting process Args: error_method: list of strings indicating which errors to calculate we currently support "mape", "smape", "mae", "mase", "mse", "rmse" Returns: Dict of errors from each model """ model_params = self.model_params if model_params is None: raise _logged_error("fit must be called before backtesting.") num_process = min(len(MODELS.keys()), (cpu_count() - 1) // 2) if num_process < 1: num_process = 1 # pyre-fixme[16]: `SyncManager` has no attribute `Pool`. pool = multiprocessing.Manager().Pool(processes=(num_process), maxtasksperchild=1000) backtesters = {} for model in model_params.models: backtesters[model.model_name] = pool.apply_async( self._backtester_single, args=( model.model_params, MODELS[model.model_name.split("_")[0].lower()] ), kwds={"err_method": err_method}, ) pool.close() pool.join() self.errors = errors = {model: res.get() for model, res in backtesters.items()} original_weights = { model: 1 / (err + sys.float_info.epsilon) for model, err in errors.items() } weights = { model: err / sum(original_weights.values()) for model, err in original_weights.items() } return weights, errors
[docs] def plot(self) -> None: """plot forecast results """ fcst_df = self.fcst_df if fcst_df is None: raise _logged_error("forecast must be called before plot.") logging.info("Generating chart for forecast result from Ensemble model.") mm.Model.plot(self.data, fcst_df)