#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
"""
This module contains code to implement the Prophet algorithm
as a Detector Model.
"""
from enum import Enum
from typing import Optional
import numpy as np
import pandas as pd
from fbprophet import Prophet
from fbprophet.serialize import model_from_json, model_to_json
from kats.consts import TimeSeriesData
from kats.detectors.detector import DetectorModel
from kats.detectors.detector_consts import (
AnomalyResponse,
ConfidenceBand,
)
# Column names used in the data frame handed to Prophet (time / value) ...
PROPHET_TIME_COLUMN = "ds"
PROPHET_VALUE_COLUMN = "y"
# ... and the forecast columns read back from Prophet's prediction frame.
PROPHET_YHAT_COLUMN = "yhat"
PROPHET_YHAT_LOWER_COLUMN = "yhat_lower"
PROPHET_YHAT_UPPER_COLUMN = "yhat_upper"
# Floor applied to the standard-deviation estimates in z_score so that the
# subsequent division never divides by zero.
MIN_STDEV = 1e-9
def timeseries_to_prophet_df(ts_data: TimeSeriesData) -> pd.DataFrame:
    """Convert a TimeSeriesData object to the data frame layout Prophet expects.

    Args:
        ts_data: object of class TimeSeriesData. Must be univariate.

    Returns:
        pandas DataFrame with a time column (PROPHET_TIME_COLUMN) and a
        value column (PROPHET_VALUE_COLUMN).

    Raises:
        ValueError: if ``ts_data`` is not univariate.
    """
    if not ts_data.is_univariate():
        raise ValueError("ProphetModel only works with univariate data")

    return pd.DataFrame(
        {
            PROPHET_TIME_COLUMN: ts_data.time,
            PROPHET_VALUE_COLUMN: ts_data.value,
        }
    )
def deviation_from_predicted_val(
    data: TimeSeriesData,
    predict_df: pd.DataFrame,
    ci_threshold: Optional[float] = None,
    uncertainty_samples: Optional[float] = None,
):
    """Score each point by its relative deviation from the forecast.

    The score is ``(actual - yhat) / |yhat|``, so it is positive when the
    observation is above the prediction and negative when it is below.

    Args:
        data: observed time series; ``data.value`` holds the actual values.
        predict_df: Prophet forecast frame containing PROPHET_YHAT_COLUMN.
        ci_threshold: not used by this score function.
        uncertainty_samples: not used by this score function.

    Returns:
        The element-wise relative deviation.
    """
    predicted = predict_df[PROPHET_YHAT_COLUMN]
    residual = data.value - predicted
    return residual / predicted.abs()
def z_score(
    data: TimeSeriesData,
    predict_df: pd.DataFrame,
    ci_threshold: float = 0.8,
    uncertainty_samples: float = 50,
):
    """Score each point by its residual divided by a one-sided std estimate.

    The confidence band is treated as asymmetric: points above the forecast
    are normalised with a std derived from the upper bound, points below the
    forecast with a std derived from the lower bound. Points exactly on the
    forecast score 0.

    Args:
        data: observed time series; ``data.value`` holds the actual values.
        predict_df: Prophet forecast frame containing the yhat, yhat_lower
            and yhat_upper columns.
        ci_threshold: interval width the forecast band was produced with.
        uncertainty_samples: number of uncertainty samples behind the band.

    Returns:
        The element-wise sum of the upper-side and lower-side scores.
    """
    forecast = predict_df[PROPHET_YHAT_COLUMN]
    root_n = uncertainty_samples ** 0.5

    # One std estimate per side of the band.
    std_above = (
        root_n * (predict_df[PROPHET_YHAT_UPPER_COLUMN] - forecast) / ci_threshold
    )
    std_below = (
        root_n * (forecast - predict_df[PROPHET_YHAT_LOWER_COLUMN]) / ci_threshold
    )

    # A zero std would blow up the division below, so clamp to a tiny floor.
    std_above = np.maximum(std_above, MIN_STDEV)
    std_below = np.maximum(std_below, MIN_STDEV)

    observed = data.value
    residual = observed - forecast

    # The boolean masks zero out the side that does not apply to each point.
    score_above = (observed > forecast) * residual / std_above
    score_below = (observed < forecast) * residual / std_below
    return score_above + score_below
class ProphetScoreFunction(Enum):
    """Names of the anomaly score functions available to the Prophet detector.

    Each member's value is the string key under which the matching callable
    is registered in SCORE_FUNC_DICT.
    """

    deviation_from_predicted_val = "deviation_from_predicted_val"
    z_score = "z_score"
# Dispatch table from a ProphetScoreFunction value (a string) to the callable
# that computes the score. Note that the keys are the enum *values*, not the
# enum members themselves.
SCORE_FUNC_DICT = {
    ProphetScoreFunction.deviation_from_predicted_val.value: deviation_from_predicted_val,
    ProphetScoreFunction.z_score.value: z_score,
}
class ProphetDetectorModel(DetectorModel):
    """Prophet based anomaly detection model.

    A Detector Model that does anomaly detection, by first using the Prophet
    library to forecast the interval for the next point, and comparing this
    to the actually observed data point.

    Attributes:
        strictness_factor: interval_width as required by Prophet.
        uncertainty_samples: Number of samples required by Prophet to
            calculate uncertainty.
        serialized_model: json, representing data from a previously
            serialized model.
        remove_outliers: if True, ``fit`` strips outliers from the training
            data before training the final model.
        score_func: name of the score function (a key of SCORE_FUNC_DICT).
            A ProphetScoreFunction member is accepted as well and is stored
            as its string value.
    """

    def __init__(
        self,
        strictness_factor: float = 0.8,
        uncertainty_samples: float = 50,
        serialized_model: Optional[bytes] = None,
        remove_outliers=False,
        score_func: ProphetScoreFunction = ProphetScoreFunction.deviation_from_predicted_val.value,
    ) -> None:
        if serialized_model:
            self.model = model_from_json(serialized_model)
        else:
            self.model = None

        # SCORE_FUNC_DICT is keyed by the enum *values* (strings). Accept an
        # enum member too, since that is what the annotation advertises, and
        # normalise it to the string key used for the lookup in predict().
        if isinstance(score_func, ProphetScoreFunction):
            score_func = score_func.value
        if score_func not in SCORE_FUNC_DICT:
            raise ValueError(
                f"Unknown score_func {score_func!r}; expected one of "
                f"{sorted(SCORE_FUNC_DICT)}"
            )

        self.strictness_factor = strictness_factor
        self.uncertainty_samples = uncertainty_samples
        self.remove_outliers = remove_outliers
        self.score_func = score_func

    def serialize(self) -> bytes:
        """Serialize the model into a json.

        So it can be loaded later.

        Returns:
            json containing information of the model.

        Raises:
            ValueError: if no model has been fitted or loaded yet.
        """
        if self.model is None:
            raise ValueError("No model to serialize: call fit() first.")
        return str.encode(model_to_json(self.model))

    # pyre-fixme[14]: `fit_predict` overrides method defined in `DetectorModel`
    #  inconsistently.
    def fit_predict(
        self, data: TimeSeriesData, historical_data: Optional[TimeSeriesData] = None
    ) -> AnomalyResponse:
        """Trains a model, and returns the anomaly scores.

        The model is trained on ``historical_data`` only and then used to
        score ``data``.

        Args:
            data: TimeSeriesData on which detection is run.
            historical_data: TimeSeriesData corresponding to history. History ends exactly where
                the data begins. Required, despite the Optional annotation.

        Returns:
            AnomalyResponse object. The length of this object is same as data. The score property
            gives the score for anomaly.

        Raises:
            ValueError: if ``historical_data`` is None.
        """
        if historical_data is None:
            # Without this guard, None would be passed to fit() and fail
            # there with an unhelpful AttributeError.
            raise ValueError(
                "fit_predict() needs historical_data to train the model on."
            )

        # train on historical, then predict on all data.
        self.fit(data=historical_data, historical_data=None)
        return self.predict(data)

    # pyre-fixme[14]: `fit` overrides method defined in `DetectorModel` inconsistently.
    def fit(
        self, data: TimeSeriesData, historical_data: Optional[TimeSeriesData] = None
    ) -> None:
        """Used to train a model.

        fit can be called during priming. We train a model using all the data passed in.

        Args:
            data: TimeSeriesData on which detection is run.
            historical_data: TimeSeriesData corresponding to history. History ends exactly where
                the data begins. It is not modified by this call.

        Returns:
            None.
        """
        import copy

        if historical_data is None:
            total_data = data
        else:
            # extend() works in place; operate on a copy so the caller's
            # historical_data is not silently grown by this call.
            total_data = copy.deepcopy(historical_data)
            total_data.extend(data)

        # No incremental training. Create a model and train from scratch
        self.model = Prophet(
            interval_width=self.strictness_factor,
            uncertainty_samples=self.uncertainty_samples,
        )

        data_df = timeseries_to_prophet_df(total_data)
        if self.remove_outliers:
            data_df = self._remove_outliers(data_df)

        self.model.fit(data_df)

    # pyre-fixme[14]: `predict` overrides method defined in `DetectorModel`
    #  inconsistently.
    def predict(
        self,
        data: TimeSeriesData,
        historical_data: Optional[TimeSeriesData] = None,
    ) -> AnomalyResponse:
        """Predicts anomaly score for future data.

        Predict only expects anomaly score for data. Prophet doesn't need historical_data.

        Args:
            data: TimeSeriesData on which detection is run
            historical_data: TimeSeriesData corresponding to history. History ends exactly where
                the data begins. Unused here.

        Returns:
            AnomalyResponse object. The length of this object is same as data. The score property
            gives the score for anomaly.

        Raises:
            ValueError: if no model has been fitted or loaded yet.
        """
        if self.model is None:
            raise ValueError("No model available: call fit() before predict().")

        time_df = pd.DataFrame({PROPHET_TIME_COLUMN: data.time})
        predict_df = self.model.predict(time_df)
        zeros = np.zeros(len(data))

        # Take both band parameters from the model that actually produced the
        # forecast: for a deserialized model the constructor arguments may not
        # match what the model was trained with.
        scores = SCORE_FUNC_DICT[self.score_func](
            data=data,
            predict_df=predict_df,
            ci_threshold=self.model.interval_width,
            uncertainty_samples=self.model.uncertainty_samples,
        )

        response = AnomalyResponse(
            scores=TimeSeriesData(time=data.time, value=scores),
            confidence_band=ConfidenceBand(
                upper=TimeSeriesData(
                    time=data.time, value=predict_df[PROPHET_YHAT_UPPER_COLUMN]
                ),
                lower=TimeSeriesData(
                    time=data.time, value=predict_df[PROPHET_YHAT_LOWER_COLUMN]
                ),
            ),
            predicted_ts=TimeSeriesData(
                time=data.time, value=predict_df[PROPHET_YHAT_COLUMN]
            ),
            # Magnitude and statistical significance are not computed by this
            # detector; they are filled with zeros.
            anomaly_magnitude_ts=TimeSeriesData(time=data.time, value=pd.Series(zeros)),
            stat_sig_ts=TimeSeriesData(time=data.time, value=pd.Series(zeros)),
        )
        return response

    @staticmethod
    def _remove_outliers(
        ts_df: pd.DataFrame,
        outlier_ci_threshold: float = 0.99,
        uncertainty_samples: float = 50,
    ) -> pd.DataFrame:
        """
        Remove outliers from the time series by fitting a Prophet model to the time series
        and stripping all points that fall outside the confidence interval of the predictions
        of the model.

        Args:
            ts_df: frame with PROPHET_TIME_COLUMN and PROPHET_VALUE_COLUMN columns.
            outlier_ci_threshold: interval_width of the outlier-screening model.
            uncertainty_samples: uncertainty_samples of the outlier-screening model.

        Returns:
            ``ts_df`` restricted to the rows inside the confidence interval.
        """
        # Select the time column by name rather than by position, so the
        # result does not depend on the column order of ts_df.
        ts_dates_df = pd.DataFrame({PROPHET_TIME_COLUMN: ts_df[PROPHET_TIME_COLUMN]})

        model = Prophet(
            interval_width=outlier_ci_threshold, uncertainty_samples=uncertainty_samples
        )
        model_pass1 = model.fit(ts_df)
        forecast = model_pass1.predict(ts_dates_df)

        # Compare row by row on the raw arrays so that a non-default index on
        # ts_df cannot break (or misalign) the comparison with the forecast.
        actual = ts_df[PROPHET_VALUE_COLUMN].to_numpy()
        is_outlier = (actual < forecast[PROPHET_YHAT_LOWER_COLUMN].to_numpy()) | (
            actual > forecast[PROPHET_YHAT_UPPER_COLUMN].to_numpy()
        )

        return ts_df[~is_outlier]