#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
"""The LSTM model stands for Long short-term memory, it is a recurrent neural network model that can be used for sequential data.
More information for the model can be found: https://en.wikipedia.org/wiki/Long_short-term_memory
We directly adopt the PyTorch implementation and apply the model for time series forecast. More details for the PyTorch modules are here: https://pytorch.org/docs/stable/generated/torch.nn.LSTM.html
"""
from __future__ import absolute_import, division, print_function, unicode_literals
import logging
from typing import Tuple, List, Dict, Any
import numpy as np
import pandas as pd
import torch.nn as nn
import torch
from sklearn.preprocessing import MinMaxScaler
import kats.models.model as mm
from kats.consts import Params, TimeSeriesData
class LSTMParams(Params):
"""Parameter class for time series LSTM model
This is the parameter class for time series LSTM model, it currently contains three parameters
Attributes:
hidden_size: LSTM hidden unit size
time_window: Time series sequence length that feeds into the model
num_epochs: Number of epochs for the training process
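Example (the parameter values below are illustrative, not recommended defaults):
>>> params = LSTMParams(hidden_size=10, time_window=7, num_epochs=50)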
"""
__slots__ = ["hidden_size", "time_window", "num_epochs"]
def __init__(
self,
hidden_size: int,
time_window: int,
num_epochs: int
) -> None:
super().__init__()
self.hidden_size = hidden_size
self.time_window = time_window
self.num_epochs = num_epochs
logging.debug(
"Initialized LSTMParams instance."
f"hidden_size:{hidden_size}, time_window:{time_window}, num_epochs:{num_epochs}"
)
def validate_params(self):
logging.info("Method validate_params() is not implemented.")
pass
class LSTMForecast(nn.Module):
"""Torch forecast class for time series LSTM model
This is the forecast class for time series LSTM model inherited from the PyTorch module, detailed implementation for the core LSTM and Linear modules can be gound here:
https://pytorch.org/docs/stable/generated/torch.nn.LSTM.html
https://pytorch.org/docs/stable/generated/torch.nn.Linear.html
Attributes:
params: A LSTMParams instance for parameters
input_size: Input unit feature size for the LSTM layer
output_size: Output unit feature size from the output Linear layer
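A minimal forward-pass sketch; the hidden/cell state must be set before calling the module (mirroring how LSTMModel.fit() prepares it), and the parameter values are illustrative only:
>>> params = LSTMParams(hidden_size=10, time_window=7, num_epochs=50)
>>> model = LSTMForecast(params=params, input_size=1, output_size=1)
>>> model.hidden_cell = (torch.zeros(1, 1, 10), torch.zeros(1, 1, 10))
>>> y = model(torch.randn(7))  # one-step prediction, a tensor of shape (1,)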
"""
def __init__(self,
params: LSTMParams,
input_size: int,
output_size: int
) -> None:
super().__init__()
self.lstm = nn.LSTM(input_size=input_size, hidden_size=params.hidden_size)
self.linear = nn.Linear(in_features=params.hidden_size, out_features=output_size)
def forward(self, input_seq: torch.Tensor) -> torch.Tensor:
"""The forward method for the LSTM forecast PyTorch module
Args:
input_seq: A torch tensor containing the input data sequence for the LSTM layer
Returns:
prediction: A torch tensor containing the one-step prediction from the output Linear layer
"""
# pyre-fixme[16]: `LSTMForecast` has no attribute `hidden_cell`.
lstm_out, self.hidden_cell = self.lstm(input_seq.view(len(input_seq), 1, -1), self.hidden_cell)
predictions = self.linear(lstm_out.view(len(input_seq), -1))
return predictions[-1]
class LSTMModel(mm.Model):
"""Kats model class for time series LSTM model
This is the Kats model class for time series forecast using the LSTM model
Attributes:
data: :class:`kats.consts.TimeSeriesData`, the input data
params: A LSTMParams object for the parameters
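A minimal end-to-end usage sketch; the TimeSeriesData instance `ts` and the parameter values below are assumed/illustrative:
>>> params = LSTMParams(hidden_size=10, time_window=7, num_epochs=50)
>>> m = LSTMModel(data=ts, params=params)
>>> m.fit()
>>> fcst_df = m.predict(steps=30)
>>> m.plot()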
"""
def __init__(
self,
data: TimeSeriesData,
params: LSTMParams
) -> None:
super().__init__(data, params)
if not isinstance(self.data.value, pd.Series):
msg = f"Only support univariate time series, but get {type(self.data.value)}."
logging.error(msg)
raise ValueError(msg)
def __setup_data(self) -> List[Tuple[torch.Tensor, torch.Tensor]]:
"""Prepare input data for the LSTM model
This method performs a min-max normalization on the input data, then outputs the normalized input sequences together with the true values to predict. More details on the normalizer:
https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.MinMaxScaler.html
Args:
None
Returns:
A list of tuples, each containing an input sequence tensor (with normalized values) and the ground truth value to predict
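For example, with time_window = 3 a normalized series [x0, x1, x2, x3, x4] yields the pairs ([x0, x1, x2], [x3]) and ([x1, x2, x3], [x4]).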
"""
train_data = self.data.value.values.astype(float)
# scaling using MinMaxScaler
# pyre-fixme[16]: `LSTMModel` has no attribute `scaler`.
# pyre-fixme[16]: Module `sklearn` has no attribute `preprocessing`.
self.scaler = MinMaxScaler(feature_range=(-1, 1))
train_data_scaled = self.scaler.fit_transform(train_data.reshape(-1, 1))
# converting to tensor
# pyre-fixme[16]: `LSTMModel` has no attribute `train_data_normalized`.
self.train_data_normalized = torch.FloatTensor(train_data_scaled).view(-1)
# generating sequences
inout_seq = []
for i in range(len(self.train_data_normalized) - self.params.time_window):
train_seq = self.train_data_normalized[i:i + self.params.time_window]
train_label = self.train_data_normalized[i + self.params.time_window : i + self.params.time_window + 1]
inout_seq.append((train_seq, train_label))
return inout_seq
def fit(self, **kwargs) -> None:
"""Fit the LSTM forecast model
Args:
**kwargs: Additional training arguments; "lr" sets the learning rate (default 0.001)
Returns:
The fitted LSTM model object
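A minimal call sketch, assuming an LSTMModel instance m; the learning rate shown is simply the default:
>>> m.fit(lr=0.001)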
"""
logging.debug("Call fit() with parameters."
f"kwargs:{kwargs}")
# learning rate
# pyre-fixme[16]: `LSTMModel` has no attribute `lr`.
self.lr = kwargs.get("lr", 0.001)
# supports univariate time series, multivariate support in the future
# pyre-fixme[16]: `LSTMModel` has no attribute `model`.
# pyre-fixme[16]: `LSTMModel` has no attribute `params`.
self.model = LSTMForecast(params=self.params, input_size=1, output_size=1)
# loss function
loss_function = nn.MSELoss()
# optimizer
optimizer = torch.optim.Adam(self.model.parameters(), lr=self.lr)
# generate training (input sequence, label) pairs
train_inout_seq = self.__setup_data()
for i in range(self.params.num_epochs):
for seq, labels in train_inout_seq:
optimizer.zero_grad()
self.model.hidden_cell = (torch.zeros(1, 1, self.params.hidden_size),
torch.zeros(1, 1, self.params.hidden_size))
# prediction using input data
y_pred = self.model(seq)
# calculating loss
single_loss = loss_function(y_pred, labels)
single_loss.backward()
optimizer.step()
if i % 25 == 1:
logging.info(f'epoch: {i:3} loss: {single_loss.item():10.8f}')
# pyre-fixme[7]: Expected `None` but got `LSTMModel`.
return self
# pyre-fixme[14]: `predict` overrides method defined in `Model` inconsistently.
def predict(self, steps: int, **kwargs) -> pd.DataFrame:
"""Prediction function for a multi-step forecast
Args:
steps: Number of steps for the forecast
**kwargs: Additional arguments; "freq" sets the frequency of the forecast timestamps (inferred from the data when not given)
Returns:
A pd.DataFrame that includes the forecast and upper/lower bounds (set to +/-5% of the forecast values)
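A minimal call sketch, assuming a fitted LSTMModel instance m; the step count and "D" (daily) frequency are illustrative, and "freq" is inferred from the data when omitted:
>>> fcst_df = m.predict(steps=30, freq="D")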
"""
logging.debug(
"Call predict() with parameters. "
f"steps:{steps}, kwargs:{kwargs}"
)
# pyre-fixme[16]: `LSTMModel` has no attribute `freq`.
# pyre-fixme[16]: `LSTMModel` has no attribute `data`.
self.freq = kwargs.get("freq", pd.infer_freq(self.data.time))
# pyre-fixme[16]: `LSTMModel` has no attribute `model`.
self.model.eval()
# get last train input sequence
# pyre-fixme[16]: `LSTMModel` has no attribute `train_data_normalized`.
# pyre-fixme[16]: `LSTMModel` has no attribute `params`.
test_inputs = self.train_data_normalized[-self.params.time_window:].tolist()
for _ in range(steps):
seq = torch.FloatTensor(test_inputs[-self.params.time_window:])
with torch.no_grad():
self.model.hidden = (torch.zeros(1, 1, self.params.hidden_size),
torch.zeros(1, 1, self.params.hidden_size))
test_inputs.append(self.model(seq).item())
# inverse transform
# pyre-fixme[16]: `LSTMModel` has no attribute `scaler`.
fcst_denormalized = self.scaler.inverse_transform(np.array(test_inputs[self.params.time_window:]).reshape(-1, 1)).flatten()
logging.info("Generated forecast data from LSTM model.")
logging.debug(f"Forecast data: {fcst_denormalized}")
last_date = self.data.time.max()
dates = pd.date_range(start=last_date, periods=steps + 1, freq=self.freq)
# pyre-fixme[16]: `LSTMModel` has no attribute `dates`.
self.dates = dates[dates != last_date] # Return correct number of periods
# pyre-fixme[16]: `LSTMModel` has no attribute `y_fcst`.
self.y_fcst = fcst_denormalized
# pyre-fixme[16]: `LSTMModel` has no attribute `y_fcst_lower`.
self.y_fcst_lower = fcst_denormalized * 0.95
# pyre-fixme[16]: `LSTMModel` has no attribute `y_fcst_upper`.
self.y_fcst_upper = fcst_denormalized * 1.05
# pyre-fixme[16]: `LSTMModel` has no attribute `fcst_df`.
self.fcst_df = pd.DataFrame(
{
"time": self.dates,
"fcst": self.y_fcst ,
"fcst_lower": self.y_fcst_lower,
"fcst_upper": self.y_fcst_upper,
}
)
logging.debug(f"Return forecast data: {self.fcst_df}")
return self.fcst_df
def plot(self):
"""Plot forecast results from the LSTM model
"""
mm.Model.plot(self.data, self.fcst_df)
def __str__(self):
return "LSTM"
@staticmethod
def get_parameter_search_space() -> List[Dict[str, Any]]:
"""Get default parameter search space for the LSTM model
Args:
None
Returns:
A list of dictionaries representing the default LSTM parameter search space.
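A purely illustrative sketch that picks the first value of each parameter from the search space to build an LSTMParams instance:
>>> space = LSTMModel.get_parameter_search_space()
>>> params = LSTMParams(**{p["name"]: p["values"][0] for p in space})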
"""
return [
{
"name": "hidden_size",
"type": "choice",
"values": list(range(1, 500, 10)),
"value_type": "int",
"is_ordered": True,
},
{
"name": "time_window",
"type": "choice",
"values": list(range(1, 20, 1)),
"value_type": "int",
"is_ordered": True,
},
{
"name": "num_epochs",
"type": "choice",
"values": list(range(50, 2000, 50)),
"value_type": "int",
"is_ordered": True,
},
]