# Source code for kats.utils.cupik

#!/usr/bin/env python3

# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import logging
from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
from kats.consts import TimeSeriesData


class Pipeline:
    """CuPiK (Customized Pipeline for Kats).

    Created with a similar mindset to the sklearn pipeline: users can chain
    multiple Kats methods and run them sequentially to perform a series of
    time series processing steps at once. Detectors, transformers and time
    series models can be applied in sequence, and once feature extraction
    with TsFeatures has been performed, the results can be fed directly into
    an sklearn machine learning model.
    """

    # Per-`fit`-call configuration (set from **kwargs in `fit`); class-level
    # defaults keep the attributes documented and discoverable.
    remove: bool = False  # remove outliers after an outlier-detector step
    useFeatures: bool = False  # pass TsFeatures output forward as the data
    extra_fitting_params: Optional[Dict[str, Any]] = None  # per-step params
    y: Optional[Union[np.ndarray, pd.Series]] = None  # labels for sklearn

    def __init__(self, steps: List[Tuple[str, Any]]):
        """
        Args:
            steps: a list of initialized Kats methods / sklearn models in the
                format [('user_defined_method_name', initialized_instance)].
                The name is free-form and used only for identification.

        Attributes initialized:
            steps: same as the "steps" input.
            metadata: dictionary of outputs that are not passed to the next
                step (e.g. detector results), keyed by the user-defined
                step name.
            univariate: whether the data fed to `fit` was a single
                univariate series (True) or a list of series (False).
            functions: dispatch table mapping a step's ``__type__`` to the
                internal handler that processes it.
        """
        self.steps = steps
        self.metadata: Dict[str, Any] = {}
        self.univariate = False
        self.functions = {
            "detector": self.__detect__,
            "transformer": self.__transform__,
            "model": self.__model__,
        }

    def __detect__(
        self,
        steps: List[Any],
        data: List["TimeSeriesData"],
        extra_params: Dict[str, Any],
    ) -> Tuple[List["TimeSeriesData"], Any]:
        """Process a detector step over every series in `data`.

        Args:
            steps: one detector instance per series (currently the same
                instance repeated; see NOTE below).
            data: list of time series to be processed.
            extra_params: extra keyword arguments forwarded to the detector.

        Returns:
            Tuple of (possibly outlier-cleaned data, list of detector
            outputs such as changepoints or outliers).

        Raises:
            ValueError: if any series is not univariate.
        """
        metadata = []
        for i, (s, d) in enumerate(zip(steps, data)):
            # NOTE(review): `steps` holds the same detector instance for
            # every series, so state is overwritten each iteration; safe
            # here because each series is fully processed before the next.
            s.data = d
            if not s.data.is_univariate():
                msg = "Only support univariate time series, but get {type}.".format(
                    type=type(s.data.value)
                )
                logging.error(msg)
                raise ValueError(msg)
            s.data.time = pd.to_datetime(s.data.time)
            if s.__subtype__ == "outlier":
                # Outlier detectors take a `pipe` flag when run inside CuPiK.
                extra_params["pipe"] = True
            metadata.append(s.detector(**extra_params))
            if self.remove and s.__subtype__ == "outlier":
                # Outlier removal, only when the step is an outlier detector
                # and the user asked for removal via `remove=True`.
                data[i] = s.remover(interpolate=True)
        return data, metadata

    def __transform__(
        self,
        steps: List[Any],
        data: List["TimeSeriesData"],
        extra_params: Dict[str, Any],
    ) -> Tuple[List[Any], List[Any]]:
        """Process a transformer step (currently only TsFeatures in Kats).

        Args:
            steps: one transformer instance per series.
            data: list of time series to be processed.
            extra_params: extra parameters for the transformer (unused by
                the current transform call; kept for interface symmetry).

        Returns:
            Tuple of (data for the next step, transformer outputs). When
            `useFeatures` is True the transformer output (tabular features)
            replaces the data passed forward; otherwise data is unchanged.
        """
        metadata = [s.transform(d) for s, d in zip(steps, data)]
        if self.useFeatures:
            return metadata, metadata
        return data, metadata

    def __model__(
        self,
        steps: List[Any],
        data: List["TimeSeriesData"],
        extra_params: Dict[str, Any],
    ):
        """Process a modeling step: fit a Kats model on each series.

        Args:
            steps: one model instance per series (same instance repeated;
                see NOTE below).
            data: list of time series to fit on.
            extra_params: extra keyword arguments forwarded to `fit`.

        Returns:
            Tuple of (list of fitted models, None placeholder for metadata).

        Raises:
            ValueError: if any series' values are not a pandas Series
            (i.e. the series is not univariate).
        """
        for i, (s, d) in enumerate(zip(steps, data)):
            # NOTE(review): all entries in `steps` reference the SAME model
            # instance, so with multiple series every slot of `data` ends up
            # holding the model fitted on the LAST series — confirm whether
            # per-series copies are intended upstream.
            s.data = d
            if not isinstance(d.value, pd.Series):
                msg = "Only support univariate time series, but get {type}.".format(
                    type=type(d.value)
                )
                logging.error(msg)
                raise ValueError(msg)
            s.fit(**extra_params)
            data[i] = s
        return data, None

    def _fit_sklearn_(
        self,
        step: Any,
        data: List[Dict[str, Any]],
        y: Any,
    ) -> Any:
        """Fit an sklearn model on tabular data with extracted features.

        Args:
            step: an sklearn estimator instance.
            data: list of dicts, each the TsFeatures output for one series.
            y: label data for fitting the sklearn model.

        Returns:
            The fitted sklearn estimator.
        """
        assert isinstance(data, list) and isinstance(
            data[0], dict
        ), "Require data preprocessed by TsFeatures, please set useFeatures = True"
        assert y is not None, "Missing dependent variable"
        # Drop feature columns containing NaNs before fitting.
        df = pd.DataFrame(data).dropna(axis=1)
        # Fix: use the `y` argument (previously ignored in favor of self.y;
        # behavior is unchanged because the caller passes self.y).
        X_train, y_train = df.values, y
        step.fit(X_train, y_train)
        return step

    def __fit__(
        self,
        n: str,
        s: Any,
        data: Any,
    ) -> List[Any]:
        # List output for adaption of current multi-time-series scenarios.
        """Perform one step of the pipeline.

        Args:
            n: short for name, the "user_defined_method_name" of the step.
            s: short for step, a Kats method or sklearn model instance.
            data: either a list of univariate time series or a list of
                feature dicts produced by TsFeatures.

        Returns:
            Either a list of post-processed series / feature dicts, or
            (for an sklearn step) the fitted sklearn model.
        """
        # Robust sklearn detection via the class's module path (replaces the
        # fragile str(s.__class__).split()[1][1:8] string surgery).
        if getattr(s.__class__, "__module__", "").split(".")[0] == "sklearn":
            return self._fit_sklearn_(s, data, self.y)
        _steps_ = [s for _ in range(len(data))]
        Type = s.__type__
        extra_params = (self.extra_fitting_params or {}).get(n, {})
        data, metadata = self.functions[Type](_steps_, data, extra_params)
        if metadata is not None:
            # Save this step's metadata under its user-defined name.
            self.metadata[n] = metadata
        return data

    def fit(
        self, data: Any, params: Optional[Dict[str, Any]] = None, **kwargs
    ) -> Any:
        """Fit the pipeline on the given data.

        Args:
            data: a single univariate time series, or a list of them.
            params: extra parameters per step, in the format
                {"user_defined_method_name": {"parameter": value}}.

        Keyword Args:
            remove: whether to remove detected outliers.
            useFeatures: whether to pass TsFeatures output forward as the
                data for subsequent (e.g. sklearn) steps, rather than only
                storing it as metadata.
            y: label data (array or list) for fitting an sklearn model.

        Returns:
            A single result for univariate input, or a list of results for
            list input. Depending on the last step this may be processed
            series, fitted Kats/sklearn models, etc.
        """
        if params is None:
            params = {}
        # Per-call behavior flags.
        self.remove = kwargs.get("remove", False)
        self.useFeatures = kwargs.get("useFeatures", False)
        self.y = kwargs.get("y", None)
        # Extra parameters for specific methods of each step.
        self.extra_fitting_params = params
        if not isinstance(data, list):
            # Multiple series are supported as a list; wrap a single
            # univariate series so every step sees a list.
            self.univariate = True
            data = [data]
        for n, s in self.steps:
            # Run each step's internal fitting function in sequence.
            data = self.__fit__(n, s, data)
        if self.univariate:
            # Unwrap the single-series result.
            return data[0]
        return data