Source code for kats.detectors.robust_stat_detection

#!/usr/bin/env python3

# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import logging

import matplotlib.pyplot as plt
import numpy as np
from kats.consts import TimeSeriesData, TimeSeriesChangePoint
from kats.detectors.detector import Detector
# pyre-fixme[21]: Could not find name `zscore` in `scipy.stats`.
from scipy.stats import norm, zscore  # @manual

from typing import List, Tuple


class RobustStatMetadata:
    """Read-only pair of values: an integer index and a float metric.

    Both values are supplied at construction time and exposed through
    properties without setters, so they cannot be reassigned through the
    public attribute names.
    """

    def __init__(self, index: int, metric: float) -> None:
        """Remember ``index`` and ``metric`` for later retrieval.

        Args:
            index: Value returned by the ``index`` property.
            metric: Value returned by the ``metric`` property.
        """
        self._index = index
        self._metric = metric

    @property
    def index(self) -> int:
        """The index given to the constructor."""
        return self._index

    @property
    def metric(self) -> float:
        """The metric given to the constructor."""
        return self._metric


class RobustStatDetector(Detector):
    """Change point detector driven by z-scores of a smoothed, differenced series.

    The value column is smoothed with a rolling mean, differenced against a
    neighbouring sample, and standardised with ``scipy.stats.zscore``. Samples
    whose normal upper-tail probability (``norm.sf(|z|)``) falls below a
    cutoff are reported as change points.

    Only univariate time series are accepted.
    """

    def __init__(self, data: TimeSeriesData) -> None:
        """Initialise the base ``Detector`` with ``data`` and validate it.

        Args:
            data: The time series to analyse.

        Raises:
            ValueError: If ``data`` is not univariate. The message is also
                logged at error level before raising.
        """
        super().__init__(data=data)
        if not self.data.is_univariate():
            msg = "Only support univariate time series, but get {type}.".format(
                type=type(self.data.value)
            )
            logging.error(msg)
            raise ValueError(msg)

    # pyre-fixme[14]: `detector` overrides method defined in `Detector` inconsistently.
    def detector(
        self,
        p_value_cutoff: float = 1e-2,
        smoothing_window_size: int = 5,
        comparison_window: int = -2,
    ) -> List[Tuple[TimeSeriesChangePoint, RobustStatMetadata]]:
        """Find samples whose smoothed difference is statistically unusual.

        Args:
            p_value_cutoff: A sample is flagged when ``norm.sf(|z|)`` is
                strictly below this value.
            smoothing_window_size: Window length of the rolling mean. Also the
                minimum index distance between two reported change points.
            comparison_window: ``periods`` argument handed to ``Series.diff``
                on the smoothed series (a negative value compares each sample
                with a later one).

        Returns:
            A list of ``(change_point, metadata)`` tuples in index order; an
            empty list when nothing is flagged. Each change point has equal
            ``start_time`` and ``end_time`` and ``confidence = 1 - p_value``.
            The metadata holds the positional index of the sample and the
            smoothed-difference value at that position.
        """
        time_col_name = self.data.time.name
        val_col_name = self.data.value.name

        data_df = self.data.to_dataframe().set_index(time_col_name)

        smoothed = (
            data_df.loc[:, val_col_name]
            .rolling(window=smoothing_window_size)
            .mean()
        )
        df_ = (
            # The leading entries of a rolling mean are NaN; back-fill them.
            # ``bfill()`` is the non-deprecated spelling of
            # ``fillna(method="bfill")`` and produces the same values.
            smoothed.bfill()
            # Make spikes stand out.
            .diff(comparison_window)
            .fillna(0)
        )

        # pyre-fixme[16]: Module `stats` has no attribute `zscore`.
        y_zscores = zscore(df_)
        p_values = norm.sf(np.abs(y_zscores))
        ind = np.where(p_values < p_value_cutoff)[0]

        if len(ind) == 0:
            return []  # empty list for no change points

        timestamps = data_df.index.values
        change_points = []
        prev_idx = -1
        for idx in ind:
            # Drop a flagged sample that sits too close to the last one kept,
            # so one event is not reported several times.
            if prev_idx != -1 and (idx - prev_idx) < smoothing_window_size:
                continue

            prev_idx = idx
            cp = TimeSeriesChangePoint(
                start_time=timestamps[idx],
                end_time=timestamps[idx],
                confidence=1 - p_values[idx],
            )
            metadata = RobustStatMetadata(index=idx, metric=float(df_.iloc[idx]))
            change_points.append((cp, metadata))

        return change_points

    def plot(
        self,
        change_points: List[Tuple[TimeSeriesChangePoint, RobustStatMetadata]],
    ) -> None:
        """Plot the series and mark each change point with a red vertical line.

        Args:
            change_points: Output of ``detector``. Only the ``start_time`` of
                the first element of each tuple is used. When the list is
                empty a warning is logged and the bare series is still shown.
        """
        time_col_name = self.data.time.name
        val_col_name = self.data.value.name

        data_df = self.data.to_dataframe()

        plt.plot(data_df[time_col_name], data_df[val_col_name])

        if not change_points:
            logging.warning('No change points detected!')

        for change in change_points:
            plt.axvline(x=change[0].start_time, color='red')

        plt.show()