#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from datetime import datetime
import logging
from datetime import timedelta
from typing import Any, Dict, List, Optional, Tuple, Union
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pymannkendall as mk
from kats.consts import TimeSeriesChangePoint, TimeSeriesData
from kats.detectors.detector import Detector
from statsmodels.tsa.api import SimpleExpSmoothing
pd.options.plotting.matplotlib.register_converters = True
"""Mann-Kendall (MK) Trend Detector Module
This module includes detectors based on the Mann-Kendall Test, which is a
non-parametric test for monotonic trends. Right now, this module includes
detectors based on the original MK Test and the Multivariate MK Test. Neither
detector has any distribution requirement for the data, but there should not be
any serial correlation.
"""
[docs]class MKDetector(Detector):
"""
MKDetector (MK stands for Mann-Kendall) is a non-parametric statistical test
used to determine whether there is a monotonic trend in a given time series.
See https://vsp.pnnl.gov/help/vsample/Design_Trend_Mann_Kendall.htm for
details.
The basic idea is to check whether there is a monotonic trend based on a
look back number of time steps (`window_size`).
Parameters:
data: `TimeSeriesData`, this is time series data at one-day granularity.
This time series can be either univariate or multivariate.
We require more than training_days points in each time series.
threshold: `float`, threshold for trend intensity; higher threshold gives
trend with high intensity (0.8 by default). If we only want to use
the p-value to determine changepoints, set threshold = 0.
alpha: `float`, significance level (0.05 by default)
multivariate: `bool`, whether the input time series is multivariate
Example:
--------
>>> import pandas as pd
>>> from kats.consts import TimeSeriesData
>>> from kats.detectors.trend_mk import MKDetector
>>> # read data and rename the two columns required by TimeSeriesData
>>> # structure
>>> data = pd.read_csv("../filename.csv") # demo file does not exist
>>> TSdata = TimeSeriesData(data)
>>> # create MKDetector with given data and params
>>> d = MKDetector(data=TSdata)
>>> # call detector method to fit model
>>> detected_time_points = d.detector(window_size=20, direction="up")
>>> # plot the results
>>> d.plot(detected_time_points)
"""
window_size: Optional[int] = None
training_days: Optional[int] = None
direction: Optional[str] = None
freq: Optional[str] = None
ts: Optional[pd.DataFrame] = None
MK_statistics: Optional[pd.DataFrame] = None
def __init__(
self,
data: Optional[TimeSeriesData] = None,
threshold: float = 0.8,
alpha: float = 0.05,
multivariate: bool = False,
) -> None:
# pyre-fixme[6]: Expected `TimeSeriesData` for 1st param but got
# `Optional[TimeSeriesData]`.
super(MKDetector, self).__init__(data=data)
self.threshold = threshold
self.alpha = alpha
self.multivariate = multivariate
self.__subtype__ = "trend_detector"
# Assume univariate but multivariate data is detected
if self.data is not None:
if not self.data.is_univariate() and not self.multivariate:
logging.warning("Using multivariate MK test for univariate data.")
self.multivariate = True
# Assume multivariate but univariate data is detected
elif self.data.is_univariate() and self.multivariate:
logging.warning("Using univariate MK test on multivariate data.")
def _remove_seasonality(
self, ts: pd.DataFrame, freq: Optional[str] = None
) -> pd.DataFrame:
"""Remove seasonality in the time series using moving average."""
if freq is None:
return ts # no seasonality
else:
map = {"weekly": 7, "monthly": 30, "yearly": 365}
ts = ts.rolling(window=map[freq]).mean()
return ts
def _smoothing(self, ts: pd.DataFrame) -> pd.DataFrame:
"""Remove noise in the time series using Holt-Winters model."""
smoothed_ts = pd.DataFrame()
for c in ts.columns:
ts_c = ts[c].dropna()
assert ts_c is not None
with np.errstate(divide="raise"):
try:
model = SimpleExpSmoothing(ts_c)
_fit = model.fit(smoothing_level=0.2, optimized=False)
smoothed_ts_tmp = _fit.predict(
start=ts_c.index[0],
end=ts_c.index[-1],
)
smoothed_ts = pd.concat(
[smoothed_ts, smoothed_ts_tmp.rename(c)], axis=1
)
except FloatingPointError:
smoothed_ts = pd.concat([smoothed_ts, ts_c], axis=1)
logging.debug(
"Your data does not have noise. No need for smoothing"
)
return smoothed_ts
def _preprocessing(self, ts: pd.DataFrame) -> Tuple[np.ndarray, int]:
"""Check and convert the dataframe ts to an numpy array.
Args:
ts: a time series dataframe with time as index.
Returns:
A numpy array of length n and the # of metrics.
"""
window_size = self.window_size
assert window_size is not None
# takes only window_size days
x = np.asarray(ts[-window_size:])
dim = x.ndim
# checks the dimension of the data
if dim == 2: # dim should always be 2
(n, c) = x.shape # n is # of obs = window_size; c is # of metrics
if c == 1: # univariate case
dim = 1 # convert x from 2-dim array (n, 1) to 1-dim array (n,)
x = x.flatten()
else:
msg = f"dim = 2 is expected but your data has dim = {dim}."
raise ValueError(msg)
return x, c
def _drop_missing_values(self, x: np.ndarray) -> Tuple[np.ndarray, int]:
"""Drop the missing values in x."""
if x.ndim == 1: # univariate case with 1-dim array/ shape(n,)
x = x[~(np.isnan(x))]
else: # multivariate case with 2-dim arrat/ shape (n, c)
x = x[~np.isnan(x).any(axis=1)]
return x, len(x)
def _apply_threshold(self, trend, Tau):
if abs(Tau) <= self.threshold:
return "no trend"
return trend
[docs] def MKtest(self, ts: pd.DataFrame) -> Tuple[datetime, str, float, float]:
"""Performs the Mann-Kendall (MK) test for trend detection.
(Mann 1945, Kendall 1975, Gilbert 1987)
Args:
ts: the dataframe of input data with time as index.
This time series should not present seasonality for MK test.
Returns:
(tuple): tuple containing:
anchor_date(datetime): the last time point in ts; the date for
which alert is triggered
trend(str): tells the trend (decreasing, increasing, or no trend)
p(float): p-value of the significance test
Tau(float): Kendall Tau-b statistic (https://en.wikipedia.org/wiki/Kendall_rank_correlation_coefficient#Tau-b)
"""
x, _ = self._preprocessing(ts)
x, _ = self._drop_missing_values(x)
anchor_date = ts.index[-1]
mk_result = mk.original_test(x)
trend, p, Tau = mk_result.trend, mk_result.p, mk_result.Tau
trend = self._apply_threshold(trend, Tau)
return anchor_date, trend, p, Tau
[docs] def multivariate_MKtest(
self, ts: pd.DataFrame
) -> Tuple[datetime, str, float, Dict]:
"""Performs the Multivariate Mann-Kendall (MK) test.
Proposed by R. M. Hirsch and J. R. Slack (1984).
Args:
ts: the dataframe of input data with time as index. This time
series should not present seasonality for MK test.
Returns:
(tuple): tuple containing:
anchor_date(datetime): the last time point in ts; the date for
which alert is triggered.
trend:_dict: tells the trend (decreasing, increasing, or no trend)
for each metric.
p: p-value of the significance test.
Tau_dict: Dictionary of Kendall Tau-b statistics for each univariate
time series, and Tau_dict["overall"] gives the Tau-b statistic
for the multivariate time series.
"""
anchor_date = ts.index[-1]
Tau_dict = {} # contains score for individual cluster and overall
trend_dict = {} # contains trend for individual cluster and overall
x, c = self._preprocessing(ts)
multi_mk_result = mk.multivariate_test(x)
trend, p, Tau = multi_mk_result.trend, multi_mk_result.p, multi_mk_result.Tau
trend = self._apply_threshold(trend, Tau)
Tau_dict["overall"] = Tau
trend_dict["overall"] = trend
for i in range(c):
x_i, n = self._drop_missing_values(x[:, i])
# individual Tau score and trend
try:
mk_result = mk.original_test(x_i)
trend_i, Tau_i = mk_result.trend, mk_result.Tau
trend_i = self._apply_threshold(trend_i, Tau_i)
Tau_dict[ts.columns[i]] = Tau_i
trend_dict[ts.columns[i]] = trend_i
except ZeroDivisionError:
Tau_dict[ts.columns[i]] = None
trend_dict[ts.columns[i]] = None
return anchor_date, trend_dict, p, Tau_dict
[docs] def runDetector(self, ts: pd.DataFrame) -> Dict[str, Any]:
"""Runs MK test for a time point in the input data.
Args:
ts: the dataframe of input data with noise and seasonality removed.
Its index is time.
Returns:
A dictionary consisting of MK test statistics for the anchor time
point, including trend, p-value and Kendall Tau.
"""
# run MK test
if self.multivariate:
anchor_date, trend, p, Tau = self.multivariate_MKtest(ts)
else:
anchor_date, trend, p, Tau = self.MKtest(ts)
return {"ds": anchor_date, "trend_direction": trend, "p": p, "Tau": Tau}
# pyre-fixme[14]: `detector` overrides method defined in `Detector` inconsistently.
[docs] def detector(
self,
window_size: int = 20,
training_days: Optional[int] = None,
direction: str = "both",
freq: Optional[str] = None,
) -> List[Tuple[TimeSeriesChangePoint, MKMetadata]]:
"""Runs MK test sequentially.
It finds the trend and calculates the related statistics for all time
points in a given time series.
Args:
window_size: int, the number of look back days for checking trend
persistence
training_days: int, the number of days for time series smoothing;
should be greater or equal to window_size
If training_days is None, we will perform trend detection on the
whole time series; otherwise, we will perform trend detection
only for the anchor point using the previous training_days data.
direction: string, the direction of the trend to be detected, choose
from {"down", "up", "both"}
freq: str, the type of seasonality shown in the time series,
choose from {'weekly','monthly','yearly'}
"""
self.window_size = window_size
self.training_days = training_days
self.direction = direction
self.freq = freq
ts = self.data.to_dataframe().set_index("time")
ts = ts.dropna(axis=1)
# pyre-ignore[16]: `DataFrame` has no attribute `index`.
ts.index = pd.DatetimeIndex(ts.index.values, freq=ts.index.inferred_freq)
self.ts = ts
if training_days is None:
logging.info("Performing trend detection on the whole time series...")
# check validity of the input value
if len(ts) < window_size:
raise ValueError(
f"For the whole time series analysis, data must have at "
f"least window_size={window_size} points."
)
else:
logging.info(
f"Performing trend detection for the anchor date {ts.index[-1]}"
f" with training_days={training_days}..."
)
# check validity of the input value
if training_days < window_size:
raise ValueError(
f"For the anchor date analysis, training days should have "
f"at least window_size={window_size} points."
)
if len(ts) < training_days:
raise ValueError(
f"For the anchor date analysis, data must have "
f"at least training_days={training_days} points."
)
# save the trend detection results to dataframe MK_statistics
MK_statistics = pd.DataFrame(columns=["ds", "trend_direction", "p", "Tau"])
if training_days is not None: # anchor date analysis for real-time setting
# only look back training_days for noise and seasonality removal
start = ts.index[-1] - timedelta(days=training_days)
ts = ts.loc[start : ts.index[-1], :]
# deseasonalization
ts_deseas = self._remove_seasonality(ts, freq=self.freq)
ts_smoothed = self._smoothing(ts_deseas) # smoothing
# append MK statistics to MK_statistics dataframe
MK_statistics = MK_statistics.append(
# pyre-ignore[6]: Expected `Union[Dict[Union[int, str], typing.Any], L...
self.runDetector(ts=ts_smoothed), ignore_index=True
)
else:
# use the whole time series for for noise and seasonality removal
ts_deseas = self._remove_seasonality(ts, freq=freq)
ts_smoothed = self._smoothing(ts_deseas)
# run detector sequentially with sliding_window for the whole time
# series
for t in ts_smoothed.index[window_size:]:
# look back window_size day for trend detection
ts_tmp = ts_smoothed.loc[:t, :]
# append MK statistics to MK_statistics dataframe
MK_statistics = MK_statistics.append(
# pyre-ignore[6]: Expected `Union[Dict[Union[int, str], typing.Any...
self.runDetector(ts=ts_tmp), ignore_index=True
)
self.MK_statistics = MK_statistics
# take the subset for detection with specified trend_direction
MK_results = self.get_MK_results(
MK_statistics=MK_statistics, direction=direction
)
return self._convert_detected_tps(MK_results)
[docs] def get_MK_results(
self, MK_statistics: pd.DataFrame, direction: str
) -> pd.DataFrame:
"""Obtain a subset of MK_statistics given the desired direction"""
if direction not in ["up", "down", "both"]:
raise ValueError("direction should be chosen from {'up', 'down', 'both'}")
if self.multivariate:
trend_df = pd.DataFrame.from_dict(list(MK_statistics.trend_direction))
overall_trend = trend_df["overall"]
if direction == "down":
MK_results = MK_statistics.loc[overall_trend == "decreasing", :]
elif direction == "up":
MK_results = MK_statistics.loc[overall_trend == "increasing", :]
elif direction == "both":
MK_results = MK_statistics.loc[overall_trend != "no trend", :]
else:
if direction == "down":
MK_results = MK_statistics.loc[
MK_statistics["trend_direction"] == "decreasing", :
]
elif direction == "up":
MK_results = MK_statistics.loc[
MK_statistics["trend_direction"] == "increasing", :
]
elif direction == "both":
MK_results = MK_statistics.loc[
MK_statistics["trend_direction"] != "no trend", :
]
return MK_results
def _convert_detected_tps(
self, MK_results: pd.DataFrame
) -> List[Tuple[TimeSeriesChangePoint, MKMetadata]]:
"""Convert the dataframe of detected_tps and Tau into desired format."""
converted = []
for _index, row in MK_results.iterrows():
t = row["ds"]
detected_time_point = TimeSeriesChangePoint(
start_time=t, end_time=t, confidence=1 - row["p"]
)
metadata = MKMetadata(
is_multivariate=self.multivariate,
trend_direction=row["trend_direction"],
Tau=row["Tau"],
)
converted.append((detected_time_point, metadata))
return converted
[docs] def get_MK_statistics(self) -> pd.DataFrame:
"""Get the dataframe of MK_statistics."""
MK_statistics = self.MK_statistics
if MK_statistics is None:
raise ValueError("Call detector() first.")
return MK_statistics
[docs] def get_top_k_metrics(
self, time_point: datetime, top_k: Optional[int] = None
) -> pd.DataFrame:
"""Get k metrics that show the most significant trend at a time point.
Works only for multivariate data.
Args:
time_point: the time point to be investigated.
top_k: the number of top metrics.
Returns:
a dataframe consists of top_k metrics and their corresponding
Kendall Tau and trend.
"""
Tau_df, trend_df = self._metrics_analysis()
Tau_df = Tau_df.melt(id_vars=["ds"], var_name="metric", value_name="Tau")
trend_df = trend_df.melt(
id_vars=["ds"], var_name="metric", value_name="trend_direction"
)
if self.training_days is not None:
time_point = self.data.time.iloc[-1]
# time_point default to the only anchor date for real-time detection
# obtain the Tau for all metrics at the time point
Tau_df_tp = Tau_df.loc[Tau_df["ds"] == time_point, :]
trend_df_tp = trend_df.loc[trend_df["ds"] == time_point, :]
MK_statistics_tp = pd.merge(Tau_df_tp, trend_df_tp)
# sort the metrics according to their Tau
if self.direction == "down":
top_metrics = MK_statistics_tp.reindex(
MK_statistics_tp.Tau.sort_values(axis=0).index
)
elif self.direction == "up":
top_metrics = MK_statistics_tp.reindex(
MK_statistics_tp.Tau.sort_values(axis=0, ascending=False).index
)
else:
assert self.direction == "both"
top_metrics = MK_statistics_tp.reindex(
MK_statistics_tp.Tau.abs().sort_values(axis=0, ascending=False).index
)
if top_k is None:
# if top_k not specified, return all metrics ranked by Tau
return top_metrics
return top_metrics.iloc[:top_k]
[docs] def plot_heat_map(self) -> pd.DataFrame:
"""Plots the Tau of each metric in a heatmap.
Returns:
a dataframe contains Tau for all metrics at all time points.
"""
import plotly.graph_objects as go
Tau_df, _ = self._metrics_analysis()
Tau_df = Tau_df.set_index("ds")
fig = go.Figure(
data=go.Heatmap(
z=Tau_df.T.values,
x=Tau_df.index,
y=Tau_df.columns,
colorscale="Viridis",
reversescale=True,
)
)
fig.update_layout(
xaxis_title="time",
yaxis_title="value",
xaxis={"title": "time", "tickangle": 45},
)
fig.show()
return Tau_df
def _metrics_analysis(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
if not self.multivariate:
raise ValueError("Your data is not multivariate.")
MK_statistics = self.MK_statistics
assert MK_statistics is not None
# obtain the Tau for all metrics at all time points
Tau_df = pd.DataFrame.from_dict(list(MK_statistics.Tau))
Tau_df["ds"] = MK_statistics.ds
Tau_df = Tau_df.drop(["overall"], axis=1) # remove overall score
trend_df = pd.DataFrame.from_dict(list(MK_statistics.trend_direction))
trend_df["ds"] = MK_statistics.ds
trend_df = trend_df.drop(["overall"], axis=1) # remove overall trend
return Tau_df, trend_df
[docs] def plot(
self, detected_time_points: List[Tuple[TimeSeriesChangePoint, MKMetadata]]
) -> None:
"""Plots the original time series data, and the detected time points."""
ts = self.ts
if ts is None:
raise ValueError("")
plt.figure(figsize=(14, 5))
plt.plot(ts.index, ts.values)
if len(detected_time_points) == 0:
logging.warning("No trend detected!")
for t in detected_time_points:
plt.axvline(x=t[0].start_time, color="red")