Source code for kats.detectors.bocpd_model

#!/usr/bin/env python3

# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

"""
This file implements the Bayesian Online Changepoint Detection
algorithm as a DetectorModel, to provide a common interface.
"""

import json
from typing import Optional

import pandas as pd
from kats.consts import TimeSeriesData
from kats.detectors.bocpd import (
    BOCPDetector,
    BOCPDModelType,
)
from kats.detectors.detector import DetectorModel
from kats.detectors.detector_consts import (
    AnomalyResponse,
    ConfidenceBand,
)


[docs]class BocpdDetectorModel(DetectorModel): """Implements the Bayesian Online Changepoint Detection as a DetectorModel. This provides an unified interface, which is common to all detection algorithms. Attributes: serialized_model: json containing information about stored model. slow_drift: Boolean. True indicates we are trying to detect trend changes. False indicates we are trying to detect level changes. Typical Usage: level_ts is an instance of TimeSeriesData >>> bocpd_detector = BocpdDetectorModel() >>> anom = bocpd_detector.fit_predict(data=level_ts) """ def __init__( self, serialized_model: Optional[bytes] = None, slow_drift: bool = False, threshold: Optional[float] = None, ): if serialized_model is None: self.slow_drift = slow_drift self.threshold = threshold else: model_dict = json.loads(serialized_model) if "slow_drift" in model_dict: self.slow_drift = model_dict["slow_drift"] else: self.slow_drift = slow_drift if "threshold" in model_dict: self.threshold = model_dict["threshold"] else: self.threshold = threshold
[docs] def serialize(self) -> bytes: """Returns the serialzed model. Args: None. Returns: json containing information about serialized model. """ model_dict = {"slow_drift": self.slow_drift} return json.dumps(model_dict).encode("utf-8")
def _handle_missing_data_extend( self, data: TimeSeriesData, historical_data: TimeSeriesData ) -> TimeSeriesData: # extend() works only when there is no missing data # hence, we will interpolate if there is missing data # but we will remove the interpolated data when we # evaluate, to make sure that the anomaly score is # the same length as data original_time_list = list(historical_data.time) + list(data.time) if historical_data.is_data_missing(): historical_data = historical_data.interpolate() if data.is_data_missing(): data = data.interpolate() historical_data.extend(data) # extend has been done, now remove the interpolated data data = TimeSeriesData( pd.DataFrame( { "time": [ historical_data.time.iloc[i] for i in range(len(historical_data)) if historical_data.time.iloc[i] in original_time_list ], "value": [ historical_data.value.iloc[i] for i in range(len(historical_data)) if historical_data.time.iloc[i] in original_time_list ], } ), use_unix_time=True, unix_time_units="s", tz="US/Pacific", ) return data # pyre-fixme[14]: `fit_predict` overrides method defined in `DetectorModel` # inconsistently.
[docs] def fit_predict( self, data: TimeSeriesData, historical_data: Optional[TimeSeriesData] = None ) -> AnomalyResponse: """Finds changepoints and returns score. Uses the current data and historical data to find the changepoints, and returns an AnomalyResponse object, the scores corresponding to probability of changepoints. Args: data: TimeSeriesData object representing the data historical_data: TimeSeriesdata object representing the history. Dats should start exactly where the historical_data ends. Returns: AnomalyResponse object, representing the changepoint probabilities. The score property contains the changepoint probabilities. The length of the object is the same as the length of the data. """ # pyre-fixme[16]: `BocpdDetectorModel` has no attribute `last_N`. self.last_N = len(data) # if there is historical data # we prepend it to data, and run # the detector as if we only saw data if historical_data is not None: data = self._handle_missing_data_extend(data, historical_data) bocpd_model = BOCPDetector(data=data) if not self.slow_drift: if self.threshold is not None: _ = bocpd_model.detector( model=BOCPDModelType.NORMAL_KNOWN_MODEL, choose_priors=True, agg_cp=True, threshold=self.threshold, ) else: _ = bocpd_model.detector( model=BOCPDModelType.NORMAL_KNOWN_MODEL, choose_priors=True, agg_cp=True, ) else: if self.threshold is not None: _ = bocpd_model.detector( model=BOCPDModelType.NORMAL_KNOWN_MODEL, choose_priors=True, agg_cp=True, threshold=self.threshold, ) else: _ = bocpd_model.detector( model=BOCPDModelType.TREND_CHANGE_MODEL, choose_priors=False, agg_cp=True, ) change_prob_dict = bocpd_model.get_change_prob() change_prob = list(change_prob_dict.values())[0] # construct the object N = len(data) default_ts = TimeSeriesData(time=data.time, value=pd.Series(N * [0.0])) score_ts = TimeSeriesData(time=data.time, value=pd.Series(change_prob)) # pyre-fixme[16]: `BocpdDetectorModel` has no attribute `response`. self.response = AnomalyResponse( scores=score_ts, confidence_band=ConfidenceBand(upper=data, lower=data), predicted_ts=data, anomaly_magnitude_ts=default_ts, stat_sig_ts=default_ts, ) return self.response.get_last_n(self.last_N)
# pyre-fixme[14]: `predict` overrides method defined in `DetectorModel` # inconsistently.
[docs] def predict( self, data: TimeSeriesData, historical_data: Optional[TimeSeriesData] = None ) -> AnomalyResponse: """ predict is not implemented """ raise ValueError("predict is not implemented, call fit_predict() instead")
# pyre-fixme[14]: `fit` overrides method defined in `DetectorModel` inconsistently.
[docs] def fit( self, data: TimeSeriesData, historical_data: Optional[TimeSeriesData] = None ) -> None: """ fit can be called during priming. It's a noop for us. """ return