# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import json
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple, Set
import re
import numpy as np
import pandas as pd
from collections import namedtuple
import copy
from kats.consts import TimeSeriesChangePoint, TimeSeriesData
from kats.detectors.detector import Detector
from kats.detectors.detector_consts import AnomalyResponse
from kats.utils.simulator import Simulator
# defining some helper functions
def get_cp_index(
changepoints: List[Tuple[TimeSeriesChangePoint, Any]], tsd: TimeSeriesData
) -> List[int]:
"""
    Accepts the output of the Detector.detector() method, which is a list of
    (TimeSeriesChangePoint, Metadata) tuples, and returns the integer indices
    of those changepoints in the time series.
"""
cp_list = []
tsd_df = tsd.to_dataframe()
tsd_df["time_index"] = list(range(tsd_df.shape[0]))
for cp, _ in changepoints:
tsd_row = tsd_df[tsd_df.time == cp.start_time]
this_cp = tsd_row["time_index"].values[0]
cp_list.append(this_cp)
return cp_list
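
# Illustrative sketch (hypothetical usage, not executed on import): any Kats
# Detector whose detector() method returns (TimeSeriesChangePoint, Metadata)
# tuples can be mapped back to integer indices like this:
#
#     from kats.detectors.robust_stat_detection import RobustStatDetector
#     detector = RobustStatDetector(tsd)            # tsd: TimeSeriesData
#     changepoints = detector.detector()
#     cp_indices = get_cp_index(changepoints, tsd)  # e.g. [100, 200, 350]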
def crossed_threshold(val: float, threshold_low: float, threshold_high: float) -> bool:
return (val < threshold_low) or (val > threshold_high)
def get_cp_index_from_alert_score(
score_val: np.ndarray, threshold_low: float, threshold_high: float
) -> List[int]:
cp_list = []
alert_on = False
for i in range(score_val.shape[0]):
crossed_bool = crossed_threshold(score_val[i], threshold_low, threshold_high)
# alarm went off
if crossed_bool and (not alert_on):
cp_list.append(i)
alert_on = True
# return back to normal
if (not crossed_bool) and alert_on:
cp_list.append(i)
alert_on = False
return cp_list
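
# Illustrative sketch (hypothetical values): alert-style extraction records the
# index where the score first leaves the [threshold_low, threshold_high] band
# (the alarm turns on) and the index where it returns inside the band (the
# alarm turns off), so a single sustained anomaly yields two indices:
#
#     scores = np.array([0.1, 0.2, 1.5, 1.6, 0.2, 0.1])
#     get_cp_index_from_alert_score(scores, threshold_low=0.0, threshold_high=1.0)
#     # -> [2, 4]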
def get_cp_index_from_threshold_score(
    score_val: np.ndarray, threshold_low: float, threshold_high: float
) -> List[int]:
higher = np.where(score_val > threshold_high)[0]
lower = np.where(score_val < threshold_low)[0]
cp_list = list(set(higher).union(set(lower)))
return cp_list
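
# Illustrative sketch (hypothetical values): threshold-style extraction simply
# flags every index whose score falls outside the band, without tracking any
# alarm state; the result is a list of unique indices in no guaranteed order:
#
#     scores = np.array([0.1, 1.5, 0.2, -0.3, 1.2])
#     get_cp_index_from_threshold_score(scores, threshold_low=0.0, threshold_high=1.0)
#     # -> e.g. [1, 3, 4]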
def get_cp_index_from_detector_model(
anom_obj: AnomalyResponse,
alert_style_cp: bool,
threshold_low: float,
threshold_high: float,
) -> List[int]:
score_val = anom_obj.scores.value.values
if alert_style_cp:
cp_list = get_cp_index_from_alert_score(
score_val, threshold_low, threshold_high
)
else:
cp_list = get_cp_index_from_threshold_score(
score_val, threshold_low, threshold_high
)
return cp_list
# copied from https://github.com/alan-turing-institute/TCPDBench/blob/master/analysis/scripts/metrics.py
def true_positives(T: Set[int], X: Set[int], margin: int = 5) -> Set[int]:
"""Compute true positives without double counting.
>>> true_positives({1, 10, 20, 23}, {3, 8, 20})
{1, 10, 20}
>>> true_positives({1, 10, 20, 23}, {1, 3, 8, 20})
{1, 10, 20}
>>> true_positives({1, 10, 20, 23}, {1, 3, 5, 8, 20})
{1, 10, 20}
>>> true_positives(set(), {1, 2, 3})
set()
>>> true_positives({1, 2, 3}, set())
set()
    Args:
        T: set of true (annotated) changepoint locations.
        X: set of detected changepoint locations.
        margin: maximum absolute difference between a true and a detected
            changepoint for them to count as a match.

    Returns:
        The subset of true changepoints that were matched by a detection.
"""
    # make a copy so we don't affect the caller
    X = set(X)
TP = set()
for tau in T:
close = [(abs(tau - x), x) for x in X if abs(tau - x) <= margin]
close.sort()
if not close:
continue
dist, xstar = close[0]
TP.add(tau)
X.remove(xstar)
return TP
# modified from https://github.com/alan-turing-institute/TCPDBench/blob/master/analysis/scripts/metrics.py
def f_measure(
annotations: Dict[str, List[int]],
predictions: List[int],
margin: int = 5,
alpha: float = 0.5,
) -> Dict[str, float]:
"""Compute the F-measure based on human annotations.
Remember that all CP locations are 0-based!
>>> f_measure({1: [10, 20], 2: [11, 20], 3: [10], 4: [0, 5]}, [10, 20])
1.0
>>> f_measure({1: [], 2: [10], 3: [50]}, [10])
0.9090909090909091
>>> f_measure({1: [], 2: [10], 3: [50]}, [])
0.8
Args:
annotations : dict from user_id to iterable of CP locations.
predictions : iterable of predicted CP locations.
alpha : value for the F-measure, alpha=0.5 gives the F1-measure.
return_PR : whether to return precision and recall too.
"""
# ensure 0 is in all the sets
Tks = {k + 1: set(annotations[uid]) for k, uid in enumerate(annotations)}
for Tk in Tks.values():
Tk.add(0)
X = set(predictions)
X.add(0)
Tstar = set()
for Tk in Tks.values():
for tau in Tk:
Tstar.add(tau)
K = len(Tks)
P = len(true_positives(Tstar, X, margin=margin)) / len(X)
TPk = {k: true_positives(Tks[k], X, margin=margin) for k in Tks}
R = 1 / K * sum(len(TPk[k]) / len(Tks[k]) for k in Tks)
F = P * R / (alpha * R + (1 - alpha) * P)
score_dict = {"f_score": F, "precision": P, "recall": R}
return score_dict
def generate_from_simulator(
cp_arr: Optional[List[int]] = None, level_arr: Optional[List[float]] = None,
ts_length: int = 450
) -> Tuple[Dict[str, Any], Dict[str, List[int]]]:
if cp_arr is None:
cp_arr = [100, 200, 350]
if level_arr is None:
level_arr = [1.35, 1.05, 1.35, 1.2]
sim2 = Simulator(n=ts_length, start="2018-01-01")
ts2 = sim2.level_shift_sim(
cp_arr=cp_arr,
level_arr=level_arr,
noise=0.05,
seasonal_period=7,
seasonal_magnitude=0.075,
)
ts2_df = ts2.to_dataframe()
ts2_dict = {str(row["time"]): row["value"] for _, row in ts2_df.iterrows()}
ts2_anno = {"1": [100, 200, 350]}
return ts2_dict, ts2_anno
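
# Illustrative sketch: the simulated series is returned as a {timestamp: value}
# dict (the same format that is stringified into the 'time_series' column
# below), together with a single annotation keyed by annotator id:
#
#     ts_dict, ts_anno = generate_from_simulator(cp_arr=[100, 200, 350])
#     ts_anno  # -> {"1": [100, 200, 350]}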
class BenchmarkEvaluator(ABC):
def __init__(self, detector: Detector):
self.detector = detector
@abstractmethod
def evaluate(self):
pass
@abstractmethod
def load_data(self):
pass
Evaluation = namedtuple(
"Evaluation", ["dataset_name", "precision", "recall", "f_score"]
)
class EvalAggregate:
def __init__(self, eval_list: List[Evaluation]):
self.eval_list = eval_list
self.eval_df = None
def get_eval_dataframe(self) -> pd.DataFrame:
df_list = []
for this_eval in self.eval_list:
df_list.append(
{
"dataset_name": this_eval.dataset_name,
"precision": this_eval.precision,
"recall": this_eval.recall,
"f_score": this_eval.f_score,
}
)
self.eval_df = pd.DataFrame(df_list)
return self.eval_df
def get_avg_precision(self) -> float:
if self.eval_df is None:
_ = self.get_eval_dataframe()
avg_precision = np.mean(self.eval_df.precision)
return avg_precision
def get_avg_recall(self) -> float:
if self.eval_df is None:
_ = self.get_eval_dataframe()
avg_recall = np.mean(self.eval_df.recall)
return avg_recall
def get_avg_f_score(self) -> float:
if self.eval_df is None:
_ = self.get_eval_dataframe()
avg_f_score = np.mean(self.eval_df.f_score)
return avg_f_score
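
# Illustrative sketch (hypothetical numbers): EvalAggregate collects per-dataset
# Evaluation tuples and averages the scores across datasets:
#
#     evals = [
#         Evaluation(dataset_name="eg_1", precision=1.0, recall=0.9, f_score=0.95),
#         Evaluation(dataset_name="eg_2", precision=0.8, recall=1.0, f_score=0.89),
#     ]
#     agg = EvalAggregate(evals)
#     agg.get_avg_f_score()  # -> ~0.92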
class TuringEvaluator(BenchmarkEvaluator):
"""
Evaluates a changepoint detection algorithm. The evaluation
follows the benchmarking method established in this paper:
https://arxiv.org/pdf/2003.06222.pdf.
By default, this evaluates the Turing changepoint benchmark,
which is introduced in the above paper. This is the most comprehensive
benchmark for changepoint detection algorithms.
You can also evaluate your own dataset. The dataset should be a
dataframe with 3 columns:
'dataset_name': str,
'time_series': str "{'0': 0.55, '1': 0.56}",
'annotation': str "{'0':[1,2], '1':[2,3]}"
    Annotations allow different human labelers to mark changepoints in the
    same time series; each key identifies one labeler. This allows for
    uncertainty in labeling.

    Usage:

    >>> model_params = {'p_value_cutoff': 5e-3, 'comparison_window': 2}
    >>> turing = TuringEvaluator(detector=RobustStatDetector)
    >>> eval_agg_df = turing.evaluate(data=eg_df, model_params=model_params)

    The above produces a dataframe with scores for each dataset.
    To get an average over all datasets you can do:

    >>> eval_agg = turing.get_eval_aggregate()
    >>> avg_precision = eval_agg.get_avg_precision()
"""
def __init__(self, detector: Detector, is_detector_model: bool = False):
super(TuringEvaluator, self).__init__(detector=detector)
self.detector = detector
self.eval_agg = None
self.is_detector_model = is_detector_model
def evaluate(
self,
model_params: Optional[Dict[str, float]] = None,
data: Optional[pd.DataFrame] = None,
ignore_list: Optional[List[str]] = None,
alert_style_cp: bool = False,
threshold_low: float = 0.0,
threshold_high: float = 1.0,
) -> pd.DataFrame:
if self.is_detector_model:
return self._evaluate_detector_model(
model_params=model_params,
data=data,
ignore_list=ignore_list,
alert_style_cp=alert_style_cp,
threshold_low=threshold_low,
threshold_high=threshold_high,
)
else:
return self._evaluate_detector(
model_params=model_params, data=data, ignore_list=ignore_list
)
def _evaluate_detector_model(
self,
model_params: Optional[Dict[str, float]] = None,
data: Optional[pd.DataFrame] = None,
ignore_list: Optional[List[str]] = None,
alert_style_cp: bool = False,
threshold_low: float = 0.0,
threshold_high: float = 1.0,
) -> pd.DataFrame:
if not ignore_list:
ignore_list = []
if model_params is None:
model_params = {}
if data is None:
# pyre-fixme[16]: `TuringEvaluator` has no attribute `ts_df`.
self.ts_df = self.load_data()
else:
self.ts_df = data
eval_list = []
for _, row in self.ts_df.iterrows():
this_dataset = row["dataset_name"]
if this_dataset in ignore_list:
continue
data_name, tsd, anno = self._parse_data(row)
# pyre-fixme[29]: `Detector` is not a function.
detector = self.detector(**model_params)
anom_obj = detector.fit_predict(tsd)
cp_list = get_cp_index_from_detector_model(
anom_obj, alert_style_cp, threshold_low, threshold_high
)
eval_dict = f_measure(annotations=anno, predictions=cp_list)
eval_list.append(
Evaluation(
dataset_name=data_name,
precision=eval_dict["precision"],
recall=eval_dict["recall"],
f_score=eval_dict["f_score"],
)
)
# break
self.eval_agg = EvalAggregate(eval_list)
eval_df = self.eval_agg.get_eval_dataframe()
return eval_df
def _evaluate_detector(
self,
model_params: Optional[Dict[str, float]] = None,
data: Optional[pd.DataFrame] = None,
ignore_list: Optional[List[str]] = None,
) -> pd.DataFrame:
# this is to avoid a mutable default parameter
if not ignore_list:
ignore_list = []
if model_params is None:
model_params = {}
if data is None:
# pyre-fixme[16]: `TuringEvaluator` has no attribute `ts_df`.
self.ts_df = self.load_data()
else:
self.ts_df = data
eval_list = []
for _, row in self.ts_df.iterrows():
this_dataset = row["dataset_name"]
if this_dataset in ignore_list:
continue
data_name, tsd, anno = self._parse_data(row)
# pyre-fixme[29]: `Detector` is not a function.
detector = self.detector(tsd)
change_points = detector.detector(**model_params)
cp_list = get_cp_index(change_points, tsd)
eval_dict = f_measure(annotations=anno, predictions=cp_list)
eval_list.append(
Evaluation(
dataset_name=data_name,
precision=eval_dict["precision"],
recall=eval_dict["recall"],
f_score=eval_dict["f_score"],
)
)
# break
self.eval_agg = EvalAggregate(eval_list)
eval_df = self.eval_agg.get_eval_dataframe()
return eval_df
    def get_eval_aggregate(self):
"""
        returns the EvalAggregate object, which can then be used for further
        processing
"""
return self.eval_agg
    def load_data(self) -> pd.DataFrame:
"""
        loads the evaluation data; this default implementation generates
        example datasets from the Kats simulator
"""
return self._load_data_from_simulator()
def _load_data_from_simulator(self) -> pd.DataFrame:
ts_dict, ts_anno = generate_from_simulator()
ts2_dict, ts2_anno = generate_from_simulator(
cp_arr=[50, 100, 150], level_arr=[1.1, 1.05, 1.35, 1.2]
)
eg_df = pd.DataFrame(
[
{
"dataset_name": "eg_1",
"time_series": str(ts_dict),
"annotation": str(ts_anno),
},
{
"dataset_name": "eg_2",
"time_series": str(ts_dict),
"annotation": str(ts2_anno),
},
]
)
return eg_df
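    # Illustrative sketch: a user-supplied dataset follows the same 3-column
    # layout, with the time series and the annotations serialized as dict
    # strings (see the class docstring):
    #
    #     from kats.detectors.robust_stat_detection import RobustStatDetector
    #     my_df = pd.DataFrame(
    #         [
    #             {
    #                 "dataset_name": "my_series",
    #                 "time_series": "{'0': 0.55, '1': 0.56, '2': 1.2}",
    #                 "annotation": "{'1': [2], '2': [2]}",
    #             }
    #         ]
    #     )
    #     eval_df = TuringEvaluator(detector=RobustStatDetector).evaluate(data=my_df)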
def _parse_data(self, df_row: Any):
this_dataset = df_row["dataset_name"]
this_ts = df_row["time_series"]
this_anno = df_row["annotation"]
this_anno_json_acc = this_anno.replace("'", "\"")
print(this_dataset)
this_anno_dict = json.loads(this_anno_json_acc)
this_ts_json_acc = this_ts.replace("'", '\"')
try:
this_ts_dict = json.loads(this_ts_json_acc)
except Exception:
this_ts_json_acc = re.sub(r"(\w+):", r'"\1":', this_ts_json_acc)
this_ts_dict = json.loads(this_ts_json_acc)
ts = pd.DataFrame.from_dict(this_ts_dict, orient="index")
ts.sort_index(inplace=True)
ts.reset_index(inplace=True)
ts.columns = ["time", "y"]
first_time_val = ts.time.values[0]
if re.match(r"\d{4}-\d{2}-\d{2}", first_time_val):
# pyre-fixme[16]: `DataFrame` has no attribute `time`.
ts.time = pd.to_datetime(ts.time, format="%Y-%m-%d")
elif re.match(r"\d{4}-\d{2}", first_time_val):
ts.time = pd.to_datetime(ts.time, format="%Y-%m")
else:
ts.time = pd.to_datetime(ts.time, unit="s")
tsd = TimeSeriesData(
ts,
use_unix_time=True,
unix_time_units="s",
tz="US/Pacific",
time_col_name="time",
)
return this_dataset, tsd, this_anno_dict