Source code for kats.models.bayesian_var

#!/usr/bin/env python3

# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

"""
 Bayesian estimation of Vector Autoregressive Model using
 Minnesota prior on the coefficient matrix. This version is
 useful for regularization when they are too many coefficients
 to be estimated.

 Implementation inspired by the following two articles/papers:
    https://www.mathworks.com/help/econ/normalbvarm.html#mw_4a1ab118-9ef3-4380-8c5a-12b848254117
    http://apps.eui.eu/Personal/Canova/Articles/ch10.pdf (page 5)
"""

import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Tuple

import kats.models.model as m
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from kats.consts import Params, TimeSeriesData
from numpy.linalg import inv  # @manual
from scipy.linalg import block_diag  # @manual


[docs]@dataclass class BayesianVARParams(Params): """Parameter class for Bayesian VAR model Attributes: p: Historical lag to use Below parameters are hyperparameters in the covariance matrix for coefficient prior. See page 5 in http://apps.eui.eu/Personal/Canova/Articles/ch10.pdf for more details. phi_0: tightness on the variance of the first lag phi_1: relative tightness of other variables phi_2: relative tightness of the exogenous variables phi_3: decay with lag is parameterized as lag^phi_3 """ p: int = 5 phi_0: float = 0.02 phi_1: float = 0.25 phi_2: float = 20 phi_3: float = 3 def validate_params(self) -> None: assert self.p > 0, f"Lag order should be positive, but got {self.p}" assert self.phi_0 > 0 assert self.phi_1 > 0 assert self.phi_2 > 0 assert self.phi_3 > 0 assert self.phi_1 <= 1, ( "Parameter phi_1 should generally be <= 1. " "See page 5 of http://apps.eui.eu/Personal/Canova/Articles/ch10.pdf" "for more details." )
[docs]class BayesianVAR(m.Model): """ Model class for bayesian VAR This class provides fit, predict, and plot methods for bayesian VAR model Attributes: data: the input time series data as `TimeSeriesData` params: the parameter class defined with `BayesianVARParams` """ def __init__(self, data: TimeSeriesData, params: BayesianVARParams) -> None: # Ignore the input time column and re-index to 0...T copy_data = data.to_dataframe() self.start_date = copy_data.time[0] if isinstance(self.start_date, str): self.start_date = datetime.strptime(self.start_date, "%Y-%m-%d") # pyre-fixme[16]: `DataFrame` has no attribute `time`. copy_data.time = pd.RangeIndex(0, len(copy_data)) copy_data = TimeSeriesData(copy_data) self.time_freq = BayesianVAR._check_get_freq( copy_data ) # check for consistent frequency self.data = copy_data self.X, self.Y = BayesianVAR._convert_timeseries_np(copy_data) assert ( self.X.shape[1] == self.Y.shape[1] ), "Expected same amount of data on time axis for X and Y" self.m, self.T = self.Y.shape self.r = self.X.shape[0] self.p = params.p self.phi_0 = params.phi_0 self.phi_1 = params.phi_1 self.phi_2 = params.phi_2 self.phi_3 = params.phi_3 self.N = (self.m * self.p) + self.r + 1 self.num_mu_coefficients = self.m * self.N self.fitted = False logging.info( "Initializing Bayesian VAR model with: " f"BVAR(p={self.p}, m={self.m}, r={self.r}, T={self.T}, N={self.N}, " f"phi_0={self.phi_0}, phi_1={self.phi_1}, " f"phi_2={self.phi_2}, phi_3={self.phi_3})" ) @staticmethod def _check_get_freq(data) -> None: time_diff = data.time.diff().dropna() diff_unique = time_diff.unique() assert len(diff_unique) == 1, ( f"Frequency of metrics is not constant: {diff_unique} " "Please check for missing or duplicate values" ) return diff_unique.item() @staticmethod def _convert_timeseries_np( timeseries: TimeSeriesData, ) -> Tuple[np.ndarray, np.ndarray]: data_df = timeseries.to_dataframe() Y = data_df.drop(columns=["time"]).to_numpy().T m, T = Y.shape X = np.expand_dims(pd.RangeIndex(0, len(timeseries)), axis=0) return X, Y def _get_training_residuals(self): times = [] residuals = [] logging.info( f"Performing one-step ahead forecasting on history on from t={self.p} to t={self.T-1}." ) # create dataframe with each column corresponding to the residual for t in range(self.p, self.T): point_pred = self._evaluate_point_t(self.X, self.Y, t) time = self.X[:, t].item() times.append(time) residuals.append(self.Y[:, t]-point_pred) times_new = [self.start_date + timedelta(days=x) for x in times] df_resid = pd.DataFrame(residuals, index=times_new, columns=self.data.value.columns) return df_resid
[docs] def fit(self) -> None: """Fit Bayesian VAR model""" # pyre-fixme[16]: `BayesianVAR` has no attribute `sigma_ols`. self.sigma_ols = self._compute_sigma_ols() mu_prior = np.zeros((self.m, self.N)) for i in range(self.m): mu_prior[i, self.p * i] = 1 mu_prior = mu_prior.flatten() v_prior = self._construct_v_prior() Z_sig_Z_sum = 0 Z_sig_y_sum = 0 for t in range(self.p, self.T): Z_t = self._construct_Zt( self.X, self.Y, t ) # shape: m x [m * (m * p + r + 1)] z_sum_term = ( Z_t.T @ inv(self.sigma_ols) ) @ Z_t # shape: [m * (m * p + r + 1)] x [m * (m * p + r + 1)] y_sum_term = (Z_t.T @ inv(self.sigma_ols)) @ self.Y[ :, t ] # shape: [m * (m * p + r + 1)] x 1 assert ( self.num_mu_coefficients, self.num_mu_coefficients, ) == z_sum_term.shape, f"Expected {(self.num_mu_coefficients, self.num_mu_coefficients)}, got {z_sum_term.shape}" assert ( self.num_mu_coefficients, ) == y_sum_term.shape, ( f"Expected {(self.num_mu_coefficients,)}, got {y_sum_term.shape}" ) Z_sig_Z_sum += z_sum_term Z_sig_y_sum += y_sum_term # pyre-fixme[16]: `BayesianVAR` has no attribute `v_posterior`. self.v_posterior = inv( inv(v_prior) + Z_sig_Z_sum ) # shape: [m * (m * p + r + 1)] x [m * (m * p + r + 1)] assert ( self.num_mu_coefficients, self.num_mu_coefficients, ) == self.v_posterior.shape, f"Expected {(self.num_mu_coefficients, self.num_mu_coefficients)}, got {self.v_posterior.shape}" # pyre-fixme[16]: `BayesianVAR` has no attribute `mu_posterior`. self.mu_posterior = self.v_posterior @ ( inv(v_prior) @ mu_prior + Z_sig_y_sum ) # shape: [m * (m * p + r + 1)] x 1 assert ( self.num_mu_coefficients, ) == self.mu_posterior.shape, ( f"Expected {(self.num_mu_coefficients,)}, got {self.mu_posterior.shape}" ) # pyre-fixme[16]: `BayesianVAR` has no attribute `resid`. self.resid = self._get_training_residuals() self.fitted = True
def _construct_z(self, X, Y, t: int) -> np.ndarray: assert t >= self.p, f"Need t={t} >= p={self.p}." assert self.r == X.shape[0] assert self.m == Y.shape[0] new_yt = np.fliplr(Y[:, t - self.p : t]).flatten() z = np.concatenate( [new_yt, X[:, t].T, np.array([1])], axis=0 ) # shape: [(m * p + r + 1) x 1] assert (self.N,) == z.shape, f"Expected {(self.N,)} but got {z.shape}" return z def _construct_Zt(self, X, Y, t: int) -> np.ndarray: z = self._construct_z(X, Y, t) Z_t = block_diag(*([z] * self.m)) assert ( self.m, self.num_mu_coefficients, ) == Z_t.shape, ( f"Expected {(self.m, self.num_mu_coefficients)}, got {Z_t.shape}" ) return Z_t # shape: m x [m * (m * p + m + 1)] def _construct_X_OLS(self) -> np.ndarray: X_OLS = np.zeros((self.N, self.T - self.p)) for t in range(self.p, self.T): X_OLS[:, t - self.p] = self._construct_z( self.X, self.Y, t ) # X_OLS ignores first p values return X_OLS def _compute_sigma_ols(self) -> np.ndarray: Y_suffix = self.Y[:, self.p :] X_OLS = self._construct_X_OLS() beta_ols = (Y_suffix @ X_OLS.T) @ inv(X_OLS @ X_OLS.T) sse = (Y_suffix - beta_ols @ X_OLS) @ ( Y_suffix - beta_ols @ X_OLS ).T # should produce [m x m] matrix assert ( self.m, self.m, ) == sse.shape, f"Expected {(self.m, self.m)}, but got {sse.shape}" assert self.T > (self.m * self.p) + 1 return sse / float(self.T - (self.m * self.p) - 1) def _sigma_ijl(self, i, j, lag, variance, is_exogenous) -> float: """ Taken from page 5 of http://apps.eui.eu/Personal/Canova/Articles/ch10.pdf """ def h(x): return x ** self.phi_3 if i == j: return self.phi_0 / h(lag) elif is_exogenous: return self.phi_0 * self.phi_2 else: # endogenous variable j return self.phi_0 * (self.phi_1 / h(lag)) * (variance[j] / variance[i]) def _construct_v_prior(self) -> np.ndarray: cov = np.zeros((self.num_mu_coefficients, self.num_mu_coefficients)) variance = np.var(self.Y, axis=1) element_ind = 0 for i in range(self.m): for j in range(self.m): # iterate through the m classes of lagged variables for lag in range(1, self.p + 1): # iterate through the lags cov[element_ind][element_ind] = self._sigma_ijl( i, j, lag, variance, is_exogenous=False ) element_ind += 1 for _ex in range(self.r): # exogenous variables cov[element_ind][element_ind] = self._sigma_ijl( i, None, None, variance, is_exogenous=True ) element_ind += 1 # constant term of 1 cov[element_ind][element_ind] = self._sigma_ijl( i, None, None, variance, is_exogenous=True ) element_ind += 1 assert ( element_ind == self.num_mu_coefficients ), f"Final element: {element_ind}, expected: {self.num_mu_coefficients}" return cov # shape: [m * (m * p + r + 1)] x [m * (m * p + r + 1)] matrix def _evaluate_point_t(self, X_new, Y_new, t) -> np.ndarray: assert t >= self.p, f"Need t={t} > p={self.p}." Z_t = self._construct_Zt(X_new, Y_new, t) # pyre-fixme[16]: `BayesianVAR` has no attribute `mu_posterior`. point_prediction = Z_t @ self.mu_posterior # shape [m x 1] assert (self.m,) == point_prediction.shape return point_prediction def _look_ahead_step( self, X_ahead, Y_curr ) -> np.ndarray: # Y_curr has one less element than X_ahead assert Y_curr.shape[1] + 1 == X_ahead.shape[1] t_ahead = X_ahead.shape[1] - 1 # -1 for 0-indexed array Z_t = self._construct_Zt(X_ahead, Y_curr, t_ahead) # pyre-fixme[16]: `BayesianVAR` has no attribute `mu_posterior`. look_ahead_pred = Z_t @ self.mu_posterior # shape [m x 1] assert (self.m,) == look_ahead_pred.shape return look_ahead_pred # pyre-fixme[14]: `predict` overrides method defined in `Model` inconsistently.
[docs] def predict( self, steps: int, include_history=False, verbose=False ) -> Dict[str, TimeSeriesData]: """Predict with the fitted VAR model Args: steps: Number of time steps to forecast include_history: return fitted values also Returns: Disctionary of predicted results for each metric. Each metric result has following columns: `time`, `fcst`, `fcst_lower`, and `fcst_upper` Note confidence intervals of forecast are not yet implemented. """ assert self.fitted, "Please fit the model before forecasting with .fit()" times = [] forecast_vals = [] if include_history: logging.info( f"Performing one-step ahead forecasting on history on from t={self.p} to t={self.T-1}." ) for t in range(self.p, self.T): point_pred = self._evaluate_point_t(self.X, self.Y, t) time = self.X[:, t].item() if verbose: logging.info( f"Performing one-step ahead forecasting with history on t={time}." ) times.append(time) forecast_vals.append(point_pred) # future forecasting -- X_ahead is one time step ahead of Y_curr X_ahead = self.X Y_curr = self.Y logging.info( f"Performing future forecasting from t={self.T} to t={self.T+steps-1}." ) for _t in range(self.T, self.T + steps): ahead_time = X_ahead[np.newaxis, :, -1] + self.time_freq X_ahead = np.concatenate([X_ahead, ahead_time], axis=1) look_ahead_pred = self._look_ahead_step(X_ahead, Y_curr) time = ahead_time.item() if verbose: logging.info(f"Performing future forecasting with t={time}.") times.append(time) forecast_vals.append(look_ahead_pred) Y_curr = np.concatenate([Y_curr, look_ahead_pred[:, np.newaxis]], axis=1) assert ( len(times) > 0 ), "Forecast produced no values. Please set steps > 0 or include_history=True." indiv_forecasts = {} forecast_length = len(times) logging.warning( "Upper and lower confidence intervals of forecast not yet implemented for Bayesian VAR model." ) times_new = [self.start_date + timedelta(days=x) for x in times] for i, c in enumerate(self.data.value.columns): c_forecast = pd.DataFrame( { "time": times_new, "fcst": [forecast_vals[f_t][i] for f_t in range(forecast_length)], "fcst_lower": [-1] * forecast_length, "fcst_upper": [-1] * forecast_length, } ) indiv_forecasts[c] = TimeSeriesData(c_forecast) # pyre-fixme[16]: `BayesianVAR` has no attribute `forecast`. self.forecast = indiv_forecasts # pyre-fixme[16]: `BayesianVAR` has no attribute `forecast_max_time`. self.forecast_max_time = max(times_new) return self.forecast
# pyre-fixme[14]: `plot` overrides method defined in `Model` inconsistently. # pyre-fixme[40]: Non-static method `plot` cannot override a static method # defined in `m.Model`.
[docs] def plot(self) -> None: """Plot forecasted results from Bayesian VAR model""" assert hasattr(self, "forecast"), "Please run .predict() before plotting" plt.figure(figsize=(20, 6)) plt.title("Input Timeseries & Forecast") for i, c in enumerate(self.data.value.columns): color = f"C{i}" plt.plot(self.data.time, self.data.value[c], c=color) # pyre-fixme[16]: `BayesianVAR` has no attribute `forecast`. plt.plot(self.forecast[c].time, self.forecast[c].value, "--", c=color)
@property def sigma_u(self) -> pd.DataFrame: return pd.DataFrame( # pyre-fixme[16]: `BayesianVAR` has no attribute `sigma_ols`. self.sigma_ols, index=self.data.value.columns, columns=self.data.value.columns, ) @property def k_ar(self) -> int: return self.p