Source code for kats.detectors.seasonality

#!/usr/bin/env python3

# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

"""This module is for seasonality detection.

We provide two seasonality detector: ACFDetector and FFTDetector. ACFDetector uses
autocorrelation function to find seasonality, while FFTDetector uses Fast Fourier
Transform to detect seasonality.

Typical usage example:

>>> timeseries = TimeSeriesData(...)
>>> # initialize detector
>>> detector = ACFDetector(timeseries)
>>> # run detector
>>> detector.detector(diff=1, alpha = 0.01)
>>> # seasonality decomposition, returns trend, seasonal, residual term
>>> detector.remover()
>>> # plot acf and decompsition results
>>> detector.plot()
"""

import logging
from typing import Any, Dict, List, Optional, Sequence, Tuple, Type, Union
import plotly.graph_objs as go

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.fftpack as fp
import statsmodels.api as sm
from kats.consts import TimeSeriesData
from kats.detectors.detector import Detector
from kats.graphics.plots import make_fourier_plot
from kats.utils.decomposition import TimeSeriesDecomposition
from scipy.signal import find_peaks  # @manual
from statsmodels.tsa.stattools import acf

# from numpy.typing import ArrayLike
ArrayLike = Union[np.ndarray, Sequence[float]]


[docs]class ACFDetector(Detector): """ Autocorrelation function seasonality detector. Use acf to detect seasonality, and find out the potential cycle lengths Attributes: data: The input time series data from TimeSeriesData decomposed: A bool indicate if we decomposed the time series into trend, seasonal and residual. """ ts_diff: Optional[ArrayLike] = None lags: Optional[int] = None seasonality: Optional[List[int]] = None seasonality_detected: bool = False decompose: Optional[TimeSeriesDecomposition] = None def __init__(self, data: TimeSeriesData): super().__init__(data=data) if not isinstance(self.data.value, pd.Series): msg = "Only support univariate time series, but get {type}.".format( type=type(self.data.value) ) logging.error(msg) raise ValueError(msg) self.decomposed = False def _get_seasonality_length(self, d: List[int]) -> List[int]: out = [] while d: k = d.pop(0) d = [i for i in d if i % k != 0] out.append(k) return out # pyre-fixme[14]: Inconsistent override [14]: `kats.detectors.seasonality.ACFDetector.detector` overrides method defined in `Detector` inconsistently. Could not find parameter `method` in overriding signature. # pyre-fixme[14]: `kats.detectors.seasonality.ACFDetector.detector` overrides method defined in `Detector` inconsistently. Returned type `Dict[str, typing.Any]` is not a subtype of the overridden return `None`.
[docs] def detector(self, lags: Optional[int]=None, diff: int=1, alpha: Optional[float]=0.01) -> Dict[str, Any]: """Detect seasonality This method runs acf and returns if seasonality detected in the given time series and potential cycle lengths Args: lags: Optional; int; the maximum lags we used in acf. diff: Optional; int; times of diff run on timeseries to remove trend before apply acf. alpha: Optional; float; significant level we use the calcualte autocorrelation confidence interval. Returns: A dict contains - seasonality_presence: bool, if seasonality detected - seasonalities: List[int], potential seasonlities cycle length(s) """ # Use array to store the data ts = self.data.value.values ts_diff = ts for _ in range(diff): ts_diff = np.diff(ts_diff) self.ts_diff = ts_diff if lags is None: lags = int(len(ts) / 3) self.lags = lags ac, confint, qstat, qval = acf(ts_diff, nlags=lags, qstat=True, alpha=alpha) # get seasonality cycle length raw_seasonality = [] for i, _int in enumerate(confint): if _int[0] >= 0 and i > 1: raw_seasonality.append(i) self.seasonality = self._get_seasonality_length(raw_seasonality) self.seasonality_detected = True if self.seasonality else False return { "seasonality_presence": self.seasonality_detected, "seasonalities": self.seasonality, }
[docs] def plot(self) -> None: """Plot detection results. Args: None Returns: None """ sm.graphics.tsa.plot_acf(self.ts_diff, lags=self.lags) plt.show() if self.decomposed: decompose = self.decompose assert decompose is not None decompose.plot() plt.show()
# pyre-fixme[14]: `kats.detectors.seasonality.ACFDetector.remover` overrides method defined in `Detector` inconsistently. Could not find parameter `interpolate` in overriding signature. # pyre-fixme[15]: `kats.detectors.seasonality.ACFDetector.remover` overrides method defined in `Detector` inconsistently. Returned type `Optional[Dict[str, typing.Any]]` is not a subtype of the overridden return `TimeSeriesData`.
[docs] def remover(self, decom: Type=TimeSeriesDecomposition, model: str="additive", decompose_any_way: bool=False) -> Optional[Dict[str, Any]]: """Remove the seasonality in the time series Args: decom: Optional; decomposition method. model: Optional; model used for decomposition. decompose_any_way: Optional; bool; decompose the time series even when seasonality is not detected in the time series. Returns: decomposition results of the decomposition method. """ if decompose_any_way or self.seasonality_detected: self.decompose = decompose = decom(self.data, model) result = decompose.decomposer() self.decomposed = True return result else: logging.info("No seasonality detected, not running decomposition")
[docs]class FFTDetector(Detector): """Fast Fourier Transform Seasoanlity detector Use Fast Fourier Transform to detect seasonality, and find out the potential cycle's length. Attributes: data: The input time series data from TimeSeriesData. """ def __init__(self, data: TimeSeriesData): super().__init__(data=data) if not self.data.is_univariate(): msg = "The provided time series data is not univariate." logging.error(msg) raise ValueError(msg) # pyre-fixme[14]: `detector` overrides method defined in `Detector` inconsistently.
[docs] def detector(self, sample_spacing: float = 1.0, mad_threshold: float = 6.0) -> Dict: """Detect seasonality with FFT Args: sample_spacing: Optional; float; scaling FFT for a different time unit. I.e. for hourly time series, sample_spacing=24.0, FFT x axis will be 1/day. mad_threshold: Optional; float; constant for the outlier algorithm for peak detector. The larger the value the less sensitive the outlier algorithm is. Returns: FFT Plot with peaks, selected peaks, and outlier boundary line. """ fft = self.get_fft(sample_spacing) _, orig_peaks, peaks = self.get_fft_peaks(fft, mad_threshold) # pyre-fixme[6]: Expected `Sized` for 1st param but got # `BoundMethod[typing.Callable(list.index)[[Named(self, List[float]), float, # int, default, int, default], int], List[float]]`. seasonality_presence = len(peaks.index) > 0 selected_seasonalities = [] if seasonality_presence: # pyre-fixme[16]: `float` has no attribute `transform`. # pyre-fixme[6]: Expected `_SupportsIndex` for 1st param but got `str`. selected_seasonalities = peaks["freq"].transform(lambda x: 1 / x).tolist() return { "seasonality_presence": seasonality_presence, "seasonalities": selected_seasonalities, }
# pyre-fixme[15]: `plot` overrides method defined in `Detector` inconsistently.
[docs] def plot( self, time_unit: str, sample_spacing: float = 1.0, title: str = "FFT", mad_threshold: float = 6.0, ) -> go.Figure: """Plots an FFT plot as a plotly figure Args: time_unit: string containing the unit of time (displayed on x axis). E.g. 'Hour'. sample_spacing: Optional; scaling FFT for a different time unit. I.e. for hourly time series, sample_spacing=24.0, FFT x axis will be 1/day. title: Optional; title of the plot. mad_threshold: Optional; constant for the outlier algorithm for peak detector. The larger the value the less sensitive the outlier algorithm is. Returns: FFT Plot with peaks, selected peaks, and outlier boundary line. """ fft = self.get_fft(sample_spacing) thres, orig_peaks, peaks = self.get_fft_peaks(fft, mad_threshold) return make_fourier_plot( fft, thres, orig_peaks, peaks, f"1/{time_unit}", title=title )
[docs] def get_fft(self, sample_spacing: float = 1.0) -> pd.DataFrame: """Computes FFT Args: sample_spacing: Optional; scaling FFT for a different time unit. I.e. for hourly time series, sample_spacing=24.0 FFT x axis will be 1/day. Returns: DataFrame with columns 'freq' and 'ampl'. """ data_fft = fp.fft(self.data.value.values) data_psd = np.abs(data_fft) ** 2 fftfreq = fp.fftfreq(len(data_psd), 1.0 / sample_spacing) pos_freq_ix = fftfreq > 0 freq = (fftfreq[pos_freq_ix],) ampl = (10 * np.log10(data_psd[pos_freq_ix]),) return pd.DataFrame({"freq": freq[0], "ampl": ampl[0]})
[docs] def get_fft_peaks( self, fft: pd.DataFrame, mad_threshold: float = 6.0 ) -> Tuple[float, List[float], List[float]]: """Computes peaks in fft, selects the highest peaks (outliers) and removes the harmonics (multiplies of the base harmonics found) Args: fft: FFT computed by FFTDetector.get_fft sample_spacing: Optional; scaling FFT for a different time unit. I.e. for hourly time series, sample_spacing=24.0 FFT x axis will be 1/day. mad_threshold: Optional; constant for the outlier algorithm for peak detector. The larger the value the less sensitive the outlier algorithm is. Returns: outlier threshold, peaks, selected peaks. """ pos_fft = fft.loc[fft["ampl"] > 0] median = pos_fft["ampl"].median() pos_fft_above_med = pos_fft[pos_fft["ampl"] > median] mad = pos_fft_above_med["ampl"].mad() threshold = median + mad * mad_threshold peak_indices = find_peaks(fft["ampl"], threshold=0.1) peaks = fft.loc[peak_indices[0], :] orig_peaks = peaks.copy() peaks = peaks.loc[peaks["ampl"] > threshold].copy() peaks["Remove"] = [False] * len(peaks.index) peaks.reset_index(inplace=True) # Filter out harmonics for idx1 in range(len(peaks)): curr = peaks.loc[idx1, "freq"] for idx2 in range(idx1 + 1, len(peaks)): if peaks.loc[idx2, "Remove"] is True: continue fraction = (peaks.loc[idx2, "freq"] / curr) % 1 if fraction < 0.01 or fraction > 0.99: peaks.loc[idx2, "Remove"] = True peaks = peaks.loc[~peaks["Remove"]] peaks.drop(inplace=True, columns="Remove") return threshold, orig_peaks, peaks
[docs] def plot_fft( self, time_unit: str, sample_spacing: float = 1.0, title: str = "FFT", mad_threshold: float = 6.0, ) -> go.Figure: """Plots an FFT plot as a plotly figure Args: time_unit: string containing the unit of time (displayed on x axis). E.g. 'Hour' sample_spacing: Optional; scaling FFT for a different time unit. I.e. for hourly time series, sample_spacing=24.0, FFT x axis will be 1/day title: Optional; title of the plot mad_threshold: Optional; constant for the outlier algorithm for peak detector. The larger the value the less sensitive the outlier algorithm is. Returns: FFT Plot with peaks, selected peaks, and outlier boundary line """ fft = self.get_fft(sample_spacing) thres, orig_peaks, peaks = self.get_fft_peaks(fft, mad_threshold) return make_fourier_plot( fft, thres, orig_peaks, peaks, f"1/{time_unit}", title=title )