#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import re
import logging
import statsmodels
from functools import partial
from itertools import groupby
from typing import List, Optional, Dict
import numpy as np
import pandas as pd
import statsmodels.api as sm
from kats.consts import TimeSeriesData
from kats.detectors import (
cusum_detection,
bocpd,
robust_stat_detection,
outlier,
trend_mk,
seasonality,
)
from numba import jit # @manual
from scipy import stats
from scipy.signal import periodogram # @manual
from statsmodels.stats.diagnostic import het_arch
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from statsmodels.tsa.seasonal import STL
from statsmodels.tsa.stattools import acf, pacf, kpss
"""TsFeatures is a module for performing adhoc feature engineering on time series
data using different statistics.
The module process time series data into features for machine learning models.
We include seasonality, autocorrelation, modeling parameter, changepoints,
moving statistics, and raw statistics of time series array as the adhoc features.
We also offer to compute part of the features or group of features using
selected_features argument, you could also disable feature or group of
features by setting feature_name/feature_group_name = False. You can find
all feature group names in feature_group_mapping attribute.
"""
[docs]class TsFeatures:
    """Process time series data into features for machine learning models, with the
    function to opt-in/out feature and feature groups in the calculations.

    Attributes:
        window_size: int; Length of the sliding window for getting level shift features,
            lumpiness, and stability of time series.
        spectral_freq: int; Frequency parameter in getting periodogram through scipy for
            calculating Shannon entropy.
        stl_period: int; Period parameter for performing seasonality trend decomposition
            using LOESS with statsmodels.
        nbins: int; Number of bins to equally segment time series array for getting flat
            spot feature.
        lag_size: int; Maximum number of lag values for calculating Hurst Exponent.
        acfpacf_lag: int; Largest lag number for returning ACF/PACF features via statsmodels.
        decomp: str; Additive or Multiplicative mode for performing outlier detection using
            Kats.Detectors.outlier.OutlierDetector.
        iqr_mult: float; IQR range for determining outliers through
            Kats.Detectors.outlier.OutlierDetector.
        threshold: float; threshold for trend intensity; higher threshold gives
            trend with high intensity (0.8 by default). If we only want to use the
            p-value to determine changepoints, set threshold = 0.
        window: int; length of window for all nowcasting features.
        n_fast: int; length of "fast" or short period exponential moving average in the MACD
            algorithm in the nowcasting features.
        n_slow: int; length of "slow" or long period exponential moving average in the MACD
            algorithm in the nowcasting features.
        selected_features: None or List[str]; list of feature/feature group name(s)
            selected to be calculated. We will try only calculating selected
            features, since some features are bundled in the calculations. This process
            helps with boosting efficiency, and we will only output selected features.
        feature_group_mapping: The dictionary with the mapping from individual features
            to their bundled feature groups.
        final_filter: A dictionary with booleans as the values, used to filter out the
            features not selected yet calculated due to underlying bundles.
        stl_features: Switch for calculating/outputting STL features.
        level_shift_features: Switch for calculating/outputting level shift features.
        acfpacf_features: Switch for calculating/outputting ACF/PACF features.
        special_ac: Switch for calculating/outputting special autocorrelation features.
        holt_params: Switch for calculating/outputting Holt parameter features.
        hw_params: Switch for calculating/outputting Holt-Winters parameter features.
        statistics: Switch for calculating/outputting raw statistics features.
        cusum_detector: Switch for calculating/outputting features using the cusum
            detector in Kats.
        robust_stat_detector: Switch for calculating/outputting features using the robust
            stat detector in Kats.
        bocp_detector: Switch for calculating/outputting features using the bocp detector
            in Kats.
        outlier_detector: Switch for calculating/outputting features using the outlier
            detector in Kats.
        trend_detector: Switch for calculating/outputting features using the trend
            detector in Kats.
        nowcasting: Switch for calculating/outputting nowcasting features.
        seasonalities: Switch for calculating/outputting seasonality features.
        default: The default status of the switch for opt-in/out feature calculations.
    """
def __init__(
    self,
    window_size: int = 20,
    spectral_freq: int = 1,
    stl_period: int = 7,
    nbins: int = 10,
    lag_size: int = 30,
    acfpacf_lag: int = 6,
    decomp: str = "additive",
    iqr_mult: float = 3.0,
    threshold: float = 0.8,
    window: int = 5,
    n_fast: int = 12,
    n_slow: int = 21,
    selected_features: Optional[List[str]] = None,
    **kwargs,
) -> None:
    """Initialize feature switches and hyper-parameters.

    See the class docstring for the meaning of each hyper-parameter.
    ``kwargs`` accepts individual feature names or feature-group names mapped
    to booleans, used to opt individual calculations in or out.
    """
    # init hyper-parameters
    self.window_size = window_size
    self.spectral_freq = spectral_freq
    self.stl_period = stl_period
    self.nbins = nbins
    self.lag_size = lag_size
    self.acfpacf_lag = acfpacf_lag
    self.decomp = decomp
    self.iqr_mult = iqr_mult
    self.threshold = threshold
    self.window = window
    self.n_fast = n_fast
    self.n_slow = n_slow
    # Mapping group features: group name -> list of the individual feature
    # names that the group's calculator produces in one pass.
    g2f = {
        "stl_features": [
            "trend_strength",
            "seasonality_strength",
            "spikiness",
            "peak",
            "trough",
        ],
        "level_shift_features": [
            "level_shift_idx",
            "level_shift_size",
        ],
        "acfpacf_features": [
            "y_acf1",
            "y_acf5",
            "diff1y_acf1",
            "diff1y_acf5",
            "diff2y_acf1",
            "diff2y_acf5",
            "y_pacf5",
            "diff1y_pacf5",
            "diff2y_pacf5",
            "seas_acf1",
            "seas_pacf1",
        ],
        "special_ac": [
            "firstmin_ac",
            "firstzero_ac",
        ],
        "holt_params": [
            "holt_alpha",
            "holt_beta",
        ],
        "hw_params": [
            "hw_alpha",
            "hw_beta",
            "hw_gamma",
        ],
        "statistics": [
            "length",
            "mean",
            "var",
            "entropy",
            "lumpiness",
            "stability",
            "flat_spots",
            "hurst",
            "std1st_der",
            "crossing_points",
            "binarize_mean",
            "unitroot_kpss",
            "heterogeneity",
            "histogram_mode",
            "linearity",
        ],
        "cusum_detector": [
            "cusum_num",
            "cusum_conf",
            "cusum_cp_index",
            "cusum_delta",
            "cusum_llr",
            "cusum_regression_detected",
            "cusum_stable_changepoint",
            "cusum_p_value",
        ],
        "robust_stat_detector": [
            "robust_num",
            "robust_metric_mean",
        ],
        "bocp_detector": [
            "bocp_num",
            "bocp_conf_max",
            "bocp_conf_mean",
        ],
        "outlier_detector": [
            "outlier_num",
        ],
        "trend_detector": [
            "trend_num",
            "trend_num_increasing",
            "trend_avg_abs_tau",
        ],
        "nowcasting": [
            "nowcast_roc",
            "nowcast_ma",
            "nowcast_mom",
            "nowcast_lag",
            "nowcast_macd",
            "nowcast_macdsign",
            "nowcast_macddiff",
        ],
        "seasonalities": [
            "seasonal_period",
            "trend_mag",
            "seasonality_mag",
            "residual_std",
        ],
    }
    self.feature_group_mapping = g2f
    # Inverted mapping: individual feature name -> owning group name.
    f2g = {}
    for k, v in g2f.items():
        for f in v:
            f2g[f] = k
    # Total number of individual features known to this class.
    self._total_feature_len_ = len(f2g.keys())
    # Validate that every opt-in/out keyword refers to a known feature or group.
    for f in kwargs.keys():
        assert (
            f in f2g.keys() or f in g2f.keys()
        ), f"""couldn't find your desired feature/group "{f}", please check spelling"""
    # Higher level of features:
    # Once disabled, won't even go inside these groups of features
    # for calculation
    if not selected_features:
        # No selection: every feature is on by default; kwargs may still
        # opt individual features/groups out below.
        default = True
        self.final_filter = {k: default for k in f2g.keys()}
    elif selected_features:
        # Explicit selection: everything starts off, and only the requested
        # features (and their owning groups) are switched on.
        default = False
        self.final_filter = {k: default for k in f2g.keys()}
        for f in selected_features:
            assert (
                f in f2g.keys() or f in g2f.keys()
            ), f"""couldn't find your desired feature/group "{f}", please check spelling"""
            if f in g2f.keys():  # the opt-in request is for a feature group
                kwargs[f] = True
                for feature in g2f[f]:
                    # Turn on every member feature unless explicitly opted out.
                    kwargs[feature] = kwargs.get(feature, True)
                    self.final_filter[feature] = True
            elif f in f2g.keys():  # the opt-in request is for a certain feature
                # A feature cannot be selected while its group (or itself) is
                # simultaneously opted out via kwargs.
                assert kwargs.get(
                    f2g[f], True
                ), f"""feature group: {f2g[f]} has to be opt-in based on your opt-in request of feature: {f}"""
                assert kwargs.get(
                    f, True
                ), f"""you have requested to both opt-in and opt-out feature: {f}"""
                kwargs[f2g[f]] = True  # need to opt-in the feature group first
                kwargs[f] = True  # opt-in the feature
                self.final_filter[f] = True
    for k, v in kwargs.items():
        self.final_filter[
            k
        ] = v  # final filter for filtering out features user didn't request and keep only the requested ones
    # setting default value for the switches of calculating the group of features or not
    self.stl_features = kwargs.get("stl_features", default)
    self.level_shift_features = kwargs.get("level_shift_features", default)
    self.acfpacf_features = kwargs.get("acfpacf_features", default)
    self.special_ac = kwargs.get("special_ac", default)
    self.holt_params = kwargs.get("holt_params", default)
    self.hw_params = kwargs.get("hw_params", default)
    self.statistics = kwargs.get("statistics", default)
    # Detector/nowcasting/seasonality groups are comparatively expensive, so
    # they default to off regardless of `default`.
    self.cusum_detector = kwargs.get("cusum_detector", False)
    self.robust_stat_detector = kwargs.get("robust_stat_detector", False)
    self.bocp_detector = kwargs.get("bocp_detector", False)
    self.outlier_detector = kwargs.get("outlier_detector", False)
    self.trend_detector = kwargs.get("trend_detector", False)
    self.nowcasting = kwargs.get("nowcasting", False)
    self.seasonalities = kwargs.get("seasonalities", False)
    # For lower level of the features: raw per-feature switches consumed by
    # the individual calculators via extra_args.
    self.__kwargs__ = kwargs
    self.default = default
def _transform_1d(self, x: np.ndarray, ts: TimeSeriesData):
    """
    Transform single (univariate) time series

    Args:
        x: The univariate time series array in the form of 1d numpy array.
        ts: The univariate time series array in the form of Kats Timeseries
            Data object.

    Returns:
        The dictionary with all the features aggregated from the outputs of
        each feature group calculator. Only groups whose switch is on appear;
        merge order below fixes the key order of the result.
    """
    # Each group calculator receives self.__kwargs__ / self.default so that
    # per-feature opt-in/out switches propagate down.
    # calculate STL based features
    dict_stl_features = {}
    if self.stl_features:
        dict_stl_features = self.get_stl_features(
            x,
            period=self.stl_period,
            extra_args=self.__kwargs__,
            default_status=self.default,
        )
    # calculate level shift based features
    dict_level_shift_features = {}
    if self.level_shift_features:
        dict_level_shift_features = self.get_level_shift(
            x,
            window_size=self.window_size,
            extra_args=self.__kwargs__,
            default_status=self.default,
        )
    # calculate ACF, PACF based features
    dict_acfpacf_features = {}
    if self.acfpacf_features:
        dict_acfpacf_features = self.get_acfpacf_features(
            x,
            acfpacf_lag=self.acfpacf_lag,
            period=self.stl_period,
            extra_args=self.__kwargs__,
            default_status=self.default,
        )
    # calculate special AC
    dict_specialac_features = {}
    if self.special_ac:
        dict_specialac_features = self.get_special_ac(
            x, extra_args=self.__kwargs__, default_status=self.default
        )
    # calculate holt params
    dict_holt_params_features = {}
    if self.holt_params:
        dict_holt_params_features = self.get_holt_params(
            x, extra_args=self.__kwargs__, default_status=self.default
        )
    # calculate hw params
    dict_hw_params_features = {}
    if self.hw_params:
        dict_hw_params_features = self.get_hw_params(
            x,
            period=self.stl_period,
            extra_args=self.__kwargs__,
            default_status=self.default,
        )
    # single features: each entry maps a feature name to its calculator,
    # pre-bound with the relevant hyper-parameters.
    _dict_features_ = {}
    if self.statistics:
        dict_features = {
            "length": partial(self.get_length),
            "mean": partial(self.get_mean),
            "var": partial(self.get_var),
            "entropy": partial(self.get_spectral_entropy, freq=self.spectral_freq),
            "lumpiness": partial(self.get_lumpiness, window_size=self.window_size),
            "stability": partial(self.get_stability, window_size=self.window_size),
            "flat_spots": partial(self.get_flat_spots, nbins=self.nbins),
            "hurst": partial(self.get_hurst, lag_size=self.lag_size),
            "std1st_der": partial(self.get_std1st_der),
            "crossing_points": partial(self.get_crossing_points),
            "binarize_mean": partial(self.get_binarize_mean),
            "unitroot_kpss": partial(self.get_unitroot_kpss),
            "heterogeneity": partial(self.get_het_arch),
            "histogram_mode": partial(self.get_histogram_mode, nbins=self.nbins),
            "linearity": partial(self.get_linearity),
        }
        _dict_features_ = {}
        # Run only the statistics whose individual switch is on.
        for k, v in dict_features.items():
            if self.__kwargs__.get(k, self.default):
                _dict_features_[k] = v(x)
    # calculate cusum detector features
    dict_cusum_detector_features = {}
    if self.cusum_detector:
        dict_cusum_detector_features = self.get_cusum_detector(
            ts, extra_args=self.__kwargs__, default_status=self.default
        )
    # calculate robust stat detector features
    dict_robust_stat_detector_features = {}
    if self.robust_stat_detector:
        dict_robust_stat_detector_features = self.get_robust_stat_detector(
            ts, extra_args=self.__kwargs__, default_status=self.default
        )
    # calculate bocp detector features
    dict_bocp_detector_features = {}
    if self.bocp_detector:
        dict_bocp_detector_features = self.get_bocp_detector(
            ts, extra_args=self.__kwargs__, default_status=self.default
        )
    # calculate outlier detector features
    dict_outlier_detector_features = {}
    if self.outlier_detector:
        dict_outlier_detector_features = self.get_outlier_detector(
            ts,
            decomp=self.decomp,
            iqr_mult=self.iqr_mult,
            extra_args=self.__kwargs__,
            default_status=self.default,
        )
    # calculate trend detector features
    dict_trend_detector_features = {}
    if self.trend_detector:
        dict_trend_detector_features = self.get_trend_detector(
            ts,
            threshold=self.threshold,
            extra_args=self.__kwargs__,
            default_status=self.default,
        )
    # calculate nowcasting features
    dict_nowcasting_features = {}
    if self.nowcasting:
        dict_nowcasting_features = self.get_nowcasting(
            x,
            window=self.window,
            n_fast=self.n_fast,
            n_slow=self.n_slow,
            extra_args=self.__kwargs__,
            default_status=self.default,
        )
    # calculate seasonality features
    dict_seasonality_features = {}
    if self.seasonalities:
        dict_seasonality_features = self.get_seasonalities(
            ts, extra_args=self.__kwargs__, default_status=self.default
        )
    # Feature names are disjoint across groups, so the merge only fixes
    # ordering, never overwrites.
    return {
        **_dict_features_,
        **dict_stl_features,
        **dict_level_shift_features,
        **dict_acfpacf_features,
        **dict_specialac_features,
        **dict_holt_params_features,
        **dict_hw_params_features,
        **dict_cusum_detector_features,
        **dict_robust_stat_detector_features,
        **dict_bocp_detector_features,
        **dict_outlier_detector_features,
        **dict_trend_detector_features,
        **dict_nowcasting_features,
        **dict_seasonality_features,
    }
# length
@staticmethod
@jit(nopython=True)
def get_length(x: np.ndarray):
    """
    Return the number of observations in the time series array.

    Args:
        x: The univariate time series array in the form of 1d numpy array.

    Returns:
        Length of the time series array.
    """
    return x.shape[0]
# mean
@staticmethod
@jit(nopython=True)
def get_mean(x: np.ndarray):
    """
    Return the arithmetic mean of the time series array.

    Args:
        x: The univariate time series array in the form of 1d numpy array.

    Returns:
        Average of the time series array.
    """
    return x.mean()
# variance
@staticmethod
@jit(nopython=True)
def get_var(x: np.ndarray):
    """
    Return the (population) variance of the time series array.

    Args:
        x: The univariate time series array in the form of 1d numpy array.

    Returns:
        Variance of the time series array.
    """
    return x.var()
# spectral entropy
@staticmethod
@jit(forceobj=True)
def get_spectral_entropy(x: np.ndarray, freq: int = 1):
    """
    Return the normalized Shannon entropy of the power spectral density.

    The PSD is estimated with scipy's periodogram; the entropy is divided by
    log2(N) so the result lies in [0, 1].

    Args:
        x: The univariate time series array in the form of 1d numpy array.
        freq: int; Frequency for calculating the PSD via scipy periodogram.
            Default value is 1.

    Returns:
        Normalized Shannon entropy.
    """
    # Estimate the power spectral density (frequencies are not needed).
    power = periodogram(x, freq)[1]
    # Normalize the PSD so it sums to one, like a probability distribution.
    density = power / np.sum(power)
    # Shannon entropy; nansum ignores 0 * log2(0) terms.
    shannon = -np.nansum(density * np.log2(density))
    return shannon / np.log2(density.size)
# lumpiness
@staticmethod
@jit(forceobj=True)
def get_lumpiness(x: np.ndarray, window_size: int = 20):
    """
    Return the lumpiness of a time series: the variance of chunk-wise variances.

    Args:
        x: The univariate time series array in the form of 1d numpy array.
        window_size: int; Window size to split the data into chunks for getting
            variances. Default value is 20.

    Returns:
        Lumpiness of the time series array.
    """
    n_chunks = len(x) // window_size + 1
    chunk_variances = [np.var(chunk) for chunk in np.array_split(x, n_chunks)]
    return np.var(chunk_variances)
# stability
@staticmethod
@jit(forceobj=True)
def get_stability(x: np.ndarray, window_size: int = 20):
    """
    Return the stability of a time series: the variance of chunk-wise means.

    Args:
        x: The univariate time series array in the form of 1d numpy array.
        window_size: int; Window size to split the data into chunks for getting
            means. Default value is 20.

    Returns:
        Stability of the time series array.
    """
    n_chunks = len(x) // window_size + 1
    chunk_means = [np.mean(chunk) for chunk in np.array_split(x, n_chunks)]
    return np.var(chunk_means)
# STL decomposition based features
@staticmethod
@jit(forceobj=True)
def get_stl_features(
    x: np.ndarray,
    period: int = 7,
    extra_args: Optional[Dict[str, bool]] = None,
    default_status: bool = True,
):
    """
    Calculate STL based features for a time series, including strength of
    trend, seasonality, spikiness, peak/trough.

    Args:
        x: The univariate time series array in the form of 1d numpy array.
        period: int; Period parameter for performing seasonality trend decomposition
            using LOESS with statsmodels. Default value is 7.
        extra_args: A dictionary containing information for disabling calculation
            of a certain feature. Default value is None, i.e. no feature is disabled.
        default_status: Default status of the switch for calculate the features or not.
            Default value is True.

    Returns:
        Seasonality features including strength of trend, seasonality, spikiness,
        peak/trough.
    """
    # Fix: previously `extra_args is None` skipped every feature, contradicting
    # the documented contract that None disables nothing. Treat None as {} so
    # each feature falls back to default_status.
    if extra_args is None:
        extra_args = {}
    stl_features = {}
    # STL decomposition
    res = STL(x, period=period).fit()
    # strength of trend: 1 - Var(resid) / Var(trend + resid)
    if extra_args.get("trend_strength", default_status):
        stl_features["trend_strength"] = 1 - np.var(res.resid) / np.var(
            res.trend + res.resid
        )
    # strength of seasonality: 1 - Var(resid) / Var(seasonal + resid)
    if extra_args.get("seasonality_strength", default_status):
        stl_features["seasonality_strength"] = 1 - np.var(res.resid) / np.var(
            res.seasonal + res.resid
        )
    # spikiness: variance of the leave-one-out variances of the remainder component
    if extra_args.get("spikiness", default_status):
        resid = np.array(res.resid)
        # Build an NxN matrix whose column j is the residual vector with
        # element j masked out, then take the variance of column variances.
        leave_one_out = np.repeat(resid[:, np.newaxis], len(resid), axis=1)
        # np.nan instead of the np.NaN alias, which was removed in NumPy 2.0.
        leave_one_out[np.diag_indices(len(leave_one_out))] = np.nan
        stl_features["spikiness"] = np.var(np.nanvar(leave_one_out, axis=0))
    # location of peak within the first seasonal cycle
    if extra_args.get("peak", default_status):
        stl_features["peak"] = np.argmax(res.seasonal[:period])
    # location of trough within the first seasonal cycle
    if extra_args.get("trough", default_status):
        stl_features["trough"] = np.argmin(res.seasonal[:period])
    return stl_features
# Level shift
@staticmethod
@jit(forceobj=True)
def get_level_shift(
    x: np.ndarray,
    window_size: int = 20,
    extra_args: Optional[Dict[str, bool]] = None,
    default_status: bool = True,
) -> Dict[str, float]:  # fix: annotation said pd.DataFrame but a dict is returned
    """
    Calculating level shift features.

    * level_shift_idx: Location of the maximum mean value difference,
      between two consecutive sliding windows
    * level_shift_size: Size of the maximum mean value difference,
      between two consecutive sliding windows

    Args:
        x: The univariate time series array in the form of 1d numpy array.
        window_size: int; Length of the sliding window. Default value is 20.
        extra_args: A dictionary containing information for disabling calculation
            of a certain feature. Default value is None, i.e. no feature is disabled.
        default_status: Default status of the switch for calculate the features or not.
            Default value is True.

    Returns:
        Level shift features including level_shift_idx, and level_shift_size
    """
    # Fix: treat extra_args=None as "nothing disabled" (previously it skipped
    # both features, contradicting the documented default).
    if extra_args is None:
        extra_args = {}
    level_shift_features = {"level_shift_idx": np.nan, "level_shift_size": np.nan}
    if len(x) < window_size + 2:
        msg = "Length of time series is shorter than window_size, unable to calculate level shift features!"
        logging.error(msg)
        return level_shift_features
    # Matrix of indices selecting every length-`window_size` window of x.
    sliding_idx = (np.arange(len(x))[None, :] + np.arange(window_size)[:, None])[
        :, : len(x) - window_size + 1
    ]
    means = np.mean(x[sliding_idx], axis=0)
    # Absolute mean difference between consecutive windows.
    mean_diff = np.abs(means[:-1] - means[1:])
    # Hoisted: argmax was previously computed twice.
    max_pos = np.argmax(mean_diff)
    if extra_args.get("level_shift_idx", default_status):
        level_shift_features["level_shift_idx"] = max_pos
    if extra_args.get("level_shift_size", default_status):
        level_shift_features["level_shift_size"] = mean_diff[max_pos]
    return level_shift_features
# Flat spots
@staticmethod
@jit(forceobj=True)
def get_flat_spots(x: np.ndarray, nbins: int = 10):
    """
    Getting flat spots: Maximum run-lengths across equally-sized segments of time series

    Args:
        x: The univariate time series array in the form of 1d numpy array.
        nbins: int; Number of bins to segment time series data into.

    Returns:
        Maximum run-lengths across segmented time series array.
    """
    if len(x) <= nbins:
        msg = "Length of time series is shorter than nbins, unable to calculate flat spots feature!"
        logging.error(msg)
        return np.nan
    width = int(len(x) / nbins)
    # Longest run of identical consecutive values within each segment.
    per_segment_runs = []
    for start in range(0, len(x), width):
        segment = x[start : start + width]
        per_segment_runs.append(
            np.max([len(list(group)) for _, group in groupby(segment)])
        )
    return np.max(per_segment_runs)
# Hurst Exponent
@staticmethod
@jit(forceobj=True)
def get_hurst(x: np.ndarray, lag_size: int = 30):
    """
    Getting: Hurst Exponent wiki: https://en.wikipedia.org/wiki/Hurst_exponent

    Args:
        x: The univariate time series array in the form of 1d numpy array.
        lag_size: int; Size for getting lagged time series data. Default value
            is 30.

    Returns:
        The Hurst Exponent of the time series array
    """
    series = np.asarray(x)
    # Lags to evaluate, capped by the series length.
    lags = range(2, min(lag_size, len(series) - 1))
    # Standard deviation of the lagged differences for each lag.
    tau = [np.std(series[lag:] - series[:-lag]) for lag in lags]
    # The Hurst exponent is the slope of log(tau) against log(lag).
    slope = np.polyfit(np.log(lags), np.log(tau), 1)[0]
    return slope if not np.isnan(slope) else 0
# ACF and PACF features
# ACF features
@staticmethod
@jit(forceobj=True)
def get_acf_features(
    extra_args: Dict[str, bool],
    default_status: bool,
    y_acf_list: List[float],
    diff1y_acf_list: List[float],
    diff2y_acf_list: List[float],
):
    """
    Aggregating extracted ACF features from get_acfpacf_features function.

    Args:
        extra_args: A dictionary containing information for disabling calculation
            of a certain feature. Default value is None, i.e. no feature is disabled.
        default_status: Default status of the switch for calculate the features or not.
            Default value is True.
        y_acf_list: List of ACF values acquired from original time series.
        diff1y_acf_list: List of ACF values acquired from differenced time series.
        diff2y_acf_list: List of ACF values acquired from twice differenced time series.

    Returns:
        Auto-correlation function (ACF) features.
    """
    names = (
        "y_acf1",
        "y_acf5",
        "diff1y_acf1",
        "diff1y_acf5",
        "diff2y_acf1",
        "diff2y_acf5",
        "seas_acf1",
    )
    out = {name: np.nan for name in names}
    # y_acf1: first ACF value of the original series
    if extra_args.get("y_acf1", default_status):
        out["y_acf1"] = y_acf_list[0]
    # y_acf5: sum of squares of first 5 ACF values of original series
    if extra_args.get("y_acf5", default_status):
        out["y_acf5"] = np.sum(np.asarray(y_acf_list)[:5] ** 2)
    # diff1y_acf1: first ACF value of the differenced series
    if extra_args.get("diff1y_acf1", default_status):
        out["diff1y_acf1"] = diff1y_acf_list[0]
    # diff1y_acf5: sum of squares of first 5 ACF values of differenced series
    if extra_args.get("diff1y_acf5", default_status):
        out["diff1y_acf5"] = np.sum(np.asarray(diff1y_acf_list)[:5] ** 2)
    # diff2y_acf1: first ACF value of the twice-differenced series
    if extra_args.get("diff2y_acf1", default_status):
        out["diff2y_acf1"] = diff2y_acf_list[0]
    # diff2y_acf5: sum of squares of first 5 ACF values of twice-differenced series
    if extra_args.get("diff2y_acf5", default_status):
        out["diff2y_acf5"] = np.sum(np.asarray(diff2y_acf_list)[:5] ** 2)
    # seas_acf1: autocorrelation coefficient at the first seasonal lag
    if extra_args.get("seas_acf1", default_status):
        out["seas_acf1"] = y_acf_list[-1]
    return tuple(out[name] for name in names)
# PACF features
@staticmethod
@jit(forceobj=True)
def get_pacf_features(
    extra_args: Dict[str, bool],
    default_status: bool,
    y_pacf_list: List[float],
    diff1y_pacf_list: List[float],
    diff2y_pacf_list: List[float],
):
    """
    Aggregating extracted PACF features from get_acfpacf_features function.

    Args:
        extra_args: A dictionary containing information for disabling calculation
            of a certain feature. Default value is None, i.e. no feature is disabled.
        default_status: Default status of the switch for calculate the features or not.
            Default value is True.
        y_pacf_list: List of PACF values acquired from original time series.
        diff1y_pacf_list: List of PACF values acquired from differenced time series.
        diff2y_pacf_list: List of PACF values acquired from twice differenced time series.

    Returns:
        Partial auto-correlation function (PACF) features.
    """
    names = (
        "y_pacf5",
        "diff1y_pacf5",
        "diff2y_pacf5",
        "seas_pacf1",
    )
    out = {name: np.nan for name in names}
    # y_pacf5: sum of squares of first 5 PACF values of original series
    if extra_args.get("y_pacf5", default_status):
        out["y_pacf5"] = np.nansum(np.asarray(y_pacf_list)[:5] ** 2)
    # diff1y_pacf5: sum of squares of first 5 PACF values of differenced series
    if extra_args.get("diff1y_pacf5", default_status):
        out["diff1y_pacf5"] = np.nansum(np.asarray(diff1y_pacf_list)[:5] ** 2)
    # diff2y_pacf5: sum of squares of first 5 PACF values of twice-differenced series
    if extra_args.get("diff2y_pacf5", default_status):
        out["diff2y_pacf5"] = np.nansum(np.asarray(diff2y_pacf_list)[:5] ** 2)
    # seas_pacf1: partial autocorrelation coefficient at the first seasonal lag
    if extra_args.get("seas_pacf1", default_status):
        out["seas_pacf1"] = y_pacf_list[-1]
    return tuple(out[name] for name in names)
@staticmethod
@jit(forceobj=True)
def get_acfpacf_features(
    x: np.ndarray,
    acfpacf_lag: int = 6,
    period: int = 7,
    extra_args: Optional[Dict[str, bool]] = None,
    default_status: bool = True,
):
    """
    Calculate ACF and PACF based features. Calculate seasonal ACF, PACF based features

    Reference: https://stackoverflow.com/questions/36038927/whats-the-difference-between-pandas-acf-and-statsmodel-acf
    R code: https://cran.r-project.org/web/packages/tsfeatures/vignettes/tsfeatures.html
    Paper: Meta-learning how to forecast time series

    Args:
        x: The univariate time series array in the form of 1d numpy array.
        acfpacf_lag: int; Largest lag number for returning ACF/PACF features via statsmodels.
            Default value is 6.
        period: int; Seasonal period. Default value is 7.
        extra_args: A dictionary containing information for disabling calculation
            of a certain feature. Default value is None, i.e. no feature is disabled.
        default_status: Default status of the switch for calculate the features or not.
            Default value is True.

    Returns:
        Aggregated ACF, PACF features.
    """
    acf_names = (
        "y_acf1",
        "y_acf5",
        "diff1y_acf1",
        "diff1y_acf5",
        "diff2y_acf1",
        "diff2y_acf5",
        "seas_acf1",
    )
    pacf_names = (
        "y_pacf5",
        "diff1y_pacf5",
        "diff2y_pacf5",
        "seas_pacf1",
    )
    # Key order matters downstream when the dicts are merged into the final
    # feature vector; keep the historical ordering.
    acfpacf_features = {
        key: np.nan
        for key in (
            "y_acf1",
            "y_acf5",
            "diff1y_acf1",
            "diff1y_acf5",
            "diff2y_acf1",
            "diff2y_acf5",
            "y_pacf5",
            "diff1y_pacf5",
            "diff2y_pacf5",
            "seas_acf1",
            "seas_pacf1",
        )
    }
    if len(x) < 10 or len(x) < period or len(np.unique(x)) == 1:
        msg = "Length is shorter than period, or constant time series! Unable to calculate acf/pacf features!"
        logging.error(msg)
        return acfpacf_features
    nlag = min(acfpacf_lag, len(x) - 2)
    # First and second order differences (same values the old list
    # comprehensions produced).
    diff1x = np.diff(x)
    diff2x = np.diff(x, 2)
    # Drop lag 0 (always 1 / trivially defined) from every ACF/PACF series.
    y_acf_list = acf(x, unbiased=False, fft=True, nlags=period)[1:]
    diff1y_acf_list = acf(diff1x, unbiased=False, fft=True, nlags=nlag)[1:]
    diff2y_acf_list = acf(diff2x, unbiased=False, fft=True, nlags=nlag)[1:]
    y_pacf_list = pacf(x, nlags=period)[1:]
    diff1y_pacf_list = pacf(diff1x, nlags=nlag)[1:]
    diff2y_pacf_list = pacf(diff2x, nlags=nlag)[1:]
    # getting ACF features
    acf_values = TsFeatures.get_acf_features(
        extra_args,
        default_status,
        y_acf_list,
        diff1y_acf_list,
        diff2y_acf_list,
    )
    for name, value in zip(acf_names, acf_values):
        acfpacf_features[name] = value
    # getting PACF features
    pacf_values = TsFeatures.get_pacf_features(
        extra_args,
        default_status,
        y_pacf_list,
        diff1y_pacf_list,
        diff2y_pacf_list,
    )
    for name, value in zip(pacf_names, pacf_values):
        acfpacf_features[name] = value
    return acfpacf_features
# standard deviation of the first derivative
@staticmethod
@jit(forceobj=True)
def get_std1st_der(x: np.ndarray):
    """
    Calculating std1st_der: the standard deviation of the first derivative of the time series.

    Reference: https://cran.r-project.org/web/packages/tsfeatures/vignettes/tsfeatures.html

    Args:
        x: The univariate time series array in the form of 1d numpy array.

    Returns:
        The standard deviation of the first derivative of the time series.
    """
    first_derivative = np.gradient(x)
    return np.std(first_derivative)
# crossing points
@staticmethod
@jit(nopython=True)
def get_crossing_points(x: np.ndarray):
    """
    Calculating crossing points: the number of times a time series crosses the median line.

    Reference: https://cran.r-project.org/web/packages/tsfeatures/vignettes/tsfeatures.html

    Args:
        x: The univariate time series array in the form of 1d numpy array.

    Returns:
        The number of times a time series crosses the median line.
    """
    med = np.median(x)
    crossings = 0
    for i in range(1, len(x)):
        left = x[i - 1]
        right = x[i]
        # A crossing happens when consecutive points straddle the median.
        if (left <= med and right > med) or (left >= med and right < med):
            crossings += 1
    return crossings
# binarize mean
@staticmethod
@jit(nopython=True)
def get_binarize_mean(x: np.ndarray):
    """
    Converts time series array into a binarized version.

    Time-series values above its mean are given 1, and those below the mean are 0.
    Return the average value of the binarized vector.
    Reference: https://cran.r-project.org/web/packages/tsfeatures/vignettes/tsfeatures.html

    Args:
        x: The univariate time series array in the form of 1d numpy array.

    Returns:
        The binarized version of time series array.
    """
    threshold = np.mean(x)
    binarized = np.asarray(x) > threshold
    return np.mean(binarized)
# KPSS unit root test
@staticmethod
@jit(forceobj=True)
def get_unitroot_kpss(x: np.ndarray):
    """
    Getting a test statistic based on KPSS test.

    Test a null hypothesis that an observable time series is stationary around a
    deterministic trend. The statistic is for the KPSS unit root test with linear
    trend and lag one.
    Wiki: https://en.wikipedia.org/wiki/KPSS_test

    Args:
        x: The univariate time series array in the form of 1d numpy array.

    Returns:
        Test statistics acquired using KPSS test.
    """
    kpss_result = kpss(x, regression="ct", nlags=1)
    return kpss_result[0]
# heterogeneity
@staticmethod
@jit(forceobj=True)
def get_het_arch(x: np.ndarray):
    """
    Engle's Test for Autoregressive Conditional Heteroscedasticity (ARCH).

    reference: https://www.statsmodels.org/dev/generated/statsmodels.stats.diagnostic.het_arch.html

    Args:
        x: The univariate time series array in the form of 1d numpy array.

    Returns:
        Lagrange multiplier test statistic
    """
    # Cap the lag count at one fifth of the series length.
    lag = min(10, len(x) // 5)
    return het_arch(x, nlags=lag)[0]
# histogram mode
@staticmethod
@jit(nopython=True)
def get_histogram_mode(x: np.ndarray, nbins: int = 10):
    """
    Measures the mode of the data vector using histograms with a given number of bins.

    Reference: https://cran.r-project.org/web/packages/tsfeatures/vignettes/tsfeatures.html

    Args:
        x: The univariate time series array in the form of 1d numpy array.
        nbins: int; Number of bins to get the histograms. Default value is 10.

    Returns:
        Mode of the data vector using histograms.
    """
    counts, bin_edges = np.histogram(x, bins=nbins)
    # The left edge of the fullest bin acts as the mode estimate.
    return bin_edges[np.argmax(counts)]
# First min/zero AC (2)
@staticmethod
@jit(forceobj=True)
def get_special_ac(
    x: np.ndarray, extra_args: Optional[Dict[str, bool]] = None, default_status: bool = True
):
    """
    Getting special_ac features.

    firstmin_ac: the time of first minimum in the autocorrelation function
    firstzero_ac: the time of first zero crossing the autocorrelation function.

    Args:
        x: The univariate time series array in the form of 1d numpy array.
        extra_args: A dictionary containing information for disabling calculation
            of a certain feature. Default value is None, i.e. no feature is disabled.
        default_status: Default status of the switch for calculate the features or not.
            Default value is True.

    Returns:
        Special autocorrelation features described above.
    """
    # Fix: previously extra_args=None skipped both features, contradicting the
    # documented contract that None disables nothing.
    if extra_args is None:
        extra_args = {}
    special_ac_features = {"firstmin_ac": np.nan, "firstzero_ac": np.nan}
    # Full-length ACF, dropping the trivial lag-0 term.
    AC = acf(x, unbiased=False, fft=True, nlags=len(x))[1:]
    # First min AC: walk forward while the ACF is strictly decreasing.
    if extra_args.get("firstmin_ac", default_status):
        i = 0
        while i < len(AC) - 1 and AC[i] > AC[i + 1]:
            i += 1
        special_ac_features["firstmin_ac"] = i + 1
    # First zero AC: first lag where the ACF crosses from positive to negative.
    if extra_args.get("firstzero_ac", default_status):
        j = 0
        while j < len(AC) - 1 and not (AC[j] > 0 > AC[j + 1]):
            j += 1
        special_ac_features["firstzero_ac"] = j + 2
    return special_ac_features
# Linearity
@staticmethod
@jit(forceobj=True)
def get_linearity(x: np.ndarray):
    """Getting linearity feature: R square from a fitted linear regression.

    A straight line is regressed against the index 0..len(x)-1 and the
    squared correlation coefficient of the fit is returned.

    Args:
        x: The univariate time series array in the form of 1d numpy array.

    Returns:
        R square from a fitted linear regression.
    """
    fit = stats.linregress(np.arange(len(x)), x)
    return fit.rvalue ** 2
# Holt Parameters (2)
[docs] @staticmethod
def get_holt_params(
x: np.ndarray, extra_args: Optional[Dict[str, bool]] = None, default_status: bool = True
):
"""
Estimates the smoothing parameter for the level-alpha and the smoothing parameter
for the trend-beta of Holt’s linear trend method.
'alpha': Level parameter of the Holt model.
'beta': Trend parameter of the Hold model.
Args:
x: The univariate time series array in the form of 1d numpy array.
extra_args: A dictionary containing information for disabling calculation
of a certain feature. Default value is None, i.e. no feature is disabled.
default_status: Default status of the switch for calculate the features or not.
Default value is True.
Returns:
Level and trend parameter of a fitted Holt model.
"""
holt_params_features = {"holt_alpha": np.nan, "holt_beta": np.nan}
try:
m = ExponentialSmoothing(x, trend="add", seasonal=None).fit()
if extra_args is not None and extra_args.get("holt_alpha", default_status):
holt_params_features["holt_alpha"] = m.params["smoothing_level"]
if extra_args is not None and extra_args.get("holt_beta", default_status):
statsmodels_ver = float(re.findall('([0-9]+\\.[0-9]+)\\..*', statsmodels.__version__)[0])
if statsmodels_ver < 0.12:
holt_params_features["holt_beta"] = m.params["smoothing_slope"]
elif statsmodels_ver >= 0.12:
holt_params_features["holt_beta"] = m.params["smoothing_trend"]
return holt_params_features
except Exception:
return holt_params_features
# Holt Winter’s Parameters (3)
[docs] @staticmethod
def get_hw_params(
x: np.ndarray,
period: int = 7,
extra_args: Optional[Dict[str, bool]] = None,
default_status: bool = True,
):
"""
Estimates the smoothing parameter for the level-alpha, trend-beta of HW’s linear trend,
and additive seasonal trend-gamma.
Args:
x: The univariate time series array in the form of 1d numpy array.
period: int; Seaonal period for fitting exponential smoothing model. Default
value is 7.
extra_args: A dictionary containing information for disabling calculation
of a certain feature. Default value is None, i.e. no feature is disabled.
default_status: Default status of the switch for calculate the features or not.
Default value is True.
Returns:
Level, trend and seasonal parameter of a fitted Holt-Winter's model.
"""
hw_params_features = {"hw_alpha": np.nan, "hw_beta": np.nan, "hw_gamma": np.nan}
try:
# addressing issue of use_boxcox arg in different versions of statsmodels
statsmodels_ver = float(re.findall('([0-9]+\\.[0-9]+)\\..*', statsmodels.__version__)[0])
_args_ = {
"seasonal_periods": period,
"trend": "add",
"seasonal": "add",
}
# performing version check on statsmodels
if statsmodels_ver >= 0.12:
_args_["use_boxcox"] = True
_args_["initialization_method"] = 'estimated'
m = ExponentialSmoothing(x, **_args_).fit()
elif statsmodels_ver < 0.12:
m = ExponentialSmoothing(x, **_args_).fit(use_boxcox = True)
if extra_args is not None and extra_args.get("hw_alpha", default_status):
hw_params_features["hw_alpha"] = m.params["smoothing_level"]
if extra_args is not None and extra_args.get("hw_beta", default_status):
if statsmodels_ver < 0.12:
hw_params_features["hw_beta"] = m.params["smoothing_slope"]
elif statsmodels_ver >= 0.12:
hw_params_features["hw_beta"] = m.params["smoothing_trend"]
if extra_args is not None and extra_args.get("hw_gamma", default_status):
hw_params_features["hw_gamma"] = m.params["smoothing_seasonal"]
return hw_params_features
except Exception:
return hw_params_features
# CUSUM Detection Outputs (8)
[docs] @staticmethod
def get_cusum_detector(
ts: TimeSeriesData, extra_args: Optional[Dict[str, bool]] = None, default_status: bool = True
):
"""
Run the Kats CUSUM Detector on the Time Series, extract features from the outputs of the detection.
Args:
ts: The univariate time series array in the form of Kats TimeSeriesData object.
extra_args: A dictionary containing information for disabling calculation
of a certain feature. Default value is None, i.e. no feature is disabled.
default_status: Default status of the switch for calculate the features or not.
Default value is True.
Returns:
Outputs of the CUSUM Detector, which include (1) Number of changepoints, either 1
or 0, (2) Confidence of the changepoint detected, 0 if not changepoint, (3) index,
or position of the changepoint detected within the time series, (4) delta of the
mean levels before and after the changepoint, (5) log-likelihood ratio of changepoint,
(6) Boolean - whether regression is detected by CUSUM, (7) Boolean - whether
changepoint is stable, (8) p-value of changepoint.
"""
cusum_detector_features = {
"cusum_num": np.nan,
"cusum_conf": np.nan,
"cusum_cp_index": np.nan,
"cusum_delta": np.nan,
"cusum_llr": np.nan,
"cusum_regression_detected": np.nan,
"cusum_stable_changepoint": np.nan,
"cusum_p_value": np.nan,
}
try:
cusum = cusum_detection.CUSUMDetector(ts)
cusum_cp = cusum.detector()
if extra_args is not None and extra_args.get("cusum_num", default_status):
cusum_detector_features["cusum_num"] = len(cusum_cp)
if extra_args is not None and extra_args.get("cusum_conf", default_status):
cusum_detector_features["cusum_conf"] = (
0 if len(cusum_cp) == 0 else cusum_cp[0][0].confidence
)
if extra_args is not None and extra_args.get("cusum_cp_index", default_status):
cusum_detector_features["cusum_cp_index"] = (
0 if len(cusum_cp) == 0 else cusum_cp[0][1]._cp_index / len(ts)
)
if extra_args is not None and extra_args.get("cusum_delta", default_status):
cusum_detector_features["cusum_delta"] = (
0 if len(cusum_cp) == 0 else cusum_cp[0][1]._delta
)
if extra_args is not None and extra_args.get("cusum_llr", default_status):
cusum_detector_features["cusum_llr"] = (
0 if len(cusum_cp) == 0 else cusum_cp[0][1]._llr
)
if extra_args is not None and extra_args.get("cusum_regression_detected", default_status):
cusum_detector_features["cusum_regression_detected"] = (
False if len(cusum_cp) == 0 else cusum_cp[0][1]._regression_detected
)
if extra_args is not None and extra_args.get("cusum_stable_changepoint", default_status):
cusum_detector_features["cusum_stable_changepoint"] = (
False if len(cusum_cp) == 0 else cusum_cp[0][1]._stable_changepoint
)
if extra_args is not None and extra_args.get("cusum_p_value", default_status):
cusum_detector_features["cusum_p_value"] = (
0 if len(cusum_cp) == 0 else cusum_cp[0][1]._p_value
)
return cusum_detector_features
except Exception:
return cusum_detector_features
# Robust Stat Detection Outputs (2)
[docs] @staticmethod
def get_robust_stat_detector(
ts: TimeSeriesData, extra_args: Optional[Dict[str, bool]] = None, default_status: bool = True
):
"""
Run the Kats Robust Stat Detector on the Time Series, extract features from the outputs of the detection.
Args:
ts: The univariate time series array in the form of Kats TimeSeriesData object.
extra_args: A dictionary containing information for disabling calculation
of a certain feature. Default value is None, i.e. no feature is disabled.
default_status: Default status of the switch for calculate the features or not.
Default value is True.
Returns:
(1) Number changepoints detected by the Robust Stat Detector, and (2) Mean of
the Metric values from the Robust Stat Detector.
"""
robust_stat_detector_features = {
"robust_num": np.nan,
"robust_metric_mean": np.nan,
}
try:
robust = robust_stat_detection.RobustStatDetector(ts)
robust_cp = robust.detector()
if extra_args is not None and extra_args.get("robust_num", default_status):
robust_stat_detector_features["robust_num"] = len(robust_cp)
if extra_args is not None and extra_args.get("robust_metric_mean", default_status):
robust_stat_detector_features["robust_metric_mean"] = (
0
if len(robust_cp) == 0
else np.sum([cp[1]._metric for cp in robust_cp]) / len(robust_cp)
)
return robust_stat_detector_features
except Exception:
return robust_stat_detector_features
# BOCP Detection Outputs (3)
[docs] @staticmethod
def get_bocp_detector(
ts: TimeSeriesData, extra_args: Optional[Dict[str, bool]] = None, default_status: bool = True
):
"""
Run the Kats BOCP Detector on the Time Series, extract features from the outputs of the detection.
Args:
ts: The univariate time series array in the form of Kats TimeSeriesData object.
extra_args: A dictionary containing information for disabling calculation
of a certain feature. Default value is None, i.e. no feature is disabled.
default_status: Default status of the switch for calculate the features or not.
Default value is True.
Returns:
(tuple): tuple containing:
Number of changepoints detected by BOCP Detector
Max value of the confidence of the changepoints detected
Mean value of the confidence of the changepoints detected.
"""
bocp_detector_features = {
"bocp_num": np.nan,
"bocp_conf_max": np.nan,
"bocp_conf_mean": np.nan,
}
try:
bocp = bocpd.BOCPDetector(ts)
bocp_cp = bocp.detector(choose_priors=False)
if extra_args is not None and extra_args.get("bocp_num", default_status):
bocp_detector_features["bocp_num"] = len(bocp_cp)
if extra_args is not None and extra_args.get("bocp_conf_max", default_status):
bocp_detector_features["bocp_conf_max"] = (
0
if len(bocp_cp) == 0
else np.max([cp[0].confidence for cp in bocp_cp])
)
if extra_args is not None and extra_args.get("bocp_conf_mean", default_status):
bocp_detector_features["bocp_conf_mean"] = (
0
if len(bocp_cp) == 0
else np.sum([cp[0].confidence for cp in bocp_cp]) / len(bocp_cp)
)
return bocp_detector_features
except Exception:
return bocp_detector_features
# Outlier Detection Outputs (1)
[docs] @staticmethod
def get_outlier_detector(
ts: TimeSeriesData,
decomp: str = "additive",
iqr_mult: float = 3.0,
extra_args: Optional[Dict[str, bool]] = None,
default_status: bool = True,
):
"""
Run the Kats Outlier Detector on the Time Series, extract features from the outputs of the detection.
Args:
ts: The univariate time series array in the form of Kats TimeSeriesData object.
decomp: str; Additive or Multiplicative mode for performing outlier detection using
OutlierDetector. Default value is 'additive'.
iqr_mult: float; IQR range for determining outliers through
OutlierDetector. Default value is 3.0.
extra_args: A dictionary containing information for disabling calculation
of a certain feature. Default value is None, i.e. no feature is disabled.
default_status: Default status of the switch for calculate the features or not.
Default value is True.
Returns:
Number of outliers by the Outlier Detector.
"""
outlier_detector_features = {"outlier_num": np.nan}
try:
odetector = outlier.OutlierDetector(ts, decomp=decomp, iqr_mult=iqr_mult)
odetector.detector()
if extra_args is not None and extra_args.get("outlier_num", default_status):
# pyre-fixme[16]: `OutlierDetector` has no attribute `outliers`.
outlier_detector_features["outlier_num"] = len(odetector.outliers[0])
return outlier_detector_features
except Exception:
return outlier_detector_features
# Trend Detection Outputs (3)
[docs] @staticmethod
def get_trend_detector(
ts: TimeSeriesData,
threshold: float = 0.8,
extra_args: Optional[Dict[str, bool]] = None,
default_status: bool = True,
):
"""
Run the Kats Trend Detector on the Time Series, extract features from the outputs of the detection.
Args:
ts: The univariate time series array in the form of Kats TimeSeriesData object.
threshold: float; threshold for trend intensity; higher threshold gives trend
with high intensity (0.8 by default). If we only want to use the p-value
to determine changepoints, set threshold = 0.
extra_args: A dictionary containing information for disabling calculation
of a certain feature. Default value is None, i.e. no feature is disabled.
default_status: Default status of the switch for calculate the features or not.
Default value is True.
Returns:
(1) Number of trends detected by the Kats Trend Detector, (2) Number of increasing
trends, (3) Mean of the abolute values of Taus of the trends detected.
"""
trend_detector_features = {
"trend_num": np.nan,
"trend_num_increasing": np.nan,
"trend_avg_abs_tau": np.nan,
}
try:
tdetector = trend_mk.MKDetector(data=ts, threshold=threshold)
tdetected_time_points = tdetector.detector(direction="both")
if extra_args is not None and extra_args.get("trend_num", default_status):
trend_detector_features["trend_num"] = len(tdetected_time_points)
if extra_args is not None and extra_args.get("trend_num_increasing", default_status):
trend_detector_features["trend_num_increasing"] = len(
[
p
for p in tdetected_time_points
if p[1].trend_direction == "decreasing"
]
)
if extra_args is not None and extra_args.get("trend_avg_abs_tau", default_status):
trend_detector_features["trend_avg_abs_tau"] = (
0
if len(tdetected_time_points) == 0
else np.sum([abs(p[1].Tau) for p in tdetected_time_points])
/ len(tdetected_time_points)
)
return trend_detector_features
except Exception:
return trend_detector_features
@staticmethod
@jit(nopython=True)
def _ewma(
    arr: np.ndarray,
    span: int,
    min_periods: int
):
    """
    Exponentially weighted moving average specified by a decay ``span``,
    with better adjustment for small windows via:
    y[t] = (x[t] + (1-a)*x[t-1] + (1-a)^2*x[t-2] + ... + (1-a)^n*x[t-n]) /
    (1 + (1-a) + (1-a)^2 + ... + (1-a)^n).
    Args:
        arr : np.ndarray; A single dimensional numpy array.
        span : int; The decay window, or 'span'.
        min_periods: int; Minimum amount of data points we'd like to include in the output.
    Returns:
        A np.ndarray. The exponentially weighted moving average of the array,
        same length as the input, NaN-padded on the left.
    """
    # output keeps the input's length; positions without enough observations
    # stay NaN
    output_array = np.empty(arr.shape[0], dtype=np.float64)
    output_array[:] = np.NaN
    # drop NaNs up front; the EWMA runs over the compacted values only
    # NOTE(review): assumes at least one non-NaN value -- arr[0] below raises
    # on an all-NaN input; confirm with callers.
    arr = arr[~np.isnan(arr)]
    n = arr.shape[0]
    ewma = np.empty(n, dtype=np.float64)
    # pandas-style span -> alpha conversion: alpha = 2 / (span + 1)
    alpha = 2 / float(span + 1)
    # w accumulates the normalizer 1 + (1-a) + ... + (1-a)^i
    w = 1
    ewma_old = arr[0]
    ewma[0] = ewma_old
    for i in range(1, n):
        w += (1-alpha)**i
        # ewma_old carries the unnormalized weighted sum, updated recursively
        ewma_old = ewma_old*(1-alpha) + arr[i]
        ewma[i] = ewma_old / w
    # discard the first (min_periods - 1) averages and right-align the rest
    # into the NaN-initialized output
    output_subset = ewma[(min_periods-1):]
    output_array[-len(output_subset):] = output_subset
    return output_array
@staticmethod
@jit(forceobj=True)
def _get_nowcasting_np(
    x: np.ndarray,
    window: int = 5,
    n_fast: int = 12,
    n_slow: int = 21,
    extra_args: Optional[Dict[str, bool]] = None,
    default_status: bool = True,
):
    """
    Internal function for actually performing feature engineering using the same procedure as
    nowcasting feature_extraction under kats/models.
    Args:
        x: The univariate time series array in the form of 1d numpy array.
        window: int; Length of window size for all Nowcasting features. Default value is 5.
        n_fast: int; length of "fast" or short period exponential moving average in the MACD
            algorithm in the nowcasting features. Default value is 12.
        n_slow: int; length of "slow" or long period exponential moving average in the MACD
            algorithm in the nowcasting features. Default value is 21.
        extra_args: A dictionary containing information for disabling calculation
            of a certain feature. Default value is None, i.e. no feature is disabled.
        default_status: Default status of the switch for calculate the features or not.
            Default value is True.
    Returns:
        A list of 7 extracted nowcast features, in order: ROC, MOM, MA, LAG,
        MACD, MACDsign, MACDdiff. Disabled features remain np.nan.
    """
    # initializing the outputs: one NaN slot per feature
    nowcasting_features = [np.nan for _ in range(7)]
    # ROC: indicating return comparing to step n back.
    # NOTE(review): the ROC/LAG sections use a lag of (window - 1) and assume
    # window > 1; with window == 1 the slices below are empty -- confirm with
    # callers.
    if extra_args is not None and extra_args.get("nowcast_roc", default_status):
        M = x[(window-1):] - x[:-(window-1)]
        N = x[:-(window-1)]
        arr = M / N
        # division by zero produces inf/NaN; both are zeroed before averaging
        nowcasting_features[0] = np.nan_to_num(arr, nan=0.0, posinf=0.0, neginf=0.0).mean()
    # MOM: indicating momentum: difference of current value and n steps back.
    if extra_args is not None and extra_args.get("nowcast_mom", default_status):
        M = x[window:] - x[:-window]
        nowcasting_features[1] = np.nan_to_num(M, nan=0.0, posinf = 0.0, neginf=0.0).mean()
    # MA: indicating moving average in the past n steps.
    if extra_args is not None and extra_args.get("nowcast_ma", default_status):
        # rolling sum via cumulative-sum differencing, then divide by window
        ret = np.cumsum(x, dtype=float)
        ret[window:] = ret[window:] - ret[:-window]
        ma = ret[window - 1:] / window
        nowcasting_features[2] = np.nan_to_num(ma, nan=0.0, posinf=0.0, neginf=0.0).mean()
    # LAG: indicating lagged value at the past n steps.
    if extra_args is not None and extra_args.get("nowcast_lag", default_status):
        N = x[:-window]
        nowcasting_features[3] = np.nan_to_num(N, nan=0.0, posinf=0.0, neginf=0.0).mean()
    # MACD: https://www.investopedia.com/terms/m/macd.asp.
    # The fast/slow EWMAs are computed unconditionally because the three MACD
    # features below all derive from them.
    ema_fast = TsFeatures._ewma(x, n_fast, n_slow-1)
    ema_slow = TsFeatures._ewma(x, n_slow, n_slow-1)
    MACD = ema_fast - ema_slow
    if extra_args is not None and extra_args.get("nowcast_macd", default_status):
        nowcasting_features[4] = np.nan_to_num(np.nanmean(MACD), nan=0.0, posinf=0.0, neginf=0.0)
    # MACD signal line: 9-span EWMA of the MACD series
    MACDsign = TsFeatures._ewma(MACD, 9, 8)
    if extra_args is not None and extra_args.get("nowcast_macdsign", default_status):
        nowcasting_features[5] = np.nan_to_num(np.nanmean(MACDsign), nan=0.0, posinf=0.0, neginf=0.0)
    # MACD histogram: MACD minus its signal line
    MACDdiff = MACD - MACDsign
    if extra_args is not None and extra_args.get("nowcast_macddiff", default_status):
        nowcasting_features[6] = np.nan_to_num(np.nanmean(MACDdiff), nan=0.0, posinf=0.0, neginf=0.0)
    return nowcasting_features
# Nowcasting features (7)
[docs] @staticmethod
def get_nowcasting(
x: np.ndarray,
window: int = 5,
n_fast: int = 12,
n_slow: int = 21,
extra_args: Optional[Dict[str, bool]] = None,
default_status: bool = True,
):
"""
Run the Kats Nowcasting transformer on the Time Series, extract aggregated features from the outputs of the transformation.
Args:
x: The univariate time series array in the form of 1d numpy array.
window: int; Length of window size for all Nowcasting features. Default value is 5.
n_fast: int; length of "fast" or short period exponential moving average in the MACD
algorithm in the nowcasting features. Default value is 12.
n_slow: int; length of "slow" or long period exponential moving average in the MACD
algorithm in the nowcasting features. Default value is 21.
extra_args: A dictionary containing information for disabling calculation
of a certain feature. Default value is None, i.e. no feature is disabled.
default_status: Default status of the switch for calculate the features or not.
Default value is True.
Returns:
Mean values of the Kats Nowcasting algorithm time series outputs using the parameters
(window, n_fast, n_slow) indicated above. These outputs inclue : (1) Mean of Rate of
Change (ROC) time series, (2) Mean of Moving Average (MA) time series,(3) Mean of
Momentum (MOM) time series, (4) Mean of LAG time series, (5) Means of MACD, MACDsign,
and MACDdiff from Kats Nowcasting.
"""
nowcasting_features = {}
features = [
"nowcast_roc",
"nowcast_mom",
"nowcast_ma",
"nowcast_lag",
"nowcast_macd",
"nowcast_macdsign",
"nowcast_macddiff",
]
for feature in features:
if extra_args is not None and extra_args.get(feature, default_status):
nowcasting_features[feature] = np.nan
try:
_features = TsFeatures._get_nowcasting_np(x, window, n_fast, n_slow, extra_args, default_status)
for idx, feature in enumerate(features):
if extra_args is not None and extra_args.get(feature, default_status):
nowcasting_features[feature] = _features[idx]
return nowcasting_features
except Exception:
return nowcasting_features
# seasonality features (4)
[docs] @staticmethod
def get_seasonalities(
ts: TimeSeriesData, extra_args: Optional[Dict[str, bool]] = None, default_status: bool = True
):
"""
Run the Kats seaonality detectors to get the estimated seasonal period, then extract trend,
seasonality and residual magnitudes.
Args:
ts: The univariate time series array in the form of Kats TimeSeriesData object.
extra_args: A dictionary containing information for disabling calculation
of a certain feature. Default value is None, i.e. no feature is disabled.
default_status: Default status of the switch for calculate the features or not.
Default value is True.
Returns:
Returns the detected seasonality period.
Slope acquired via fitting simple linear regression model on the trend component
as trend magnitude.
Difference between the 95 percentile and 5 percentile of the seasonal component
as the seasonality magnitude.
Standard deviation of the residual component.
"""
seasonality_features = {
"seasonal_period": np.nan,
"trend_mag": np.nan,
"seasonality_mag": np.nan,
"residual_std": np.nan,
}
try:
# detrending for period estimation
detrended = TimeSeriesData(
pd.DataFrame(
{
"time": len(ts.value.values) - 1,
"value": ts.value.values[1:] - ts.value.values[:-1],
}
)
)
detected = seasonality.FFTDetector(detrended).detector()
if detected["seasonality_presence"]:
_period = int(np.min(detected["seasonalities"]))
elif not detected["seasonality_presence"]:
_period = 7
res = STL(ts.value.values, period=_period).fit()
if extra_args is not None and extra_args.get("seasonal_period", default_status):
seasonality_features["seasonal_period"] = _period
# getting seasonality magnitude
if extra_args is not None and extra_args.get("seasonality_mag", default_status):
seasonality_features["seasonality_mag"] = np.round(
np.quantile(res.seasonal, 0.95) - np.quantile(res.seasonal, 0.05)
)
# fitting linear regression for trend magnitude
if extra_args is not None and extra_args.get("trend_mag", default_status):
exog = res.trend
_series = exog - exog[0]
mod = sm.OLS(_series, np.arange(len(_series)))
_res = mod.fit()
# trend magnitude
seasonality_features["trend_mag"] = _res.params[0]
# residual standard deviation
if extra_args is not None and extra_args.get("residual_std", default_status):
seasonality_features["residual_std"] = np.std(res.resid)
return seasonality_features
except Exception:
return seasonality_features