#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional, Tuple, Union
import attr
import numpy as np
import pandas as pd
from kats.consts import TimeSeriesData
from scipy.stats import norm, t, ttest_ind # @manual
from statsmodels.stats import multitest
# Stand-in for numpy.typing.ArrayLike, which is not available in older numpy releases.
ArrayLike = np.ndarray
@attr.s(auto_attribs=True)
class SingleSpike:
    """A single spike: a point whose z-score exceeds the spike threshold."""
    time: datetime
    value: float
    n_sigma: float
@property
def time_str(self) -> str:
return datetime.strftime(self.time, "%Y-%m-%d")
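# Example (illustrative sketch only):
#   spike = SingleSpike(time=datetime(2021, 1, 1), value=10.0, n_sigma=3.2)
#   spike.time_str  # "2021-01-01"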
@attr.s(auto_attribs=True)
class ChangePointInterval:
    """Data and summary statistics for the half-open interval [start_time, end_time)."""
    start_time: datetime
    end_time: datetime
    previous_interval: Optional[ChangePointInterval] = attr.ib(default=None, init=False)
    _all_spikes: Optional[
        Union[List[SingleSpike], List[List[SingleSpike]]]
    ] = attr.ib(default=None, init=False)
    spike_std_threshold: float = attr.ib(default=2.0, init=False)
    data_df: Optional[pd.DataFrame] = attr.ib(default=None, init=False)
    _ts_cols: List[str] = attr.ib(factory=lambda: ["value"], init=False)
    num_series: int = 1
@property
def data(self) -> Optional[ArrayLike]:
df = self.data_df
if df is None:
return None
elif self.num_series == 1:
return df.value.values
else:
return df[self._ts_cols].values
@data.setter
def data(self, data: TimeSeriesData) -> None:
if not data.is_univariate():
self._ts_cols = list(data.value.columns)
self.num_series = len(self._ts_cols)
all_data_df = data.to_dataframe()
all_data_df.columns = ["time"] + self._ts_cols
all_data_df["time"] = pd.to_datetime(all_data_df["time"])
all_data_df = all_data_df.loc[
(all_data_df.time >= self.start_time) & (all_data_df.time < self.end_time)
]
self.data_df = all_data_df
    def _detect_spikes(self) -> Union[List[SingleSpike], List[List[SingleSpike]]]:
        """Return the points whose z-score is at least spike_std_threshold."""
        df = self.data_df
        if df is None:
            raise ValueError("data must be set before spike detection")
        if self.num_series == 1:
            df["z_score"] = (df.value - self.mean_val) / np.sqrt(self.variance_val)
            spike_df = df.query(f"z_score >= {self.spike_std_threshold}")
            return [
                SingleSpike(
                    time=row["time"], value=row["value"], n_sigma=row["z_score"]
                )
                for _, row in spike_df.iterrows()
            ]
        else:
            spikes = []
            mean_val, variance_val = self.mean_val, self.variance_val
            if isinstance(mean_val, float) or isinstance(variance_val, float):
                raise ValueError(
                    f"num_series = {self.num_series} so mean_val and variance_val should have type ArrayLike."
                )
            for i, c in enumerate(self._ts_cols):
                df[f"z_score_{c}"] = (df[c] - mean_val[i]) / np.sqrt(variance_val[i])
                spike_df = df.query(f"z_score_{c} >= {self.spike_std_threshold}")
                if spike_df.empty:
                    continue
                spikes.append(
                    [
                        SingleSpike(
                            time=row["time"],
                            value=row[c],
                            n_sigma=row[f"z_score_{c}"],
                        )
                        for _, row in spike_df.iterrows()
                    ]
                )
            return spikes
def extend_data(self, data: TimeSeriesData) -> None:
"""
extends the data.
"""
new_data_df = data.to_dataframe()
new_data_df.columns = ["time"] + self._ts_cols
df = self.data_df
if df is not None:
new_data_df = pd.concat([df, new_data_df])
self.data_df = new_data_df.loc[
(new_data_df.time >= self.start_time) & (new_data_df.time < self.end_time)
]
@property
def start_time_str(self) -> str:
return datetime.strftime(self.start_time, "%Y-%m-%d")
@property
def end_time_str(self) -> str:
return datetime.strftime(self.end_time, "%Y-%m-%d")
@property
def mean_val(self) -> Union[float, ArrayLike]:
if self.num_series == 1:
vals = self.data
return 0.0 if vals is None else np.mean(vals)
else:
data_df = self.data_df
if data_df is None:
return np.zeros(self.num_series)
return np.array([np.mean(data_df[c].values) for c in self._ts_cols])
@property
def variance_val(self) -> Union[float, ArrayLike]:
if self.num_series == 1:
vals = self.data
return 0.0 if vals is None else np.var(vals)
else:
data_df = self.data_df
if data_df is None:
return np.zeros(self.num_series)
return np.array([np.var(data_df[c].values) for c in self._ts_cols])
def __len__(self) -> int:
df = self.data_df
return 0 if df is None else len(df)
@property
def spikes(self) -> Union[List[SingleSpike], List[List[SingleSpike]]]:
spikes = self._all_spikes
if spikes is None:
spikes = self._detect_spikes()
self._all_spikes = spikes
return spikes
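# Example (illustrative sketch; assumes `ts` is a univariate TimeSeriesData
# covering the interval):
#   interval = ChangePointInterval(datetime(2021, 1, 1), datetime(2021, 2, 1))
#   interval.data = ts       # keeps rows in [start_time, end_time)
#   interval.mean_val        # sample mean of the interval
#   interval.spikes          # points with z-score >= spike_std_threshold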
class PercentageChange:
    """Percentage change of `current` relative to `previous`, with
    significance assessed via a two-sample t-test and delta-method CIs."""
    def __init__(
        self,
        current: ChangePointInterval,
        previous: ChangePointInterval,
        method: str = "fdr_bh",
    ):
self.current = current
self.previous = previous
self.upper = None
self.lower = None
self._t_score = None
self._p_value = None
self.alpha = 0.05
self.method = method
self.num_series = self.current.num_series
@property
def ratio_estimate(self) -> Union[float, np.ndarray]:
# pyre-ignore[6]: Expected float for 1st positional only parameter to call float.__truediv__ but got Union[float, np.ndarray].
return self.current.mean_val / self.previous.mean_val
    @property
    def perc_change(self) -> Union[float, ArrayLike]:
        return (self.ratio_estimate - 1.0) * 100.0
    @property
    def perc_change_upper(self) -> Union[float, ArrayLike]:
        if self.upper is None:
            self._delta_method()
        return (self.upper - 1) * 100.0
    @property
    def perc_change_lower(self) -> Union[float, ArrayLike]:
        if self.lower is None:
            self._delta_method()
        return (self.lower - 1) * 100.0
@property
def direction(self) -> Union[str, ArrayLike]:
if self.num_series > 1:
return np.vectorize(lambda x: "up" if x > 0 else "down")(self.perc_change)
elif self.perc_change > 0.0:
return "up"
else:
return "down"
@property
def stat_sig(self) -> Union[bool, ArrayLike]:
if self.upper is None:
self._delta_method()
        if self.num_series > 1:
            return np.array(
                [
                    not (self.upper[i] > 1.0 and self.lower[i] < 1.0)
                    for i in range(self.current.num_series)
                ]
            )
        # not stat sig when the CI contains 1.0, e.g. [0.88, 1.55]
        return not (self.upper > 1.0 and self.lower < 1.0)
    @property
    def score(self) -> Union[float, ArrayLike]:
        if self._t_score is None:
            self._ttest()
        return self._t_score
    @property
    def p_value(self) -> Union[float, ArrayLike]:
        if self._p_value is None:
            self._ttest()
        return self._p_value
@property
def mean_previous(self) -> Union[float, np.ndarray]:
return self.previous.mean_val
@property
def mean_difference(self) -> Union[float, np.ndarray]:
# pyre-ignore[6]: Expected `float` for 1st param but got `Union[float,
# np.ndarray]`.
_mean_diff = self.current.mean_val - self.previous.mean_val
return _mean_diff
    @property
    def ci_upper(self) -> Union[float, ArrayLike]:
        sp_mean = self._pooled_stddev()
        df = self._get_df()
        # the minus sign here is non-intuitive: t.ppf(alpha / 2, df) is
        # negative, e.g. t.ppf(0.025, 30) ~ -2.04
        _ci_upper = self.previous.mean_val - t.ppf(self.alpha / 2, df) * sp_mean
        return _ci_upper
    @property
    def ci_lower(self) -> Union[float, ArrayLike]:
        sp_mean = self._pooled_stddev()
        df = self._get_df()
        # the plus sign here is non-intuitive; see the comment above
        _ci_lower = self.previous.mean_val + t.ppf(self.alpha / 2, df) * sp_mean
        return _ci_lower
def _get_df(self) -> float:
"""
degree of freedom of t-test
"""
n_1 = len(self.previous)
n_2 = len(self.current)
df = n_1 + n_2 - 2
return df
    def _pooled_stddev(self) -> Union[float, ArrayLike]:
        """
        Calculates the pooled standard deviation for the t-test,
        as defined in https://online.stat.psu.edu/stat500/lesson/7/7.3/7.3.1/7.3.1.1
        """
s_1_sq = self.previous.variance_val
s_2_sq = self.current.variance_val
n_1 = len(self.previous)
n_2 = len(self.current)
if n_1 == 0 or n_2 == 0:
return 0.0
# pyre-ignore[58]: * is not supported for operand types int and Union[float, np.ndarray].
s_p = np.sqrt(((n_1 - 1) * s_1_sq + (n_2 - 1) * s_2_sq) / (n_1 + n_2 - 2))
        # note: we deliberately do not scale by np.sqrt((1. / n_1) + (1. / n_2));
        # see the comment in _ttest
return s_p
def _ttest_manual(self) -> Tuple[float, float]:
"""
scipy's t-test gives nan when one of the arrays has a
size of 1.
To repro, run:
>>> ttest_ind(np.array([1,2,3,4]), np.array([11]), equal_var=True, nan_policy='omit')
This is implemented to fix this issue
"""
sp_mean = self._pooled_stddev()
df = self._get_df()
# pyre-ignore[6]: Expected float for 1st positional only parameter to call float.__sub__ but got Union[float, np.ndarray].
t_score = (self.current.mean_val - self.previous.mean_val) / sp_mean
p_value = t.sf(np.abs(t_score), df) * 2 # sf = 1 - cdf
return t_score, p_value
def _ttest(self) -> None:
if self.num_series > 1:
self._ttest_multivariate()
return
n_1 = len(self.previous)
n_2 = len(self.current)
        # if both control and test have only one value each,
        # a t-test does not make any sense
        if n_1 == 1 and n_2 == 1:
            self._t_score = np.inf
            self._p_value = 0.0
            return
        # We previously special-cased n_1 == 1 or n_2 == 1 (where scipy's
        # ttest_ind gives nan) and called ttest_ind otherwise. We now always
        # use _ttest_manual because the pooled std was changed to not include
        # np.sqrt((1. / n_1) + (1. / n_2)).
        self._t_score, self._p_value = self._ttest_manual()
def _ttest_multivariate(self) -> None:
num_series = self.num_series
p_value_start = np.zeros(num_series)
t_value_start = np.zeros(num_series)
n_1 = len(self.previous)
n_2 = len(self.current)
if n_1 == 1 and n_2 == 1:
self._t_score = np.inf * np.ones(num_series)
self._p_value = np.zeros(num_series)
return
elif n_1 == 1 or n_2 == 1:
t_value_start, p_value_start = self._ttest_manual()
else:
current_data = self.current.data
prev_data = self.previous.data
if current_data is None or prev_data is None:
raise ValueError("Interval data not set")
for i in range(num_series):
current_slice = current_data[:, i]
prev_slice = prev_data[:, i]
t_value_start[i], p_value_start[i] = ttest_ind(
current_slice, prev_slice, equal_var=True, nan_policy="omit"
)
# The new p-values are the old p-values rescaled so that self.alpha is still the threshold for rejection
_, self._p_value, _, _ = multitest.multipletests(
p_value_start, alpha=self.alpha, method=self.method
)
self._t_score = np.zeros(num_series)
# We are using a two-sided test here, so we take inverse_tcdf(self._p_value / 2) with df = len(self.current) + len(self.previous) - 2
for i in range(self.current.num_series):
if t_value_start[i] < 0:
self._t_score[i] = t.ppf(self._p_value[i] / 2, self._get_df())
else:
self._t_score[i] = t.ppf(1 - self._p_value[i] / 2, self._get_df())
    def _calc_cov(self) -> float:
        """
        Calculates the covariance between the current and previous data
        over their trailing n_min points, scaled by 1 / n_min.
        """
        current = self.current.data
        previous = self.previous.data
        if current is None or previous is None:
            return np.nan
        n_min = min(len(current), len(previous))
        if n_min == 0:
            return np.nan
        current = current[-n_min:]
        previous = previous[-n_min:]
        return np.cov(current, previous)[0, 1] / n_min
def _delta_method(self) -> None:
test_mean = self.current.mean_val
control_mean = self.previous.mean_val
test_var = self.current.variance_val
control_var = self.previous.variance_val
n_test = len(self.current)
n_control = len(self.previous)
cov_xy = self._calc_cov()
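        # First-order (delta method) approximation of the variance of the
        # ratio of sample means x_bar / y_bar:
        #   Var(x_bar / y_bar) ~= Var(x) / (n_x * mu_y^2)
        #                         - 2 * mu_x * Cov(x_bar, y_bar) / mu_y^3
        #                         + mu_x^2 * Var(y) / (n_y * mu_y^4)
        # where cov_xy above already carries the 1/n scaling.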
sigma_sq_ratio = (
test_var / (n_test * (control_mean ** 2))
- 2 * (test_mean * cov_xy) / (control_mean ** 3)
+ (control_var * (test_mean ** 2)) / (n_control * (control_mean ** 4))
)
# the signs appear flipped because norm.ppf(0.025) ~ -1.96
self.lower = self.ratio_estimate + norm.ppf(self.alpha / 2) * np.sqrt(
abs(sigma_sq_ratio)
)
self.upper = self.ratio_estimate - norm.ppf(self.alpha / 2) * np.sqrt(
abs(sigma_sq_ratio)
)
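# Example (illustrative sketch; assumes `current` and `previous` are
# ChangePointInterval objects with data already set):
#   change = PercentageChange(current=current, previous=previous)
#   change.perc_change   # (current mean / previous mean - 1) * 100
#   change.stat_sig      # True iff the delta-method CI for the ratio excludes 1.0
#   change.p_value       # two-sided t-test p-value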
@dataclass
class ConfidenceBand:
lower: TimeSeriesData
upper: TimeSeriesData
class AnomalyResponse:
    """Container for streaming detector output: anomaly scores, a confidence
    band, predictions, anomaly magnitudes, and statistical significance."""
def __init__(
self,
scores: TimeSeriesData,
confidence_band: ConfidenceBand,
predicted_ts: TimeSeriesData,
anomaly_magnitude_ts: TimeSeriesData,
stat_sig_ts: TimeSeriesData,
):
self.scores = scores
self.confidence_band = confidence_band
self.predicted_ts = predicted_ts
self.anomaly_magnitude_ts = anomaly_magnitude_ts
self.stat_sig_ts = stat_sig_ts
self.key_mapping = []
self.num_series = 1
if not self.scores.is_univariate():
self.num_series = len(scores.value.columns)
self.key_mapping = list(scores.value.columns)
def update(
self,
time: datetime,
score: Union[float, ArrayLike],
ci_upper: Union[float, ArrayLike],
ci_lower: Union[float, ArrayLike],
pred: Union[float, ArrayLike],
anom_mag: Union[float, ArrayLike],
stat_sig: Union[float, ArrayLike],
) -> None:
"""
Add one more point and remove the last point
"""
self.scores = self._update_ts_slice(self.scores, time, score)
self.confidence_band = ConfidenceBand(
lower=self._update_ts_slice(self.confidence_band.lower, time, ci_lower),
upper=self._update_ts_slice(self.confidence_band.upper, time, ci_upper),
)
self.predicted_ts = self._update_ts_slice(self.predicted_ts, time, pred)
self.anomaly_magnitude_ts = self._update_ts_slice(
self.anomaly_magnitude_ts, time, anom_mag
)
self.stat_sig_ts = self._update_ts_slice(self.stat_sig_ts, time, stat_sig)
def _update_ts_slice(
self, ts: TimeSeriesData, time: datetime, value: Union[float, ArrayLike]
) -> TimeSeriesData:
        # drop the oldest point and append the new one
        # (pd.concat replaces Series.append, which was removed in pandas 2.0)
        time = pd.concat([ts.time.iloc[1:], pd.Series([time])], ignore_index=True)
        if self.num_series == 1:
            value = pd.concat([ts.value.iloc[1:], pd.Series([value])], ignore_index=True)
            return TimeSeriesData(time=time, value=value)
else:
if isinstance(value, float):
raise ValueError(
f"num_series = {self.num_series} so value should have type ArrayLike."
)
value_dict = {}
for i, value_col in enumerate(self.key_mapping):
                value_dict[value_col] = pd.concat(
                    [ts.value[value_col].iloc[1:], pd.Series([value[i]])],
                    ignore_index=True,
                )
return TimeSeriesData(
pd.DataFrame(
                    {
                        "time": time,
                        **{
                            value_col: value_dict[value_col]
                            for value_col in self.key_mapping
                        },
                    }
)
)
def inplace_update(
self,
time: datetime,
score: Union[float, ArrayLike],
ci_upper: Union[float, ArrayLike],
ci_lower: Union[float, ArrayLike],
pred: Union[float, ArrayLike],
anom_mag: Union[float, ArrayLike],
stat_sig: Union[float, ArrayLike],
) -> None:
"""
Add one more point and remove the last point
"""
self._inplace_update_ts(self.scores, time, score)
        self._inplace_update_ts(self.confidence_band.lower, time, ci_lower)
self._inplace_update_ts(self.confidence_band.upper, time, ci_upper)
self._inplace_update_ts(self.predicted_ts, time, pred)
self._inplace_update_ts(self.anomaly_magnitude_ts, time, anom_mag)
self._inplace_update_ts(self.stat_sig_ts, time, stat_sig)
def _inplace_update_ts(
self, ts: TimeSeriesData, time: datetime, value: Union[float, ArrayLike]
) -> None:
if self.num_series == 1:
ts.value.loc[ts.time == time] = value
else:
ts.value.loc[ts.time == time] = pd.DataFrame(value)
def get_last_n(self, N: int) -> AnomalyResponse:
"""
returns the response for the last N days
"""
return AnomalyResponse(
scores=self.scores[-N:],
confidence_band=ConfidenceBand(
upper=self.confidence_band.upper[-N:],
lower=self.confidence_band.lower[-N:],
),
predicted_ts=self.predicted_ts[-N:],
anomaly_magnitude_ts=self.anomaly_magnitude_ts[-N:],
stat_sig_ts=self.stat_sig_ts[-N:],
)
def __str__(self) -> str:
str_ret = f"""
Time: {self.scores.time.values},
Scores: {self.scores.value.values},
Upper Confidence Bound: {self.confidence_band.upper.value.values},
Lower Confidence Bound: {self.confidence_band.lower.value.values},
Predicted Time Series: {self.predicted_ts.value.values},
stat_sig:{self.stat_sig_ts.value.values}
"""
return str_ret
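# Example (illustrative sketch; assumes the five component series are aligned,
# equal-length TimeSeriesData objects):
#   response = AnomalyResponse(scores, confidence_band, predicted_ts,
#                              anomaly_magnitude_ts, stat_sig_ts)
#   response.update(t, score=1.7, ci_upper=1.2, ci_lower=0.8,
#                   pred=1.0, anom_mag=0.5, stat_sig=0.0)  # slide window by one
#   last_week = response.get_last_n(7)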