#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
"""
Module with generic outlier detection models. Supports a univariate algorithm that
treates each metric separately to identify outliers and a multivariate detection
algorithm that determines outliers based on joint distribution of metrics
"""
import datetime as dt
import logging
import sys
import traceback
from datetime import datetime
from enum import Enum
from importlib import import_module
from typing import Any, Dict, List, Optional

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from kats.consts import Params, TimeSeriesData, TimeSeriesIterator
from kats.detectors.detector import Detector
from scipy import stats
from scipy.spatial import distance
from statsmodels.tsa.seasonal import seasonal_decompose


class OutlierDetector(Detector):
"""
This univariate outlier detection algorithm mimics the outlier
detection algorithm in R
    Attributes:
        data: TimeSeriesData object with the time series
        decomp: 'additive' or 'multiplicative' decomposition
        iqr_mult: iqr_mult * inter-quartile range is used to classify outliers
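
    Example:
        A minimal usage sketch (assumes `ts_data` is a univariate TimeSeriesData
        built from a DataFrame with `time` and `y` columns):

        >>> od = OutlierDetector(ts_data, decomp="additive", iqr_mult=3.0)
        >>> od.detector()
        >>> od.outliers[0]  # outlier timestamps for the metric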
"""
outliers_index: Optional[List] = None
def __init__(
self, data: TimeSeriesData, decomp: str = "additive", iqr_mult: float = 3.0
) -> None:
super().__init__(data)
if decomp in ["additive", "multiplicative"]:
self.decomp = decomp
else:
logging.info("Invalid decomposition setting specified")
logging.info("Defaulting to Additive Decomposition")
self.decomp = "additive"
self.iqr_mult = iqr_mult
def __clean_ts__(self, original: pd.DataFrame) -> List:
"""
Performs detection for a single metric. First decomposes the time series
and detects outliers when the values in residual time series are beyond the
specified multiplier times the inter quartile range
Args:
original: original time series as DataFrame
        Returns: List of detected outlier timepoints for this metric
"""
# pyre-fixme[16]: `DataFrame` has no attribute `index`.
original.index = pd.to_datetime(original.index)
if pd.infer_freq(original.index) is None:
# pyre-fixme[9]: original has type `DataFrame`; used as
# `Union[pd.core.frame.DataFrame, pd.core.series.Series]`.
original = original.asfreq("D")
logging.info("Setting frequency to Daily since it cannot be inferred")
# pyre-fixme[9]: original has type `DataFrame`; used as `Union[None,
# pd.core.frame.DataFrame, pd.core.series.Series]`.
original = original.interpolate(
method="polynomial", limit_direction="both", order=3
)
        # Fallback: use linear interpolation if polynomial interpolation
        # left NaNs behind
        if original["y"].isna().any():
# pyre-fixme[9]: original has type `DataFrame`; used as `Union[None,
# pd.core.frame.DataFrame, pd.core.series.Series]`.
original = original.interpolate(method="linear", limit_direction="both")
# Once our own decomposition is ready, we can directly use it here
result = seasonal_decompose(original, model=self.decomp)
rem = result.resid
detrend = original["y"] - result.trend
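        # Seasonality strength: 1 - Var(residual) / Var(detrended series).
        # Values close to 1 indicate strong seasonality; the seasonal component
        # is removed only when the series is clearly seasonal (strength >= 0.6)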
strength = float(1 - np.nanvar(rem) / np.nanvar(detrend))
if strength >= 0.6:
original["y"] = original["y"] - result.seasonal
        # using IQR as threshold: flag residuals outside
        # [Q1 - iqr_mult * IQR, Q3 + iqr_mult * IQR] as outliers
resid = original["y"] - result.trend
resid_q = np.nanpercentile(resid, [25, 75])
iqr = resid_q[1] - resid_q[0]
limits = resid_q + (self.iqr_mult * iqr * np.array([-1, 1]))
outliers = resid[(resid >= limits[1]) | (resid <= limits[0])]
self.outliers_index = outliers_index = list(outliers.index)
return outliers_index

    def detector(self) -> None:
        """
        Detects outliers and stores the results in self.outliers, one list of
        outlier timepoints per metric
        """
self.iter = TimeSeriesIterator(self.data)
self.outliers = []
        logging.info("Detecting Outliers")
for ts in self.iter:
try:
outlier = self.__clean_ts__(ts)
self.outliers.append(outlier)
except BaseException:
exc_type, exc_value, exc_traceback = sys.exc_info()
lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
logging.error("".join("!! " + line for line in lines))
logging.error("Outlier Detection Failed")
self.outliers.append([])


class MultivariateAnomalyDetectorType(Enum):
    """Model types for multivariate anomaly detection, specified as
    `module.ClassName` paths under kats.models"""

    VAR = "var.VARModel"
    BAYESIAN_VAR = "bayesian_var.BayesianVAR"


class MultivariateAnomalyDetector(Detector):
    """
    Detector class for multivariate outlier detection.
    Provides utilities to calculate anomaly scores and to retrieve the anomalous
    datapoints and anomalous metrics at those points.
    Attributes:
        data: Input metrics TimeSeriesData
        params: Parameter class for the multivariate VAR/BVAR model
        training_days: number of days of data to use for initial training.
            If less than a day, specify as a fraction of a day
        model_type: The type of multivariate anomaly detector (currently
            'BAYESIAN_VAR' and 'VAR' options are available)
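
    Example:
        A minimal usage sketch (assumes `multi_ts` is a multivariate
        TimeSeriesData and uses VARParams from kats.models.var):

        >>> from kats.models.var import VARParams
        >>> params = VARParams(maxlags=2)
        >>> d = MultivariateAnomalyDetector(multi_ts, params, training_days=60)
        >>> anomaly_score_df = d.detector()
        >>> anomalies = d.get_anomaly_timepoints(alpha=0.05)
        >>> d.get_anomalous_metrics(anomalies[0], top_k=3)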
"""
resid: Optional[pd.DataFrame] = None
sigma_u: Optional[pd.DataFrame] = None
anomaly_score_df: Optional[pd.DataFrame] = None
def __init__(
self,
data: TimeSeriesData,
params: Params,
training_days: float,
model_type: MultivariateAnomalyDetectorType = MultivariateAnomalyDetectorType.VAR,
) -> None:
super().__init__(data)
df = data.to_dataframe().set_index("time")
self.df = df
params.validate_params()
self.params = params
# pyre-fixme[16]: `Optional` has no attribute `diff`.
time_diff = data.time.sort_values().diff().dropna()
        if len(time_diff.unique()) == 1:  # check for constant frequency
            # numpy timedeltas are in nanoseconds; convert to days
            freq = time_diff.unique()[0].astype("int")
            self.granularity_days = freq / (24 * 3600 * (10 ** 9))
else:
raise RuntimeError(
"Frequency of metrics is not constant."
"Please check for missing or duplicate values"
)
self.training_days = training_days
self.detector_model = model_type
def _clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Remove outliers in training data based on zscore
Args:
df: input dataframe
Returns: Clean dataframe
"""
z_score_threshold = 3
# pyre-fixme[16]: Module `stats` has no attribute `zscore`.
zscore_df = stats.zscore(df)
        # flag values whose absolute z-score exceeds the threshold; outliers
        # in either tail are replaced with NaN and interpolated below
        non_outlier_flag = np.abs(zscore_df) < z_score_threshold
df_clean = df.where(non_outlier_flag, np.nan)
        df_clean = df_clean.interpolate(method="linear", limit_direction="both")
return df_clean
def _is_pos_def(self, mat: np.ndarray) -> bool:
"""
Check if matrix is positive definite.
Args:
mat: numpy matrix
Returns:
True if mat is positive definite
"""
        return bool(np.all(np.linalg.eigvals(mat) > 0))
def _create_model(self, data: TimeSeriesData, params: Params) -> Any:
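        """
        Dynamically import and instantiate the model class referenced by
        self.detector_model (e.g. kats.models.var.VARModel for VAR)
        """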
model_name = f"kats.models.{self.detector_model.value}"
module_name, model_name = model_name.rsplit(".", 1)
return getattr(import_module(module_name), model_name)(data, params)
def _generate_forecast(self, t: datetime) -> pd.DataFrame:
"""
Fit VAR model and generate 1 step ahead forecasts (t+1)
Args:
t: time until which to use for training the VAR model
Returns:
DataFrame with predicted expected value for each metric value at (t+1)
"""
logging.info(f"Generating forecasts at {t}")
look_back = dt.timedelta(days=self.training_days)
train = self.df.loc[t - look_back : t, :]
train_clean = self._clean_data(train)
# fit VAR
model = self._create_model(
TimeSeriesData(train_clean.reset_index()), self.params
)
model.fit()
lag_order = model.k_ar
logging.info(f"Fitted VAR model of order {lag_order}")
self.resid = model.resid
self.sigma_u = sigma_u = model.sigma_u
        if not self._is_pos_def(sigma_u):
msg = f"Fitted Covariance matrix at time {t} is not positive definite"
logging.error(msg)
raise RuntimeError(msg)
# predict
pred = model.predict(steps=1)
        forecast = [[k, float(v.value["fcst"])] for k, v in pred.items()]
pred_df = pd.DataFrame(columns=["index", "est"], data=forecast).set_index(
"index"
)
test = self.df.loc[t + dt.timedelta(days=self.granularity_days), :]
pred_df["actual"] = test
return pred_df
def _calc_anomaly_scores(self, pred_df: pd.DataFrame) -> Dict[str, float]:
"""
        Calculate the overall anomaly score at time t, based on the multivariate
        Mahalanobis distance, and individual anomaly scores as z-scores
Args:
pred_df: Dataframe with forecasted values
Returns:
Dictionary with overall and individual anomaly scores along with p-value
"""
# individual anomaly scores
cov = self.sigma_u
resid = self.resid
assert cov is not None and resid is not None
residual_score = {}
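        # one-step-ahead forecast error vector r_t at time t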
rt = pred_df["est"] - pred_df["actual"]
for col in cov.columns:
residual_mean = resid[col].mean()
residual_var = resid[col].var()
residual_score[col] = np.abs((rt[col] - residual_mean)) / np.sqrt(residual_var)
# overall anomaly score
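        # Mahalanobis distance of the residual vector r_t from the mean of
        # the training residuals: d = sqrt((r_t - mu)^T Sigma^{-1} (r_t - mu)),
        # where Sigma is the fitted residual covariance matrix sigma_u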
cov_inv = np.linalg.inv(cov.values)
overall_anomaly_score = distance.mahalanobis(
rt.values, resid.mean().values, cov_inv
)
residual_score["overall_anomaly_score"] = overall_anomaly_score
        # p-value of the overall score under a chi-squared distribution with
        # degrees of freedom equal to the number of metrics
        dof = len(self.df.columns)
        # pyre-ignore[16]: Module `stats` has no attribute `chi2`.
        residual_score["p_value"] = stats.chi2.sf(overall_anomaly_score, df=dof)
return residual_score

    # pyre-fixme[14]: `detector` overrides method defined in `Detector` inconsistently.
    def detector(self) -> pd.DataFrame:
        """
        Fit the detection model and return the results
        Returns:
            DataFrame with columns corresponding to the individual anomaly scores
            of each metric and the overall anomaly score for the whole time series
"""
anomaly_score_df = pd.DataFrame()
look_back = dt.timedelta(days=self.training_days)
fcstTime = self.df.index.min() + look_back
while fcstTime < self.df.index.max():
# forecast for fcstTime+ 1
pred_df = self._generate_forecast(fcstTime)
# calculate anomaly scores
anomaly_scores_t = self._calc_anomaly_scores(pred_df)
# process next observation
fcstTime += dt.timedelta(days=self.granularity_days)
            anomaly_scores_t = pd.DataFrame(anomaly_scores_t, index=[fcstTime])
            # DataFrame.append was removed in pandas 2.0; use concat instead
            anomaly_score_df = pd.concat([anomaly_score_df, anomaly_scores_t])
self.anomaly_score_df = anomaly_score_df
return anomaly_score_df

    def get_anomaly_timepoints(self, alpha: float) -> List:
"""
Helper function to get anomaly timepoints based on the significance level
Args:
            alpha: significance level to consider the time period anomalous.
                Use .plot() to help choose a good threshold
Returns:
            List of time instants when the system of metrics shows anomalous behavior
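
        Example (assuming `d` is a MultivariateAnomalyDetector on which
        detector() has been run):
            >>> anomalies = d.get_anomaly_timepoints(alpha=0.05)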
"""
anomaly_score_df = self.anomaly_score_df
if anomaly_score_df is None:
raise ValueError("detector() must be called before get_anomaly_timepoints()")
flag = anomaly_score_df.p_value < alpha
anomaly_ts = anomaly_score_df[flag].index
return list(anomaly_ts)

    def get_anomalous_metrics(
        self, t: datetime, top_k: Optional[int] = None
    ) -> pd.DataFrame:
"""
Find top k metrics with most anomalous behavior at time t
Args:
t: time instant of interest (same type as TimeSeriesData.time)
top_k: Top few metrics to return. If None, returns all
Returns:
DataFrame with metrics and their corresponding anomaly score
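
        Example (assuming `d` is a MultivariateAnomalyDetector on which
        detector() has been run, and `anomalies` comes from get_anomaly_timepoints):
            >>> d.get_anomalous_metrics(anomalies[0], top_k=3)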
"""
anomaly_score_df = self.anomaly_score_df
if anomaly_score_df is None:
raise ValueError("detector() must be called before get_anomalous_metrics()")
residual_scores = anomaly_score_df.drop(columns="overall_anomaly_score")
residual_scores_t = (
residual_scores.loc[t, :].sort_values(ascending=False).reset_index()
)
residual_scores_t.columns = ["metric", "anomaly_score"]
return residual_scores_t[:top_k]

    def plot(self) -> None:
        """
        Plot the overall anomaly score of the system of metrics at each instant.
        Useful for threshold selection
        """
        anomaly_score_df = self.anomaly_score_df
        if anomaly_score_df is None:
            raise ValueError("detector() must be called before plot()")
        f, ax = plt.subplots(2, 1, sharex=True, figsize=(15, 8 * 2))
        a = pd.merge(
            self.df,
            anomaly_score_df["overall_anomaly_score"],
left_index=True,
right_index=True,
how="right",
)
a.drop(columns=["overall_anomaly_score"]).plot(ax=ax[0])
ax[0].set_title("Input time series metrics")
a["overall_anomaly_score"].plot(legend=False, ax=ax[1])
ax[1].set_title("Overall Anomaly Score")