#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import re
import logging
import statsmodels
from functools import partial
from itertools import groupby
from typing import List, Optional, Dict
import numpy as np
import pandas as pd
import statsmodels.api as sm
from kats.consts import TimeSeriesData
from kats.detectors import (
    cusum_detection,
    bocpd,
    robust_stat_detection,
    outlier,
    trend_mk,
    seasonality,
)
from numba import jit  # @manual
from scipy import stats
from scipy.signal import periodogram  # @manual
from statsmodels.stats.diagnostic import het_arch
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from statsmodels.tsa.seasonal import STL
from statsmodels.tsa.stattools import acf, pacf, kpss
"""TsFeatures is a module for performing adhoc feature engineering on time series
data using different statistics.
The module process time series data into features for machine learning models.
We include seasonality, autocorrelation, modeling parameter, changepoints,
moving statistics, and raw statistics of time series array as the adhoc features.
We also offer to compute part of the features or group of features using
selected_features argument, you could also disable feature or group of
features by setting feature_name/feature_group_name = False. You can find
all feature group names in feature_group_mapping attribute.
"""
[docs]class TsFeatures:
    """Process time series data into features for machine learning models, with the
    function to opt-in/out feature and feature groups in the calculations.
    Attributes:
        window_size: int; Length of the sliding window for getting level shift features,
            lumpiness, and stability of time series.
        spectral_freq: int; Frequency parameter in getting periodogram through scipy for
            calculating Shannon entropy.
        stl_period: int; Period parameter for performing seasonality trend decomposition
            using LOESS with statsmodels.
        nbins: int; Number of bins to equally segment time series array for getting flat
            spot feature.
        lag_size: int; Maximum number of lag values for calculating Hurst Exponent.
        acfpacf_lag: int; Largest lag number for returning ACF/PACF features via statsmodels.
        decomp: str; Additive or Multiplicative mode for performing outlier detection using
            Kats.Detectors.outlier.OutlierDetector.
        iqr_mult: float; IQR range for determining outliers through
            Kats.Detectors.outlier.OutlierDetector.
        threshold: float; threshold for trend intensity; higher threshold gives
            trend with high intensity (0.8 by default).  If we only want to use the
            p-value to determine changepoints, set threshold = 0.
        window: int; length of window for all nowcasting features.
        n_fast: int; length of "fast" or short period exponential moving average in the MACD
            algorithm in the nowcasting features.
        n_slow: int; length of "slow" or long period exponential moving average in the MACD
            algorithm in the nowcasting features.
        selected_features: None or List[str]; list of feature/feature group name(s)
            selected to be calculated. We will try only calculating selected
            features, since some features are bundled in the calculations. This process
            helps with boosting efficiency, and we will only output selected features.
        feature_group_mapping: The dictionary with the mapping from individual features
            to their bundled feature groups.
        final_filter: A dicitonary with boolean as the values to filter out the features
            not selected, yet calculated due to underlying bundles.
        stl_features: Switch for calculating/outputting stl features.
        level_shift_features: Switch for calculating/outputting level shift features.
        acfpacf_features: Switch for calculating/outputting ACF/PACF features.
        special_ac: Switch for calculating/outputting  features.
        holt_params: Switch for calculating/outputting holt parameter features.
        hw_params: Switch for calculating/outputting holt-winters parameter features.
        statistics: Switch for calculating/outputting raw statistics features.
        cusum_detector: Switch for calculating/outputting features using cusum detector
            in Kats.
        robust_stat_detector: Switch for calculating/outputting features using robust stat detector
            in Kats.
        bocp_detector: Switch for calculating/outputting stl features features using bocp detector
            in Kats.
        outlier_detector: Switch for calculating/outputting stl features using outlier detector
            in Kats.
        trend_detector: Switch for calculating/outputting stl features using trend detector
            in Kats.
        nowcasting: Switch for calculating/outputting stl features using nowcasting detector
            in Kats.
        seasonalities: Switch for calculating/outputting stl features using cusum detector
            in Kats.
        default: The default status of the switch for opt-in/out feature calculations.
    """
    def __init__(
        self,
        window_size: int = 20,
        spectral_freq: int = 1,
        stl_period: int = 7,
        nbins: int = 10,
        lag_size: int = 30,
        acfpacf_lag: int = 6,
        decomp: str = "additive",
        iqr_mult: float = 3.0,
        threshold: float = 0.8,
        window: int = 5,
        n_fast: int = 12,
        n_slow: int = 21,
        selected_features: Optional[List[str]] = None,
        **kwargs,
    ):
        # init hyper-parameters
        self.window_size = window_size
        self.spectral_freq = spectral_freq
        self.stl_period = stl_period
        self.nbins = nbins
        self.lag_size = lag_size
        self.acfpacf_lag = acfpacf_lag
        self.decomp = decomp
        self.iqr_mult = iqr_mult
        self.threshold = threshold
        self.window = window
        self.n_fast = n_fast
        self.n_slow = n_slow
        # Mapping group features
        g2f = {
            "stl_features": [
                "trend_strength",
                "seasonality_strength",
                "spikiness",
                "peak",
                "trough",
            ],
            "level_shift_features": [
                "level_shift_idx",
                "level_shift_size",
            ],
            "acfpacf_features": [
                "y_acf1",
                "y_acf5",
                "diff1y_acf1",
                "diff1y_acf5",
                "diff2y_acf1",
                "diff2y_acf5",
                "y_pacf5",
                "diff1y_pacf5",
                "diff2y_pacf5",
                "seas_acf1",
                "seas_pacf1",
            ],
            "special_ac": [
                "firstmin_ac",
                "firstzero_ac",
            ],
            "holt_params": [
                "holt_alpha",
                "holt_beta",
            ],
            "hw_params": [
                "hw_alpha",
                "hw_beta",
                "hw_gamma",
            ],
            "statistics": [
                "length",
                "mean",
                "var",
                "entropy",
                "lumpiness",
                "stability",
                "flat_spots",
                "hurst",
                "std1st_der",
                "crossing_points",
                "binarize_mean",
                "unitroot_kpss",
                "heterogeneity",
                "histogram_mode",
                "linearity",
            ],
            "cusum_detector": [
                "cusum_num",
                "cusum_conf",
                "cusum_cp_index",
                "cusum_delta",
                "cusum_llr",
                "cusum_regression_detected",
                "cusum_stable_changepoint",
                "cusum_p_value",
            ],
            "robust_stat_detector": [
                "robust_num",
                "robust_metric_mean",
            ],
            "bocp_detector": [
                "bocp_num",
                "bocp_conf_max",
                "bocp_conf_mean",
            ],
            "outlier_detector": [
                "outlier_num",
            ],
            "trend_detector": [
                "trend_num",
                "trend_num_increasing",
                "trend_avg_abs_tau",
            ],
            "nowcasting": [
                "nowcast_roc",
                "nowcast_ma",
                "nowcast_mom",
                "nowcast_lag",
                "nowcast_macd",
                "nowcast_macdsign",
                "nowcast_macddiff",
            ],
            "seasonalities": [
                "seasonal_period",
                "trend_mag",
                "seasonality_mag",
                "residual_std",
            ],
        }
        self.feature_group_mapping = g2f
        f2g = {}
        for k, v in g2f.items():
            for f in v:
                f2g[f] = k
        self._total_feature_len_ = len(f2g.keys())
        for f in kwargs.keys():
            assert (
                f in f2g.keys() or f in g2f.keys()
            ), f"""couldn't find your desired feature/group "{f}", please check spelling"""
        # Higher level of features:
        # Once disabled, won't even go inside these groups of features
        # for calculation
        if not selected_features:
            default = True
            self.final_filter = {k: default for k in f2g.keys()}
        elif selected_features:
            default = False
            self.final_filter = {k: default for k in f2g.keys()}
            for f in selected_features:
                assert (
                    f in f2g.keys() or f in g2f.keys()
                ), f"""couldn't find your desired feature/group "{f}", please check spelling"""
                if f in g2f.keys():  # the opt-in request is for a feature group
                    kwargs[f] = True
                    for feature in g2f[f]:
                        kwargs[feature] = kwargs.get(feature, True)
                        self.final_filter[feature] = True
                elif f in f2g.keys():  # the opt-in request is for a certain feature
                    assert kwargs.get(
                        f2g[f], True
                    ), f"""feature group: {f2g[f]} has to be opt-in based on your opt-in request of feature: {f}"""
                    assert kwargs.get(
                        f, True
                    ), f"""you have requested to both opt-in and opt-out feature: {f}"""
                    kwargs[f2g[f]] = True  # need to opt-in the feature group first
                    kwargs[f] = True  # opt-in the feature
                    self.final_filter[f] = True
        for k, v in kwargs.items():
            self.final_filter[
                k
            ] = v  # final filter for filtering out features user didn't request and keep only the requested ones
        # setting default value for the switches of calculating the group of features or not
        self.stl_features = kwargs.get("stl_features", default)
        self.level_shift_features = kwargs.get("level_shift_features", default)
        self.acfpacf_features = kwargs.get("acfpacf_features", default)
        self.special_ac = kwargs.get("special_ac", default)
        self.holt_params = kwargs.get("holt_params", default)
        self.hw_params = kwargs.get("hw_params", default)
        self.statistics = kwargs.get("statistics", default)
        self.cusum_detector = kwargs.get("cusum_detector", False)
        self.robust_stat_detector = kwargs.get("robust_stat_detector", False)
        self.bocp_detector = kwargs.get("bocp_detector", False)
        self.outlier_detector = kwargs.get("outlier_detector", False)
        self.trend_detector = kwargs.get("trend_detector", False)
        self.nowcasting = kwargs.get("nowcasting", False)
        self.seasonalities = kwargs.get("seasonalities", False)
        # For lower level of the features
        self.__kwargs__ = kwargs
        self.default = default
    def _transform_1d(self, x: np.ndarray, ts: TimeSeriesData):
        """
        Transform single (univariate) time series
        Args:
            x: The univariate time series array in the form of 1d numpy array.
            ts: The univariate time series array in the form of Kats Timeseries
                Data object.
        Returns:
            The dictionary with all the features aggregated from the outputs of
            each feature group calculator.
        """
        # calculate STL based features
        dict_stl_features = {}
        if self.stl_features:
            dict_stl_features = self.get_stl_features(
                x,
                period=self.stl_period,
                extra_args=self.__kwargs__,
                default_status=self.default,
            )
        # calculate level shift based features
        dict_level_shift_features = {}
        if self.level_shift_features:
            dict_level_shift_features = self.get_level_shift(
                x,
                window_size=self.window_size,
                extra_args=self.__kwargs__,
                default_status=self.default,
            )
        # calculate ACF, PACF based features
        dict_acfpacf_features = {}
        if self.acfpacf_features:
            dict_acfpacf_features = self.get_acfpacf_features(
                x,
                acfpacf_lag=self.acfpacf_lag,
                period=self.stl_period,
                extra_args=self.__kwargs__,
                default_status=self.default,
            )
        # calculate special AC
        dict_specialac_features = {}
        if self.special_ac:
            dict_specialac_features = self.get_special_ac(
                x, extra_args=self.__kwargs__, default_status=self.default
            )
        # calculate holt params
        dict_holt_params_features = {}
        if self.holt_params:
            dict_holt_params_features = self.get_holt_params(
                x, extra_args=self.__kwargs__, default_status=self.default
            )
        # calculate hw params
        dict_hw_params_features = {}
        if self.hw_params:
            dict_hw_params_features = self.get_hw_params(
                x,
                period=self.stl_period,
                extra_args=self.__kwargs__,
                default_status=self.default,
            )
        # single features
        _dict_features_ = {}
        if self.statistics:
            dict_features = {
                "length": partial(self.get_length),
                "mean": partial(self.get_mean),
                "var": partial(self.get_var),
                "entropy": partial(self.get_spectral_entropy, freq=self.spectral_freq),
                "lumpiness": partial(self.get_lumpiness, window_size=self.window_size),
                "stability": partial(self.get_stability, window_size=self.window_size),
                "flat_spots": partial(self.get_flat_spots, nbins=self.nbins),
                "hurst": partial(self.get_hurst, lag_size=self.lag_size),
                "std1st_der": partial(self.get_std1st_der),
                "crossing_points": partial(self.get_crossing_points),
                "binarize_mean": partial(self.get_binarize_mean),
                "unitroot_kpss": partial(self.get_unitroot_kpss),
                "heterogeneity": partial(self.get_het_arch),
                "histogram_mode": partial(self.get_histogram_mode, nbins=self.nbins),
                "linearity": partial(self.get_linearity),
            }
            _dict_features_ = {}
            for k, v in dict_features.items():
                if self.__kwargs__.get(k, self.default):
                    _dict_features_[k] = v(x)
        # calculate cusum detector features
        dict_cusum_detector_features = {}
        if self.cusum_detector:
            dict_cusum_detector_features = self.get_cusum_detector(
                ts, extra_args=self.__kwargs__, default_status=self.default
            )
        # calculate robust stat detector features
        dict_robust_stat_detector_features = {}
        if self.robust_stat_detector:
            dict_robust_stat_detector_features = self.get_robust_stat_detector(
                ts, extra_args=self.__kwargs__, default_status=self.default
            )
        # calculate bocp detector features
        dict_bocp_detector_features = {}
        if self.bocp_detector:
            dict_bocp_detector_features = self.get_bocp_detector(
                ts, extra_args=self.__kwargs__, default_status=self.default
            )
        # calculate outlier detector features
        dict_outlier_detector_features = {}
        if self.outlier_detector:
            dict_outlier_detector_features = self.get_outlier_detector(
                ts,
                decomp=self.decomp,
                iqr_mult=self.iqr_mult,
                extra_args=self.__kwargs__,
                default_status=self.default,
            )
        # calculate trend detector features
        dict_trend_detector_features = {}
        if self.trend_detector:
            dict_trend_detector_features = self.get_trend_detector(
                ts,
                threshold=self.threshold,
                extra_args=self.__kwargs__,
                default_status=self.default,
            )
        # calculate nowcasting features
        dict_nowcasting_features = {}
        if self.nowcasting:
            dict_nowcasting_features = self.get_nowcasting(
                x,
                window=self.window,
                n_fast=self.n_fast,
                n_slow=self.n_slow,
                extra_args=self.__kwargs__,
                default_status=self.default,
            )
        # calculate seasonality features
        dict_seasonality_features = {}
        if self.seasonalities:
            dict_seasonality_features = self.get_seasonalities(
                ts, extra_args=self.__kwargs__, default_status=self.default
            )
        return {
            **_dict_features_,
            **dict_stl_features,
            **dict_level_shift_features,
            **dict_acfpacf_features,
            **dict_specialac_features,
            **dict_holt_params_features,
            **dict_hw_params_features,
            **dict_cusum_detector_features,
            **dict_robust_stat_detector_features,
            **dict_bocp_detector_features,
            **dict_outlier_detector_features,
            **dict_trend_detector_features,
            **dict_nowcasting_features,
            **dict_seasonality_features,
        }
    # length
[docs]    @staticmethod
    @jit(nopython=True)
    def get_length(x: np.ndarray):
        """
        Getting the length of time series array.
        Args:
            x: The univariate time series array in the form of 1d numpy array.
        Returns:
            Length of the time series array.
        """
        return len(x) 
    # mean
[docs]    @staticmethod
    @jit(nopython=True)
    def get_mean(x: np.ndarray):
        """
        Getting the average value of time series array.
        Args:
            x: The univariate time series array in the form of 1d numpy array.
        Returns:
            Average of the time series array.
        """
        return np.mean(x) 
    # variance
[docs]    @staticmethod
    @jit(nopython=True)
    def get_var(x: np.ndarray):
        """
        Getting the variance of time series array.
        Args:
            x: The univariate time series array in the form of 1d numpy array.
        Returns:
            Variance of the time series array.
        """
        return np.var(x) 
    # spectral entropy
[docs]    @staticmethod
    @jit(forceobj=True)
    def get_spectral_entropy(x: np.ndarray, freq: int = 1):
        """
        Getting normalized Shannon entropy of power spectral density.
        PSD is calculated using scipy's periodogram.
        Args:
            x: The univariate time series array in the form of 1d numpy array.
            freq: int; Frequency for calculating the PSD via scipy periodogram.
                Default value is 1.
        Returns:
            Normalized Shannon entropy.
        """
        # calculate periodogram
        _, psd = periodogram(x, freq)
        # calculate shannon entropy of normalized psd
        psd_norm = psd / np.sum(psd)
        entropy = np.nansum(psd_norm * np.log2(psd_norm))
        return -(entropy / np.log2(psd_norm.size)) 
    # lumpiness
[docs]    @staticmethod
    @jit(forceobj=True)
    def get_lumpiness(x: np.ndarray, window_size: int = 20):
        """
        Calculating the lumpiness of time series.
        Lumpiness is defined as the variance of the chunk-wise variances.
        Args:
            x: The univariate time series array in the form of 1d numpy array.
            window_size: int; Window size to split the data into chunks for getting
                variances. Default value is 20.
        Returns:
            Lumpiness of the time series array.
        """
        v = [np.var(x_w) for x_w in np.array_split(x, len(x) // window_size + 1)]
        return np.var(v) 
    # stability
[docs]    @staticmethod
    @jit(forceobj=True)
    def get_stability(x: np.ndarray, window_size: int = 20):
        """
        Calculating the stability of time series.
        Stability is defined as the variance of chunk-wise means.
        Args:
            x: The univariate time series array in the form of 1d numpy array.
            window_size: int; Window size to split the data into chunks for getting
                variances. Default value is 20.
        Returns:
            Stability of the time series array.
        """
        v = [np.mean(x_w) for x_w in np.array_split(x, len(x) // window_size + 1)]
        return np.var(v) 
    # STL decomposition based features
[docs]    @staticmethod
    @jit(forceobj=True)
    def get_stl_features(
        x: np.ndarray,
        period: int = 7,
        extra_args: Optional[Dict[str, bool]] = None,
        default_status: bool = True,
    ):
        """
        Calculate STL based features for a time series, including strength of
        trend, seasonality, spikiness, peak/trough.
        Args:
            x: The univariate time series array in the form of 1d numpy array.
            period: int; Period parameter for performing seasonality trend decomposition
                using LOESS with statsmodels. Default value is 7.
            extra_args: A dictionary containing information for disabling calculation
                of a certain feature. Default value is None, i.e. no feature is disabled.
            default_status: Default status of the switch for calculate the features or not.
                Default value is True.
        Returns:
            Seasonality features including strength of trend, seasonality, spikiness,
            peak/trough.
        """
        stl_features = {}
        # STL decomposition
        res = STL(x, period=period).fit()
        # strength of trend
        if extra_args is not None and extra_args.get("trend_strength", default_status):
            stl_features["trend_strength"] = 1 - np.var(res.resid) / np.var(
                res.trend + res.resid
            )
        # strength of seasonality
        if extra_args is not None and extra_args.get("seasonality_strength", default_status):
            stl_features["seasonality_strength"] = 1 - np.var(res.resid) / np.var(
                res.seasonal + res.resid
            )
        # spikiness: variance of the leave-one-out variances of the remainder component
        if extra_args is not None and extra_args.get("spikiness", default_status):
            resid_array = np.repeat(
                np.array(res.resid)[:, np.newaxis], len(res.resid), axis=1
            )
            resid_array[np.diag_indices(len(resid_array))] = np.NaN
            stl_features["spikiness"] = np.var(np.nanvar(resid_array, axis=0))
        # location of peak
        if extra_args is not None and extra_args.get("peak", default_status):
            stl_features["peak"] = np.argmax(res.seasonal[:period])
        # location of trough
        if extra_args is not None and extra_args.get("trough", default_status):
            stl_features["trough"] = np.argmin(res.seasonal[:period])
        return stl_features 
    # Level shift
[docs]    @staticmethod
    @jit(forceobj=True)
    def get_level_shift(
        x: np.ndarray,
        window_size: int = 20,
        extra_args: Optional[Dict[str, bool]] = None,
        default_status: bool = True,
    ) -> pd.DataFrame:
        """
        Calculating level shift features.
        * level_shift_idx: Location of the maximum mean value difference,
          between two consecutive sliding windows
        * level_shift_size: Size of the maximum mean value difference,
          between two consecutive sliding windows
        Args:
            x: The univariate time series array in the form of 1d numpy array.
            window_size: int; Length of the sliding window. Default value is 20.
            extra_args: A dictionary containing information for disabling calculation
                of a certain feature. Default value is None, i.e. no feature is disabled.
            default_status: Default status of the switch for calculate the features or not.
                Default value is True.
        Returns:
            Level shift features including level_shift_idx, and level_shift_size
        """
        level_shift_features = {"level_shift_idx": np.nan, "level_shift_size": np.nan}
        if len(x) < window_size + 2:
            msg = "Length of time series is shorter than window_size, unable to calculate level shift features!"
            logging.error(msg)
            return level_shift_features
        sliding_idx = (np.arange(len(x))[None, :] + np.arange(window_size)[:, None])[
            :, : len(x) - window_size + 1
        ]
        means = np.mean(x[sliding_idx], axis=0)
        mean_diff = np.abs(means[:-1] - means[1:])
        if extra_args is not None and extra_args.get("level_shift_idx", default_status):
            level_shift_features["level_shift_idx"] = np.argmax(mean_diff)
        if extra_args is not None and extra_args.get("level_shift_size", default_status):
            level_shift_features["level_shift_size"] = mean_diff[np.argmax(mean_diff)]
        return level_shift_features 
    # Flat spots
[docs]    @staticmethod
    @jit(forceobj=True)
    def get_flat_spots(x: np.ndarray, nbins: int = 10):
        """
        Getting flat spots: Maximum run-lengths across equally-sized segments of time series
        Args:
            x: The univariate time series array in the form of 1d numpy array.
            nbins: int; Number of bins to segment time series data into.
        Returns:
            Maximum run-lengths across segmented time series array.
        """
        if len(x) <= nbins:
            msg = "Length of time series is shorter than nbins, unable to calculate flat spots feature!"
            logging.error(msg)
            return np.nan
        max_run_length = 0
        window_size = int(len(x) / nbins)
        for i in range(0, len(x), window_size):
            run_length = np.max(
                [len(list(v)) for k, v in groupby(x[i : i + window_size])]
            )
            if run_length > max_run_length:
                max_run_length = run_length
        return max_run_length 
    # Hurst Exponent
[docs]    @staticmethod
    @jit(forceobj=True)
    def get_hurst(x: np.ndarray, lag_size: int = 30):
        """
        Getting: Hurst Exponent wiki: https://en.wikipedia.org/wiki/Hurst_exponent
        Args:
            x: The univariate time series array in the form of 1d numpy array.
            lag_size: int; Size for getting lagged time series data. Default value
                is 30.
        Returns:
            The Hurst Exponent of the time series array
        """
        # Create the range of lag values
        lags = range(2, min(lag_size, len(x) - 1))
        # Calculate the array of the variances of the lagged differences
        tau = [np.std(np.asarray(x)[lag:] - np.asarray(x)[:-lag]) for lag in lags]
        # Use a linear fit to estimate the Hurst Exponent
        poly = np.polyfit(np.log(lags), np.log(tau), 1)
        # Return the Hurst exponent from the polyfit output
        return poly[0] if not np.isnan(poly[0]) else 0 
    # ACF and PACF features
    # ACF features
[docs]    @staticmethod
    @jit(forceobj=True)
    def get_acf_features(
        extra_args: Dict[str, bool],
        default_status: bool,
        y_acf_list: List[float],
        diff1y_acf_list: List[float],
        diff2y_acf_list: List[float],
    ):
        """
        Aggregating extracted ACF features from get_acfpacf_features function.
        Args:
            extra_args: A dictionary containing information for disabling calculation
                of a certain feature. Default value is None, i.e. no feature is disabled.
            default_status: Default status of the switch for calculate the features or not.
                Default value is True.
            y_acf_list: List of ACF values acquired from original time series.
            diff1y_acf_list: List of ACF values acquired from differenced time series.
            diff2y_acf_list: List of ACF values acquired from twice differenced time series.
        Returns:
            Auto-correlation function (ACF) features.
        """
        (
            y_acf1,
            y_acf5,
            diff1y_acf1,
            diff1y_acf5,
            diff2y_acf1,
            diff2y_acf5,
            seas_acf1,
        ) = (np.nan, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan)
        # y_acf1: first ACF value of the original series
        if extra_args.get("y_acf1", default_status):
            y_acf1 = y_acf_list[0]
        # y_acf5: sum of squares of first 5 ACF values of original series
        if extra_args.get("y_acf5", default_status):
            y_acf5 = np.sum(np.asarray(y_acf_list)[:5] ** 2)
        # diff1y_acf1: first ACF value of the differenced series
        if extra_args.get("diff1y_acf1", default_status):
            diff1y_acf1 = diff1y_acf_list[0]
        # diff1y_acf5: sum of squares of first 5 ACF values of differenced series
        if extra_args.get("diff1y_acf5", default_status):
            diff1y_acf5 = np.sum(np.asarray(diff1y_acf_list)[:5] ** 2)
        # diff2y_acf1: first ACF value of the twice-differenced series
        if extra_args.get("diff2y_acf1", default_status):
            diff2y_acf1 = diff2y_acf_list[0]
        # diff2y_acf5: sum of squares of first 5 ACF values of twice-differenced series
        if extra_args.get("diff2y_acf5", default_status):
            diff2y_acf5 = np.sum(np.asarray(diff2y_acf_list)[:5] ** 2)
        # Autocorrelation coefficient at the first seasonal lag.
        if extra_args.get("seas_acf1", default_status):
            seas_acf1 = y_acf_list[-1]
        return (
            y_acf1,
            y_acf5,
            diff1y_acf1,
            diff1y_acf5,
            diff2y_acf1,
            diff2y_acf5,
            seas_acf1,
        ) 
    # PACF features
[docs]    @staticmethod
    @jit(forceobj=True)
    def get_pacf_features(
        extra_args: Dict[str, bool],
        default_status: bool,
        y_pacf_list: List[float],
        diff1y_pacf_list: List[float],
        diff2y_pacf_list: List[float],
    ):
        """
        Aggregating extracted PACF features from get_acfpacf_features function.
        Args:
            extra_args: A dictionary containing information for disabling calculation
                of a certain feature. Default value is None, i.e. no feature is disabled.
            default_status: Default status of the switch for calculate the features or not.
                Default value is True.
            y_pacf_list: List of PACF values acquired from original time series.
            diff1y_pacf_list: List of PACF values acquired from differenced time series.
            diff2y_pacf_list: List of PACF values acquired from twice differenced time series.
        Returns:
            Partial auto-correlation function (PACF) features.
        """
        (
            y_pacf5,
            diff1y_pacf5,
            diff2y_pacf5,
            seas_pacf1,
        ) = (np.nan, np.nan, np.nan, np.nan)
        # y_pacf5: sum of squares of first 5 PACF values of original series
        if extra_args.get("y_pacf5", default_status):
            y_pacf5 = np.nansum(np.asarray(y_pacf_list)[:5] ** 2)
        # diff1y_pacf5: sum of squares of first 5 PACF values of differenced series
        if extra_args.get("diff1y_pacf5", default_status):
            diff1y_pacf5 = np.nansum(np.asarray(diff1y_pacf_list)[:5] ** 2)
        # diff2y_pacf5: sum of squares of first 5 PACF values of twice-differenced series
        if extra_args.get("diff2y_pacf5", default_status):
            diff2y_pacf5 = np.nansum(np.asarray(diff2y_pacf_list)[:5] ** 2)
        # Patial Autocorrelation coefficient at the first seasonal lag.
        if extra_args.get("seas_pacf1", default_status):
            seas_pacf1 = y_pacf_list[-1]
        return (
            y_pacf5,
            diff1y_pacf5,
            diff2y_pacf5,
            seas_pacf1,
        ) 
[docs]    @staticmethod
    @jit(forceobj=True)
    def get_acfpacf_features(
        x: np.ndarray,
        acfpacf_lag: int = 6,
        period: int = 7,
        extra_args: Optional[Dict[str, bool]] = None,
        default_status: bool = True,
    ):
        """
        Calculate ACF and PACF based features. Calculate seasonal ACF, PACF based features
        Reference: https://stackoverflow.com/questions/36038927/whats-the-difference-between-pandas-acf-and-statsmodel-acf
        R code: https://cran.r-project.org/web/packages/tsfeatures/vignettes/tsfeatures.html
        Paper: Meta-learning how to forecast time series
        Args:
            x: The univariate time series array in the form of 1d numpy array.
            acfpacf_lag: int; Largest lag number for returning ACF/PACF features via statsmodels.
                Default value is 6.
            period: int; Seasonal period. Default value is 7.
            extra_args: A dictionary containing information for disabling calculation
                of a certain feature. Default value is None, i.e. no feature is disabled.
            default_status: Default status of the switch for calculate the features or not.
                Default value is True.
        Returns:
            Aggregated ACF, PACF features.
        """
        acfpacf_features = {
            "y_acf1": np.nan,
            "y_acf5": np.nan,
            "diff1y_acf1": np.nan,
            "diff1y_acf5": np.nan,
            "diff2y_acf1": np.nan,
            "diff2y_acf5": np.nan,
            "y_pacf5": np.nan,
            "diff1y_pacf5": np.nan,
            "diff2y_pacf5": np.nan,
            "seas_acf1": np.nan,
            "seas_pacf1": np.nan,
        }
        if len(x) < 10 or len(x) < period or len(np.unique(x)) == 1:
            msg = "Length is shorter than period, or constant time series! Unable to calculate acf/pacf features!"
            logging.error(msg)
            return acfpacf_features
        nlag = min(acfpacf_lag, len(x) - 2)
        diff1x = [x[i] - x[i - 1] for i in range(1, len(x))]
        diff2x = [diff1x[i] - diff1x[i - 1] for i in range(1, len(diff1x))]
        y_acf_list = acf(x, unbiased=False, fft=True, nlags=period)[1:]
        diff1y_acf_list = acf(diff1x, unbiased=False, fft=True, nlags=nlag)[1:]
        diff2y_acf_list = acf(diff2x, unbiased=False, fft=True, nlags=nlag)[1:]
        y_pacf_list = pacf(x, nlags=period)[1:]
        diff1y_pacf_list = pacf(diff1x, nlags=nlag)[1:]
        diff2y_pacf_list = pacf(diff2x, nlags=nlag)[1:]
        # getting ACF features
        (
            acfpacf_features["y_acf1"],
            acfpacf_features["y_acf5"],
            acfpacf_features["diff1y_acf1"],
            acfpacf_features["diff1y_acf5"],
            acfpacf_features["diff2y_acf1"],
            acfpacf_features["diff2y_acf5"],
            acfpacf_features["seas_acf1"],
        ) = TsFeatures.get_acf_features(
            extra_args,
            default_status,
            y_acf_list,
            diff1y_acf_list,
            diff2y_acf_list,
        )
        # getting PACF features
        (
            acfpacf_features["y_pacf5"],
            acfpacf_features["diff1y_pacf5"],
            acfpacf_features["diff2y_pacf5"],
            acfpacf_features["seas_pacf1"],
        ) = TsFeatures.get_pacf_features(
            extra_args,
            default_status,
            y_pacf_list,
            diff1y_pacf_list,
            diff2y_pacf_list,
        )
        return acfpacf_features 
    # standard deviation of the first derivative
[docs]    @staticmethod
    @jit(forceobj=True)
    def get_std1st_der(x: np.ndarray):
        """
        Calculating std1st_der: the standard deviation of the first derivative of the time series.
        Reference: https://cran.r-project.org/web/packages/tsfeatures/vignettes/tsfeatures.html
        Args:
            x: The univariate time series array in the form of 1d numpy array.
        Returns:
            The standard deviation of the first derivative of the time series.
        """
        std1st_der = np.std(np.gradient(x))
        return std1st_der 
    # crossing points
[docs]    @staticmethod
    @jit(nopython=True)
    def get_crossing_points(x: np.ndarray):
        """
        Calculating crossing points: the number of times a time series crosses the median line.
        Reference: https://cran.r-project.org/web/packages/tsfeatures/vignettes/tsfeatures.html
        Args:
            x: The univariate time series array in the form of 1d numpy array.
        Returns:
            The number of times a time series crosses the median line.
        """
        median = np.median(x)
        cp = 0
        for i in range(len(x) - 1):
            if x[i] <= median < x[i + 1] or x[i] >= median > x[i + 1]:
                cp += 1
        return cp 
    # binarize mean
[docs]    @staticmethod
    @jit(nopython=True)
    def get_binarize_mean(x: np.ndarray):
        """
        Converts time series array into a binarized version.
        Time-series values above its mean are given 1, and those below the mean are 0.
        Return the average value of the binarized vector.
        Reference: https://cran.r-project.org/web/packages/tsfeatures/vignettes/tsfeatures.html
        Args:
            x: The univariate time series array in the form of 1d numpy array.
        Returns:
            The binarized version of time series array.
        """
        return np.mean(np.asarray(x) > np.mean(x)) 
    # KPSS unit root test
[docs]    @staticmethod
    @jit(forceobj=True)
    def get_unitroot_kpss(x: np.ndarray):
        """
        Getting a test statistic based on KPSS test.
        Test a null hypothesis that an observable time series is stationary around a deterministic trend.
        A vector comprising the statistic for the KPSS unit root test with linear trend and lag one
        Wiki: https://en.wikipedia.org/wiki/KPSS_test
        Args:
            x: The univariate time series array in the form of 1d numpy array.
        Returns:
            Test statistics acquired using KPSS test.
        """
        return kpss(x, regression="ct", nlags=1)[0] 
    # heterogeneity
[docs]    @staticmethod
    @jit(forceobj=True)
    def get_het_arch(x: np.ndarray):
        """
        reference: https://www.statsmodels.org/dev/generated/statsmodels.stats.diagnostic.het_arch.html
        Engle’s Test for Autoregressive Conditional Heteroscedasticity (ARCH)
        Args:
            x: The univariate time series array in the form of 1d numpy array.
        Returns:
            Lagrange multiplier test statistic
        """
        return het_arch(x, nlags=min(10, len(x) // 5))[0] 
    # histogram mode
[docs]    @staticmethod
    @jit(nopython=True)
    def get_histogram_mode(x: np.ndarray, nbins: int = 10):
        """
        Measures the mode of the data vector using histograms with a given number of bins.
        Reference: https://cran.r-project.org/web/packages/tsfeatures/vignettes/tsfeatures.html
        Args:
            x: The univariate time series array in the form of 1d numpy array.
            nbins: int; Number of bins to get the histograms. Default value is 10.
        Returns:
            Mode of the data vector using histograms.
        """
        cnt, val = np.histogram(x, bins=nbins)
        return val[cnt.argmax()] 
    # First min/zero AC (2)
[docs]    @staticmethod
    @jit(forceobj=True)
    def get_special_ac(
        x: np.ndarray, extra_args: Optional[Dict[str, bool]] = None, default_status: bool = True
    ):
        """
        Gettting special_ac features.
        firstmin_ac: the time of first minimum in the autocorrelation function
        firstzero_ac: the time of first zero crossing the autocorrelation function.
        Args:
            x: The univariate time series array in the form of 1d numpy array.
            extra_args: A dictionary containing information for disabling calculation
                of a certain feature. Default value is None, i.e. no feature is disabled.
            default_status: Default status of the switch for calculate the features or not.
                Default value is True.
        Returns:
            Special autocorrelation features described above.
        """
        # First min AC
        special_ac_features = {"firstmin_ac": np.nan, "firstzero_ac": np.nan}
        AC = acf(x, unbiased=False, fft=True, nlags=len(x))[1:]
        if extra_args is not None and extra_args.get("firstmin_ac", default_status):
            i = 0
            while i < len(AC) - 1:
                if AC[i] > AC[i + 1]:
                    i += 1
                else:
                    break
            special_ac_features["firstmin_ac"] = i + 1
        # First zero AC
        if extra_args is not None and extra_args.get("firstzero_ac", default_status):
            j = 0
            while j < len(AC) - 1:
                if AC[j] > 0 > AC[j + 1]:
                    break
                else:
                    j += 1
            special_ac_features["firstzero_ac"] = j + 2
        return special_ac_features 
    # Linearity
[docs]    @staticmethod
    @jit(forceobj=True)
    def get_linearity(x: np.ndarray):
        """
        Getting linearity feature: R square from a fitted linear regression.
        Args:
            x: The univariate time series array in the form of 1d numpy array.
        Returns:
            R square from a fitted linear regression.
        """
        _, _, r_value, _, _ = stats.linregress(np.arange(len(x)), x)
        return r_value ** 2 
    # Holt Parameters (2)
[docs]    @staticmethod
    def get_holt_params(
        x: np.ndarray, extra_args: Optional[Dict[str, bool]] = None, default_status: bool = True
    ):
        """
        Estimates the smoothing parameter for the level-alpha and the smoothing parameter
        for the trend-beta of Holt’s linear trend method.
        'alpha': Level parameter of the Holt model.
        'beta': Trend parameter of the Hold model.
        Args:
            x: The univariate time series array in the form of 1d numpy array.
            extra_args: A dictionary containing information for disabling calculation
                of a certain feature. Default value is None, i.e. no feature is disabled.
            default_status: Default status of the switch for calculate the features or not.
                Default value is True.
        Returns:
            Level and trend parameter of a fitted Holt model.
        """
        holt_params_features = {"holt_alpha": np.nan, "holt_beta": np.nan}
        try:
            m = ExponentialSmoothing(x, trend="add", seasonal=None).fit()
            if extra_args is not None and extra_args.get("holt_alpha", default_status):
                holt_params_features["holt_alpha"] = m.params["smoothing_level"]
            if extra_args is not None and extra_args.get("holt_beta", default_status):
                statsmodels_ver = float(re.findall('([0-9]+\\.[0-9]+)\\..*', statsmodels.__version__)[0])
                if statsmodels_ver < 0.12:
                    holt_params_features["holt_beta"] = m.params["smoothing_slope"]
                elif statsmodels_ver >= 0.12:
                    holt_params_features["holt_beta"] = m.params["smoothing_trend"]
            return holt_params_features
        except Exception:
            return holt_params_features 
    # Holt Winter’s Parameters (3)
[docs]    @staticmethod
    def get_hw_params(
        x: np.ndarray,
        period: int = 7,
        extra_args: Optional[Dict[str, bool]] = None,
        default_status: bool = True,
    ):
        """
        Estimates the smoothing parameter for the level-alpha, trend-beta of HW’s linear trend,
        and additive seasonal trend-gamma.
        Args:
            x: The univariate time series array in the form of 1d numpy array.
            period: int; Seaonal period for fitting exponential smoothing model. Default
                value is 7.
            extra_args: A dictionary containing information for disabling calculation
                of a certain feature. Default value is None, i.e. no feature is disabled.
            default_status: Default status of the switch for calculate the features or not.
                Default value is True.
        Returns:
            Level, trend and seasonal parameter of a fitted Holt-Winter's model.
        """
        hw_params_features = {"hw_alpha": np.nan, "hw_beta": np.nan, "hw_gamma": np.nan}
        try:
            # addressing issue of use_boxcox arg in different versions of statsmodels
            statsmodels_ver = float(re.findall('([0-9]+\\.[0-9]+)\\..*', statsmodels.__version__)[0])
            _args_ = {
                "seasonal_periods": period,
                "trend": "add",
                "seasonal": "add",
            }
            # performing version check on statsmodels
            if statsmodels_ver >= 0.12:
                _args_["use_boxcox"] = True
                _args_["initialization_method"] = 'estimated'
                m = ExponentialSmoothing(x, **_args_).fit()
            elif statsmodels_ver < 0.12:
                m = ExponentialSmoothing(x, **_args_).fit(use_boxcox = True)
            if extra_args is not None and extra_args.get("hw_alpha", default_status):
                hw_params_features["hw_alpha"] = m.params["smoothing_level"]
            if extra_args is not None and extra_args.get("hw_beta", default_status):
                if statsmodels_ver < 0.12:
                    hw_params_features["hw_beta"] = m.params["smoothing_slope"]
                elif statsmodels_ver >= 0.12:
                    hw_params_features["hw_beta"] = m.params["smoothing_trend"]
            if extra_args is not None and extra_args.get("hw_gamma", default_status):
                hw_params_features["hw_gamma"] = m.params["smoothing_seasonal"]
            return hw_params_features
        except Exception:
            return hw_params_features 
    # CUSUM Detection Outputs (8)
[docs]    @staticmethod
    def get_cusum_detector(
        ts: TimeSeriesData, extra_args: Optional[Dict[str, bool]] = None, default_status: bool = True
    ):
        """
        Run the Kats CUSUM Detector on the Time Series, extract features from the outputs of the detection.
        Args:
            ts: The univariate time series array in the form of Kats TimeSeriesData object.
            extra_args: A dictionary containing information for disabling calculation
                of a certain feature. Default value is None, i.e. no feature is disabled.
            default_status: Default status of the switch for calculate the features or not.
                Default value is True.
        Returns:
            Outputs of the CUSUM Detector, which include (1) Number of changepoints, either 1
            or 0, (2) Confidence of the changepoint detected, 0 if not changepoint, (3) index,
            or position of the changepoint detected within the time series, (4) delta of the
            mean levels before and after the changepoint, (5) log-likelihood ratio of changepoint,
            (6) Boolean - whether regression is detected by CUSUM, (7) Boolean - whether
            changepoint is stable, (8) p-value of changepoint.
        """
        cusum_detector_features = {
            "cusum_num": np.nan,
            "cusum_conf": np.nan,
            "cusum_cp_index": np.nan,
            "cusum_delta": np.nan,
            "cusum_llr": np.nan,
            "cusum_regression_detected": np.nan,
            "cusum_stable_changepoint": np.nan,
            "cusum_p_value": np.nan,
        }
        try:
            cusum = cusum_detection.CUSUMDetector(ts)
            cusum_cp = cusum.detector()
            if extra_args is not None and extra_args.get("cusum_num", default_status):
                cusum_detector_features["cusum_num"] = len(cusum_cp)
            if extra_args is not None and extra_args.get("cusum_conf", default_status):
                cusum_detector_features["cusum_conf"] = (
                    0 if len(cusum_cp) == 0 else cusum_cp[0][0].confidence
                )
            if extra_args is not None and extra_args.get("cusum_cp_index", default_status):
                cusum_detector_features["cusum_cp_index"] = (
                    0 if len(cusum_cp) == 0 else cusum_cp[0][1]._cp_index / len(ts)
                )
            if extra_args is not None and extra_args.get("cusum_delta", default_status):
                cusum_detector_features["cusum_delta"] = (
                    0 if len(cusum_cp) == 0 else cusum_cp[0][1]._delta
                )
            if extra_args is not None and extra_args.get("cusum_llr", default_status):
                cusum_detector_features["cusum_llr"] = (
                    0 if len(cusum_cp) == 0 else cusum_cp[0][1]._llr
                )
            if extra_args is not None and extra_args.get("cusum_regression_detected", default_status):
                cusum_detector_features["cusum_regression_detected"] = (
                    False if len(cusum_cp) == 0 else cusum_cp[0][1]._regression_detected
                )
            if extra_args is not None and extra_args.get("cusum_stable_changepoint", default_status):
                cusum_detector_features["cusum_stable_changepoint"] = (
                    False if len(cusum_cp) == 0 else cusum_cp[0][1]._stable_changepoint
                )
            if extra_args is not None and extra_args.get("cusum_p_value", default_status):
                cusum_detector_features["cusum_p_value"] = (
                    0 if len(cusum_cp) == 0 else cusum_cp[0][1]._p_value
                )
            return cusum_detector_features
        except Exception:
            return cusum_detector_features 
    # Robust Stat Detection Outputs (2)
[docs]    @staticmethod
    def get_robust_stat_detector(
        ts: TimeSeriesData, extra_args: Optional[Dict[str, bool]] = None, default_status: bool = True
    ):
        """
        Run the Kats Robust Stat Detector on the Time Series, extract features from the outputs of the detection.
        Args:
            ts: The univariate time series array in the form of Kats TimeSeriesData object.
            extra_args: A dictionary containing information for disabling calculation
                of a certain feature. Default value is None, i.e. no feature is disabled.
            default_status: Default status of the switch for calculate the features or not.
                Default value is True.
        Returns:
            (1) Number changepoints detected by the Robust Stat Detector, and (2) Mean of
            the Metric values from the Robust Stat Detector.
        """
        robust_stat_detector_features = {
            "robust_num": np.nan,
            "robust_metric_mean": np.nan,
        }
        try:
            robust = robust_stat_detection.RobustStatDetector(ts)
            robust_cp = robust.detector()
            if extra_args is not None and extra_args.get("robust_num", default_status):
                robust_stat_detector_features["robust_num"] = len(robust_cp)
            if extra_args is not None and extra_args.get("robust_metric_mean", default_status):
                robust_stat_detector_features["robust_metric_mean"] = (
                    0
                    if len(robust_cp) == 0
                    else np.sum([cp[1]._metric for cp in robust_cp]) / len(robust_cp)
                )
            return robust_stat_detector_features
        except Exception:
            return robust_stat_detector_features 
    # BOCP Detection Outputs (3)
[docs]    @staticmethod
    def get_bocp_detector(
        ts: TimeSeriesData, extra_args: Optional[Dict[str, bool]] = None, default_status: bool = True
    ):
        """
        Run the Kats BOCP Detector on the Time Series, extract features from the outputs of the detection.
        Args:
            ts: The univariate time series array in the form of Kats TimeSeriesData object.
            extra_args: A dictionary containing information for disabling calculation
                of a certain feature. Default value is None, i.e. no feature is disabled.
            default_status: Default status of the switch for calculate the features or not.
                Default value is True.
        Returns:
            (tuple): tuple containing:
                Number of changepoints detected by BOCP Detector
                Max value of the confidence of the changepoints detected
                Mean value of the confidence of the changepoints detected.
        """
        bocp_detector_features = {
            "bocp_num": np.nan,
            "bocp_conf_max": np.nan,
            "bocp_conf_mean": np.nan,
        }
        try:
            bocp = bocpd.BOCPDetector(ts)
            bocp_cp = bocp.detector(choose_priors=False)
            if extra_args is not None and extra_args.get("bocp_num", default_status):
                bocp_detector_features["bocp_num"] = len(bocp_cp)
            if extra_args is not None and extra_args.get("bocp_conf_max", default_status):
                bocp_detector_features["bocp_conf_max"] = (
                    0
                    if len(bocp_cp) == 0
                    else np.max([cp[0].confidence for cp in bocp_cp])
                )
            if extra_args is not None and extra_args.get("bocp_conf_mean", default_status):
                bocp_detector_features["bocp_conf_mean"] = (
                    0
                    if len(bocp_cp) == 0
                    else np.sum([cp[0].confidence for cp in bocp_cp]) / len(bocp_cp)
                )
            return bocp_detector_features
        except Exception:
            return bocp_detector_features 
    # Outlier Detection Outputs (1)
[docs]    @staticmethod
    def get_outlier_detector(
        ts: TimeSeriesData,
        decomp: str = "additive",
        iqr_mult: float = 3.0,
        extra_args: Optional[Dict[str, bool]] = None,
        default_status: bool = True,
    ):
        """
        Run the Kats Outlier Detector on the Time Series, extract features from the outputs of the detection.
        Args:
            ts: The univariate time series array in the form of Kats TimeSeriesData object.
            decomp: str; Additive or Multiplicative mode for performing outlier detection using
                OutlierDetector. Default value is 'additive'.
            iqr_mult: float; IQR range for determining outliers through
                OutlierDetector. Default value is 3.0.
            extra_args: A dictionary containing information for disabling calculation
                of a certain feature. Default value is None, i.e. no feature is disabled.
            default_status: Default status of the switch for calculate the features or not.
                Default value is True.
        Returns:
            Number of outliers by the Outlier Detector.
        """
        outlier_detector_features = {"outlier_num": np.nan}
        try:
            odetector = outlier.OutlierDetector(ts, decomp=decomp, iqr_mult=iqr_mult)
            odetector.detector()
            if extra_args is not None and extra_args.get("outlier_num", default_status):
                # pyre-fixme[16]: `OutlierDetector` has no attribute `outliers`.
                outlier_detector_features["outlier_num"] = len(odetector.outliers[0])
            return outlier_detector_features
        except Exception:
            return outlier_detector_features 
    # Trend Detection Outputs (3)
[docs]    @staticmethod
    def get_trend_detector(
        ts: TimeSeriesData,
        threshold: float = 0.8,
        extra_args: Optional[Dict[str, bool]] = None,
        default_status: bool = True,
    ):
        """
        Run the Kats Trend Detector on the Time Series, extract features from the outputs of the detection.
        Args:
            ts: The univariate time series array in the form of Kats TimeSeriesData object.
            threshold: float; threshold for trend intensity; higher threshold gives trend
                with high intensity (0.8 by default).  If we only want to use the p-value
                to determine changepoints, set threshold = 0.
            extra_args: A dictionary containing information for disabling calculation
                of a certain feature. Default value is None, i.e. no feature is disabled.
            default_status: Default status of the switch for calculate the features or not.
                Default value is True.
        Returns:
            (1) Number of trends detected by the Kats Trend Detector, (2) Number of increasing
            trends, (3) Mean of the abolute values of Taus of the trends detected.
        """
        trend_detector_features = {
            "trend_num": np.nan,
            "trend_num_increasing": np.nan,
            "trend_avg_abs_tau": np.nan,
        }
        try:
            tdetector = trend_mk.MKDetector(data=ts, threshold=threshold)
            tdetected_time_points = tdetector.detector(direction="both")
            if extra_args is not None and extra_args.get("trend_num", default_status):
                trend_detector_features["trend_num"] = len(tdetected_time_points)
            if extra_args is not None and extra_args.get("trend_num_increasing", default_status):
                trend_detector_features["trend_num_increasing"] = len(
                    [
                        p
                        for p in tdetected_time_points
                        if p[1].trend_direction == "decreasing"
                    ]
                )
            if extra_args is not None and extra_args.get("trend_avg_abs_tau", default_status):
                trend_detector_features["trend_avg_abs_tau"] = (
                    0
                    if len(tdetected_time_points) == 0
                    else np.sum([abs(p[1].Tau) for p in tdetected_time_points])
                    / len(tdetected_time_points)
                )
            return trend_detector_features
        except Exception:
            return trend_detector_features 
    @staticmethod
    @jit(nopython=True)
    def _ewma(
        arr: np.ndarray,
        span: int,
        min_periods: int
    ):
        """
        Exponentialy weighted moving average specified by a decay ``window``
        to provide better adjustments for small windows via:
            y[t] = (x[t] + (1-a)*x[t-1] + (1-a)^2*x[t-2] + ... + (1-a)^n*x[t-n]) /
                   (1 + (1-a) + (1-a)^2 + ... + (1-a)^n).
        Args:
        arr : np.ndarray; A single dimenisional numpy array.
        span : int; The decay window, or 'span'.
        min_periods: int; Minimum amount of data points we'd like to include in the output.
        Returns:
            A np.ndarray. The exponentially weighted moving average of the array.
        """
        output_array = np.empty(arr.shape[0], dtype=np.float64)
        output_array[:] = np.NaN
        arr = arr[~np.isnan(arr)]
        n = arr.shape[0]
        ewma = np.empty(n, dtype=np.float64)
        alpha = 2 / float(span + 1)
        w = 1
        ewma_old = arr[0]
        ewma[0] = ewma_old
        for i in range(1, n):
            w += (1-alpha)**i
            ewma_old = ewma_old*(1-alpha) + arr[i]
            ewma[i] = ewma_old / w
        output_subset = ewma[(min_periods-1):]
        output_array[-len(output_subset):] = output_subset
        return output_array
    @staticmethod
    @jit(forceobj=True)
    def _get_nowcasting_np(
        x: np.ndarray,
        window: int = 5,
        n_fast: int = 12,
        n_slow: int = 21,
        extra_args: Optional[Dict[str, bool]] = None,
        default_status: bool = True,
    ):
        """
        Internal function for actually performing feature engineering using the same procedure as
        nowcasting feature_extraction under kats/models.
        Args:
            x: The univariate time series array in the form of 1d numpy array.
            window: int; Length of window size for all Nowcasting features. Default value is 5.
            n_fast: int; length of "fast" or short period exponential moving average in the MACD
                algorithm in the nowcasting features. Default value is 12.
            n_slow: int; length of "slow" or long period exponential moving average in the MACD
                algorithm in the nowcasting features. Default value is 21.
            extra_args: A dictionary containing information for disabling calculation
                of a certain feature. Default value is None, i.e. no feature is disabled.
            default_status: Default status of the switch for calculate the features or not.
                Default value is True.
        Returns:
            A list containing extracted nowcast features.
        """
        # initializing the outputs
        nowcasting_features = [np.nan for _ in range(7)]
        # ROC: indicating return comparing to step n back.
        if extra_args is not None and extra_args.get("nowcast_roc", default_status):
            M = x[(window-1):] - x[:-(window-1)]
            N = x[:-(window-1)]
            arr = M / N
            nowcasting_features[0] = np.nan_to_num(arr, nan=0.0, posinf=0.0, neginf=0.0).mean()
        # MOM: indicating momentum: difference of current value and n steps back.
        if extra_args is not None and extra_args.get("nowcast_mom", default_status):
            M = x[window:] - x[:-window]
            nowcasting_features[1] = np.nan_to_num(M, nan=0.0, posinf = 0.0, neginf=0.0).mean()
        # MA: indicating moving average in the past n steps.
        if extra_args is not None and extra_args.get("nowcast_ma", default_status):
            ret = np.cumsum(x, dtype=float)
            ret[window:] = ret[window:] - ret[:-window]
            ma = ret[window - 1:] / window
            nowcasting_features[2] = np.nan_to_num(ma, nan=0.0, posinf=0.0, neginf=0.0).mean()
        # LAG: indicating lagged value at the past n steps.
        if extra_args is not None and extra_args.get("nowcast_lag", default_status):
            N = x[:-window]
            nowcasting_features[3] = np.nan_to_num(N, nan=0.0, posinf=0.0, neginf=0.0).mean()
        # MACD: https://www.investopedia.com/terms/m/macd.asp.
        ema_fast = TsFeatures._ewma(x, n_fast, n_slow-1)
        ema_slow = TsFeatures._ewma(x, n_slow, n_slow-1)
        MACD = ema_fast - ema_slow
        if extra_args is not None and extra_args.get("nowcast_macd", default_status):
            nowcasting_features[4] = np.nan_to_num(np.nanmean(MACD), nan=0.0, posinf=0.0, neginf=0.0)
        MACDsign = TsFeatures._ewma(MACD, 9, 8)
        if extra_args is not None and extra_args.get("nowcast_macdsign", default_status):
            nowcasting_features[5] = np.nan_to_num(np.nanmean(MACDsign), nan=0.0, posinf=0.0, neginf=0.0)
        MACDdiff = MACD - MACDsign
        if extra_args is not None and extra_args.get("nowcast_macddiff", default_status):
            nowcasting_features[6] = np.nan_to_num(np.nanmean(MACDdiff), nan=0.0, posinf=0.0, neginf=0.0)
        return nowcasting_features
    # Nowcasting features (7)
[docs]    @staticmethod
    def get_nowcasting(
        x: np.ndarray,
        window: int = 5,
        n_fast: int = 12,
        n_slow: int = 21,
        extra_args: Optional[Dict[str, bool]] = None,
        default_status: bool = True,
    ):
        """
        Run the Kats Nowcasting transformer on the Time Series, extract aggregated features from the outputs of the transformation.
        Args:
            x: The univariate time series array in the form of 1d numpy array.
            window: int; Length of window size for all Nowcasting features. Default value is 5.
            n_fast: int; length of "fast" or short period exponential moving average in the MACD
                algorithm in the nowcasting features. Default value is 12.
            n_slow: int; length of "slow" or long period exponential moving average in the MACD
                algorithm in the nowcasting features. Default value is 21.
            extra_args: A dictionary containing information for disabling calculation
                of a certain feature. Default value is None, i.e. no feature is disabled.
            default_status: Default status of the switch for calculate the features or not.
                Default value is True.
        Returns:
            Mean values of the Kats Nowcasting algorithm time series outputs using the parameters
            (window, n_fast, n_slow) indicated above. These outputs inclue : (1) Mean of Rate of
            Change (ROC) time series, (2) Mean of Moving Average (MA) time series,(3) Mean of
            Momentum (MOM) time series, (4) Mean of LAG time series, (5) Means of MACD, MACDsign,
            and MACDdiff from Kats Nowcasting.
        """
        nowcasting_features = {}
        features = [
            "nowcast_roc",
            "nowcast_mom",
            "nowcast_ma",
            "nowcast_lag",
            "nowcast_macd",
            "nowcast_macdsign",
            "nowcast_macddiff",
        ]
        for feature in features:
            if extra_args is not None and extra_args.get(feature, default_status):
                nowcasting_features[feature] = np.nan
        try:
            _features = TsFeatures._get_nowcasting_np(x, window, n_fast, n_slow, extra_args, default_status)
            for idx, feature in enumerate(features):
                if extra_args is not None and extra_args.get(feature, default_status):
                    nowcasting_features[feature] = _features[idx]
            return nowcasting_features
        except Exception:
            return nowcasting_features 
    # seasonality features (4)
[docs]    @staticmethod
    def get_seasonalities(
        ts: TimeSeriesData, extra_args: Optional[Dict[str, bool]] = None, default_status: bool = True
    ):
        """
        Run the Kats seaonality detectors to get the estimated seasonal period, then extract trend,
        seasonality and residual magnitudes.
        Args:
            ts: The univariate time series array in the form of Kats TimeSeriesData object.
            extra_args: A dictionary containing information for disabling calculation
                of a certain feature. Default value is None, i.e. no feature is disabled.
            default_status: Default status of the switch for calculate the features or not.
                Default value is True.
        Returns:
            Returns the detected seasonality period.
            Slope acquired via fitting simple linear regression model on the trend component
            as trend magnitude.
            Difference between the 95 percentile and 5 percentile of the seasonal component
            as the seasonality magnitude.
            Standard deviation of the residual component.
        """
        seasonality_features = {
            "seasonal_period": np.nan,
            "trend_mag": np.nan,
            "seasonality_mag": np.nan,
            "residual_std": np.nan,
        }
        try:
            # detrending for period estimation
            detrended = TimeSeriesData(
                pd.DataFrame(
                    {
                        "time": len(ts.value.values) - 1,
                        "value": ts.value.values[1:] - ts.value.values[:-1],
                    }
                )
            )
            detected = seasonality.FFTDetector(detrended).detector()
            if detected["seasonality_presence"]:
                _period = int(np.min(detected["seasonalities"]))
            elif not detected["seasonality_presence"]:
                _period = 7
            res = STL(ts.value.values, period=_period).fit()
            if extra_args is not None and extra_args.get("seasonal_period", default_status):
                seasonality_features["seasonal_period"] = _period
            # getting seasonality magnitude
            if extra_args is not None and extra_args.get("seasonality_mag", default_status):
                seasonality_features["seasonality_mag"] = np.round(
                    np.quantile(res.seasonal, 0.95) - np.quantile(res.seasonal, 0.05)
                )
            # fitting linear regression for trend magnitude
            if extra_args is not None and extra_args.get("trend_mag", default_status):
                exog = res.trend
                _series = exog - exog[0]
                mod = sm.OLS(_series, np.arange(len(_series)))
                _res = mod.fit()
                # trend magnitude
                seasonality_features["trend_mag"] = _res.params[0]
            # residual standard deviation
            if extra_args is not None and extra_args.get("residual_std", default_status):
                seasonality_features["residual_std"] = np.std(res.resid)
            return seasonality_features
        except Exception:
            return seasonality_features