This article provides a ready-to-use LSTM template so it can be applied quickly in mathematical modeling contests. The input may be one-dimensional (single-variable forecasting) or two-dimensional (several variables forecast jointly), and confidence-interval computation is included. Jupyter is recommended, because intermediate training state stays in the notebook and rewriting the plotting code is more convenient.
Full code download link
Data input API
def data_basic():
    """2023 MCM Problem C: https://www.pancake2021.work/wp-content/uploads/Problem_C_Data_Wordle.xlsx"""
    date, data = Utils.openfile("Problem_C_Data_Wordle.xlsx", data_col=[1], date_col=0)
    return date, data

def data01():
    # Test: 2-D array, multiple variables
    data1 = np.sin(np.arange(300) * np.pi / 50) + np.random.randn(300) * 0.1
    data2 = np.sin(np.arange(300) * np.pi / 25) + np.random.randn(300) * 0.1 + 3
    return np.c_[data1[:, np.newaxis], data2[:, np.newaxis]]

def data02():
    # Test: 1-D array
    return np.sin(np.arange(300) * np.pi / 50) + np.random.randn(300) * 0.1

def data03():
    # Test: 2-D array, single variable
    return data02()[:, np.newaxis]
The code contains four test examples; the synthetic ones (data01, data02, data03) are easy to follow. For convenient file loading you can use
Utils.openfile(file_location_name, data_col=[indexes of data], date_col=index-of-date)
where file_location_name is the path to the file (the first row must hold column names, otherwise adjust the code yourself), data_col lists the columns holding the data and may be a list of column names or of indices, and date_col is the column holding the dates, or None if there is none.
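For example, with a hypothetical file named my_data.csv whose first row holds column names (the file name and column positions below are made up purely for illustration):
import Utils

# Columns 1 and 2 hold the data, column 0 holds the dates (hypothetical layout)
date, data = Utils.openfile("my_data.csv", data_col=[1, 2], date_col=0)

# No date column: pass date_col=None and ignore the first return value
_, data_only = Utils.openfile("my_data.csv", data_col=[1], date_col=None)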
Initialize the network
window_size = 10
lstm = CustomLSTM(data, window_size, scale=False)
# lstm = CustomLSTM(data, window_size, date, scale=True)
window_size is the seq2seq sliding-window size. By default a window of window_size points predicts the next single point (predicting several points from one window is not implemented). date is optional; when it is absent, the plots simply use 1 to n on the x-axis. scale controls whether MinMaxScaler is applied; it is worth trying both settings during training.
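To make the windowing concrete, here is what the slicing in CustomLSTM.slice (shown in the full code below) does, written out in plain NumPy on a toy series:
import numpy as np

series = np.arange(15, dtype=float)   # toy 1-D series
window_size = 10
X = np.stack([series[i:i + window_size] for i in range(len(series) - window_size)])
y = series[window_size:]
print(X.shape, y.shape)               # (5, 10) (5,) -- each row of X predicts the next value in y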
lstm.init_lstm(hidden=64, num_layers=1)
These set the hidden dimension of the network and the number of LSTM layers.
You can add or remove layers yourself in the LSTM class in Model.py (removing them is not recommended); a sketch of adding one is given below.
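As a sketch of what adding a layer could look like, the out block of the LSTM class could be widened with one more Linear/ReLU pair. The helper name build_head below is illustrative only; in practice you would edit self.out inside LSTM.__init__ in Model.py directly.
import torch.nn as nn

def build_head(hidden_size: int, output_size: int) -> nn.Sequential:
    # Same structure as LSTM.out in Model.py, with one extra (illustrative) Linear/ReLU pair
    return nn.Sequential(
        nn.Linear(hidden_size, hidden_size),
        nn.ReLU(),
        nn.Dropout(0.5),
        nn.Linear(hidden_size, hidden_size),  # extra layer
        nn.ReLU(),
        nn.Linear(hidden_size, output_size),
    )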
Train the network
# max_batch: whether to use the entire data set as a single batch without splitting
_ = lstm.fit(num_epochs=2000, lr=0.01, max_batch=True, is_copy=False)
result = lstm.fit(num_epochs=2000, lr=0.001, max_batch=True, is_copy=True)
# lstm.fit(num_epochs=1000, batch_size=55, max_batch=False)
The template supports mini-batch training, but for this LSTM it feels unnecessary; simply set max_batch=True to train on the whole data set as one batch. num_epochs is the number of epochs and lr the learning rate. is_copy indicates whether the returned result holds a deep copy of the model, so that different training stages can be compared later, as sketched below. The code does not perform a train/test split. A practical recipe is to train coarsely with a large lr first and then continue with a smaller lr.
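One possible way to use is_copy (a usage sketch, not a required step): keep a deep-copied snapshot after the coarse stage so its metrics can be compared with the fine-tuned stage.
# Snapshot after the coarse stage, then continue training with a smaller lr
coarse = lstm.fit(num_epochs=2000, lr=0.01, max_batch=True, is_copy=True)
fine = lstm.fit(num_epochs=2000, lr=0.001, max_batch=True, is_copy=True)
# Compare the two stages on the data that has ground truth
coarse.predicted(is_random=False)
fine.predicted(is_random=False)
_ = coarse.summary()
_ = fine.summary()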
Adjust the window size and retrain
lstm.slice(20)
lstm.init_lstm(hidden=64, num_layers=1)
result = lstm.fit(num_epochs=500, lr=0.001, max_batch=True)
Predict the data that has ground-truth values
# Predict over all the earlier data; is_random controls whether Dropout is enabled
predict = result.predicted(is_random=False)
Summary
# Print the summary
r2, mse, rmse, mae, mape, smape = result.summary()
Predict
# Predict the next n steps; is_random controls whether Dropout is enabled (random predictions use Dropout). If you want to use the plot functions afterwards, keep n unchanged; if n changes, re-run everything from this line onwards.
n = 60
further_predict = result.predict(n, is_random=False)
n-step 1-alpha confidence interval for the variable at a given index
# alpha = 0.05: 95% confidence interval
alpha = 0.05
# Confidence interval for the forecast; the function caches its result
# n: number of steps to predict; n_sample: number of samples drawn
conf = result.conf_int_sampled(n, alpha=alpha, n_sample=1000)
further_predict_sampled = result.sampled_result
# Confidence interval for the earlier data (which has ground-truth values). Use either of the two methods below; conf_int_predicted_sampled is recommended
# Standard (analytic) method
conf_predicted = result.conf_int_predicted(alpha)
# Sampling method; the function caches its result
conf_predicted_sampled = result.conf_int_predicted_sampled(alpha=alpha, n_sample=1000)
predict_sampled = result.sampled_result_
# The confidence interval must be computed before plot_confidence can be used
# Every conf returned is a 3-D ndarray indexed by: variable index, prediction index within the variable, (lower, mean, upper)
The sampling method means: run the model n_sample times with the Dropout layer enabled to obtain samples, then compute the confidence interval from them; the mean of the samples also makes a prediction that tends to reflect reality better. The plain method simply uses the result of a single run.
A bootstrap estimate of the sample confidence interval has also been added, but I do not recommend using it.
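Conceptually the sampling method is Monte-Carlo Dropout: keep Dropout active at prediction time, repeat the forward pass n_sample times, and build the interval from the spread of the samples, mirroring Utils.sample_ci. A stripped-down sketch of the idea, independent of the classes above (the helper name mc_dropout_ci is illustrative only):
import numpy as np
import scipy.stats as st

def mc_dropout_ci(predict_once, n_sample=1000, alpha=0.05):
    # predict_once: a callable returning one stochastic prediction (model kept in train() mode so Dropout fires)
    samples = np.stack([predict_once() for _ in range(n_sample)])   # (n_sample, n_steps)
    mean = samples.mean(axis=0)
    se = samples.std(axis=0, ddof=1) / np.sqrt(n_sample)            # standard error of the sample mean
    t = st.t.ppf(1 - alpha / 2, n_sample - 1)
    return mean - t * se, mean, mean + t * se                       # (lower, mean, upper)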
Plotting
result.plot_loss()
result.plot()
result.plot_sampled()
# conf_predicted = None / conf_predicted_sampled / conf_predicted. The default is the conf_predicted_sampled data
# index selects which variable to show; with a single variable just pass 0
result.plot_confidence(index=0, alpha=0.05, conf_predicted=conf_predicted)
result.plot_confidence(index=0, alpha=0.05, conf_predicted=conf_predicted_sampled)
# The following function combines everything above; the steps above are the basic workflow, and this function lets you mix and match which data to plot
"""
names: Union[List[str], None] 表示每个变量的名字
index: Union[List[int], None, int] 表示绘制哪几个 index, None 即为全部
conf_predicted = conf_predicted / conf_predicted_sampled / None 已有数据置信区间
conf_predicte = conf / None 预测之后数据置信区间
conf_alpha: float eg: 0.05
result_ = predict / predict_sampled / None 对已有数据预测的数据
result = further_predict / further_predict_sampled / None 对之后预测的数据
"""
result.plot_all(names=None, index=None, conf_predicted=conf_predicted, conf_predict=conf, conf_alpha=0.05, result=further_predict_sampled, result_=predict_sampled)
plot_loss() draws the loss curve, plot() draws the curves of all variables, and plot_confidence(index) draws the confidence interval of the variable at index.
You can override the plotting methods to make the figures look nicer; an example follows the skeleton below
# Override the plotting code to customize the figures
class LSTMResult(LSTMResultBase):
def __init__(self, resultLSTM: Union[CustomLSTM, LSTMResultBase] = None):
super().__init__(resultLSTM)
def plot_confidence(self, index=0, alpha=0.05, conf_predicted=None):
super().plot_confidence(index, alpha, conf_predicted)
def plot_loss(self):
pass
def plot(self, names=None):
pass
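For instance, filling in the plot_loss stub might look like this (the styling below is arbitrary; adapt it freely):
import matplotlib.pyplot as plt
from LSTMResult import LSTMResultBase

class LSTMResult(LSTMResultBase):
    def plot_loss(self):
        # self.losses is forwarded from the wrapped CustomLSTM instance
        plt.figure(figsize=(8, 4))
        plt.plot(self.losses, color="tab:red", linewidth=1)
        plt.yscale("log")   # the MSE loss usually spans several orders of magnitude
        plt.xlabel("Epoch")
        plt.ylabel("Loss")
        plt.tight_layout()
        plt.show()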
Plot again
# After overriding LSTMResult, you can do the following
result = LSTMResult(result)
lstm = result.resultLSTM
result.plot()
Save the data and the model
# Save the model
result.save()
Load the saved data
result = LSTMResultBase.load()
lstm = result.resultLSTM
Full code
"""
File : main.py
Preferred Python IDE: Jupyter
"""
import numpy as np
from CustomModel import CustomLSTM
import Utils
def data_basic():
    """2023 MCM Problem C: https://www.pancake2021.work/wp-content/uploads/Problem_C_Data_Wordle.xlsx"""
    date, data = Utils.openfile("Problem_C_Data_Wordle.xlsx", data_col=[1], date_col=0)
    return date, data

def data01():
    # Test: 2-D array, multiple variables
    data1 = np.sin(np.arange(200) * np.pi / 50) + np.random.randn(200) * 0.1
    data2 = np.sin(np.arange(200) * np.pi / 25) + np.random.randn(200) * 0.1 + 3
    return np.c_[data1[:, np.newaxis], data2[:, np.newaxis]]

def data02():
    # Test: 1-D array
    return np.sin(np.arange(200) * np.pi / 50) + np.random.randn(200) * 0.1

def data03():
    # Test: 2-D array, single variable
    return data02()[:, np.newaxis]

# Load the data; to use date, the column must follow Excel's date format, or write your own code to build the timestamps
# date, data = data_basic()
data = data01()
# Initialize the network
window_size = 10
lstm = CustomLSTM(data, window_size, scale=False)
# lstm = CustomLSTM(data, window_size, date, scale=True)
lstm.init_lstm(hidden=64, num_layers=3)
# Train the network
# max_batch: whether to use the entire data set as a single batch without splitting
_ = lstm.fit(num_epochs=2000, lr=0.01, max_batch=True, is_copy=False)
result = lstm.fit(num_epochs=2000, lr=0.001, max_batch=True, is_copy=True)
# result = lstm.fit(num_epochs=1000, lr=0.01, batch_size=55, max_batch=False)
# Adjust the window size and retrain
# lstm.slice(20)
# lstm.init_lstm(hidden=64, num_layers=1)
# lstm.fit(num_epochs=50, lr=0.001, batch_size=40, max_batch=False)
# Predict over all the earlier data
predict = result.predicted(is_random=False)
# Print the summary
r2, mse, rmse, mae, mape, smape = result.summary()
# Predict the next n steps
n = 60
further_predict = result.predict(n)
# n-step 1-alpha confidence interval for the variable at a given index
alpha = 0.05
conf_predicted = result.conf_int_predicted(alpha)
conf_predicted_sampled = result.conf_int_predicted_sampled(alpha=alpha, n_sample=1000)
conf = result.conf_int_sampled(n, alpha=alpha, n_sample=1000)
# Plotting
result.plot_loss()
result.plot()
result.plot_confidence(index=0, alpha=0.05, conf_predicted=None)
"""
File : CustomModel.py
"""
import numpy as np
import warnings
import torch
import torch.nn as nn
from sklearn.preprocessing import MinMaxScaler
from sklearn.utils.validation import check_is_fitted, check_array
from copy import deepcopy
from Model import LSTM
warnings.simplefilter("always")
# gpu or cpu
_device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# A MinMaxScaler extended with a per-column inverse transform
class MyScaler(MinMaxScaler):
def __init__(self):
super().__init__()
def inverse_transform_col(self, x, n_col):
check_is_fitted(self)
x = check_array(
x, copy=self.copy, dtype=(np.float64, np.float32, np.float16), force_all_finite="allow-nan"
)
x -= self.min_[n_col]
x /= self.scale_[n_col]
return x
class CustomLSTM:
X, Y, data, model, criterion, window, losses, scale = [None] * 8
def __init__(self, data: np.array, window, date=None, scale=False):
self.data = data
self.input_dim = self.init_data()
self.output_dim = self.input_dim
self.date = date
self.scale = scale
        # Whether to normalize the data
if scale:
self.scaler = MyScaler()
self.data = self.scaler.fit_transform(self.data)
self.slice(window)
    # data may be a 1-D or a 2-D array; 1-D arrays are forced to 2-D
def init_data(self):
assert (length := len(self.data.shape)) in [1, 2]
if length == 1:
self.data = self.data[:, np.newaxis]
return len(self.data[0])
    # The function below is deprecated; instead h0 and c0 are adjusted so that no error is raised
    # Check whether the total data size is divisible by the batch size
    def check_batch(self, batch_size):
        length = self.X.shape[0]
        # Make sure batch_size is not larger than the total length
        assert length >= batch_size
        if batch_size * (length // batch_size) != length:
            warnings.warn(f'Data size is {length} and batch size is {batch_size}; they do not divide evenly, so {(length % batch_size) / length * 100}% of the data would be lost', DeprecationWarning)
    # Reset the parameters
    def init_param(self):
        self.model, self.criterion, self.losses = [None] * 3
    # Slice the data into windows of size `window` to form the full batch
def slice(self, window):
self.init_param()
self.window = window
x, y = [], []
for i in range(len(self.data) - window):
x.append(self.data[i:i + window])
y.append(self.data[i + window])
x = np.array(x)
y = np.array(y)
x = torch.from_numpy(x).float() # (batch_size, sequence_length, input_size)
y = torch.from_numpy(y).float()
print(f"数据格式: X = {x.shape}, Y = {y.shape}")
self.X = x.to(_device)
self.Y = y.to(_device)
    # Initialize the LSTM model
def init_lstm(self, hidden=64, num_layers=1):
self.init_param()
self.model = LSTM(self.input_dim, hidden, self.output_dim, num_layers).to(_device)
self.criterion = nn.MSELoss().to(_device)
    # Generate batches from the full data set, optionally shuffled
@staticmethod
def iterate_batches(inputs, targets, batch_size, shuffle=True):
assert len(inputs) == len(targets)
indices = np.arange(n := len(inputs))
if shuffle:
np.random.shuffle(indices)
for start_idx in range(0, len(inputs) - batch_size + 1, batch_size):
excerpt = indices[start_idx:start_idx + batch_size]
yield inputs[excerpt], targets[excerpt], None
if (left := n % batch_size) != 0:
excerpt = indices[-left:]
yield inputs[excerpt], targets[excerpt], left
    # Start training
    def fit(self, num_epochs=100, lr=0.01, batch_size=128, max_batch=False, is_copy=False):
        losses = []
        avg_loss = 0
        if self.model is None:
            raise ValueError("Please call CustomLSTM.init_lstm first to initialize the network")
optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)
self.model.train()
if max_batch:
batch_size = self.X.shape[0]
else:
pass
# self.check_batch(batch_size)
for epoch in range(num_epochs):
loss_all = 0
self.model.init_hidden(batch_size)
index = 0
for index, (batch_x, batch_y, left) in enumerate(CustomLSTM.iterate_batches(self.X, self.Y, batch_size, shuffle=False)):
optimizer.zero_grad()
outputs = self.model(batch_x, left)
loss = self.criterion(outputs, batch_y)
loss.backward()
optimizer.step()
loss_all += loss.detach().cpu()
losses.append(loss_all / (index + 1))
avg_loss += loss_all / (index + 1)
if epoch % 20 == 0:
print('Epoch [{}/{}], Loss: {:.4f}, AVG Loss of 20 sample: {:.4f}'.format(epoch + 1, num_epochs, loss_all / (index + 1), avg_loss / 20))
avg_loss = 0
if self.losses is not None:
self.losses = self.losses + losses
else:
self.losses = losses
self.model.init_hidden(1)
from LSTMResult import LSTMResultBase
return LSTMResultBase(deepcopy(self) if is_copy else self)
"""
File : LSTMResult.py
"""
import Utils
import numpy as np
import warnings
import inspect
import torch
import matplotlib.pyplot as plt
import pickle
import time
from functools import cache
from RunTime import cal_time
import CustomModel
warnings.simplefilter("always")
# gpu or cpu
_device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
class LSTMResultBase(object):
    # result holds the forecast for future steps; result_ holds the predictions for the data that has ground-truth values
    # The sampled results are more convincing than result: they average out the randomness of Dropout and are closer to reality, so the sampled data are recommended.
    result, conf, result_, conf_predicted, conf_predicted_sampled, sampled_result, sampled_result_ = [None] * 7
    # Keep the original training data in memory
def __init__(self, resultLSTM: CustomModel.CustomLSTM = None):
if isinstance(resultLSTM, CustomModel.CustomLSTM):
self.resultLSTM = resultLSTM
elif isinstance(resultLSTM, LSTMResultBase):
self.resultLSTM = resultLSTM.resultLSTM
self.copy_param(resultLSTM)
else:
            warnings.warn(f'Input type {type(resultLSTM)} is not supported; please pass a CustomLSTM or LSTMResultBase', DeprecationWarning)
self.__doc__ = resultLSTM.__doc__
def copy_param(self, parent):
self.result, self.conf, self.result_, self.conf_predicted, self.conf_predicted_sampled, self.sampled_result, self.sampled_result_ = \
parent.result, parent.conf, parent.result_, parent.conf_predicted, parent.conf_predicted_sampled, parent.sampled_result, parent.sampled_result_
    # Attribute access is forwarded to the wrapped object instead of copied
def __getattribute__(self, item):
get = lambda name: object.__getattribute__(self, name)
try:
return get(item)
except AttributeError:
pass
try:
results = get('resultLSTM')
except AttributeError:
raise ImportError("CustomLSTM load error")
try:
attribute = getattr(results, item)
if inspect.ismethod(attribute):
warnings.warn(f"调用{attribute},可能出错,请尽量不要在此调用 CustomLSTM", DeprecationWarning)
return attribute
except AttributeError:
AttributeError(f"{item} not in bot CustomLSTM or LSTMResult")
def __dir__(self):
return [x for x in dir(self.resultLSTM) if not inspect.ismethod(getattr(self.resultLSTM, x))] + dir(LSTMResultBase)
def __getstate__(self):
return self.__dict__
def __setstate__(self, dict_):
self.__dict__.update(dict_)
    # Predict the values from window_size onwards so that they can be compared with the ground truth
    @cal_time()
    def predicted(self, is_random=False):
        if self.model is None:
            raise ValueError("Please call CustomLSTM.fit first")
if is_random:
self.model.train()
else:
self.model.eval()
self.model.init_hidden(len(self.X))
with torch.no_grad():
predicted = self.model(self.X).to(_device)
predicted = predicted.detach().cpu().numpy()
self.result_ = np.array(predicted)
if self.scale:
self.result_ = self.scaler.inverse_transform(self.result_)
return self.result_
    # Predict the values of the next n steps
    @cal_time()
    def predict(self, n, is_random=False):
        if self.model is None:
            raise ValueError("Please call CustomLSTM.fit first")
if is_random:
self.model.train()
else:
self.model.eval()
self.model.init_hidden(1)
_data = self.data[-self.window:, :].tolist()
y = []
with torch.no_grad():
for _ in range(n):
x = torch.tensor(_data).float().unsqueeze(0).to(_device)
_result = self.model(x).detach().cpu().tolist()
y.append(_result[0])
_data.append(_result[0])
_data.pop(0)
self.result = np.array(y)
if self.scale:
self.result = self.scaler.inverse_transform(self.result)
return self.result
    # Save the whole model
    def save(self, name="./data"):
        with open(name, 'wb') as f:
            pickle.dump(self, f)
    # Load a saved model
    @staticmethod
    def load(name="./data"):
        with open(name, "rb") as f:
            return pickle.load(f)
    # Standard (analytic) confidence interval for the data that has ground-truth values
    def conf_int_predicted(self, alpha=0.05):
        if self.result_ is None:
            raise ValueError("Please call LSTMResultBase.predicted first")
y_pred = self.result_
y_true = self.data[self.window:]
n_param = self.data.shape[1]
        # Dimensions: variable index, index within the variable, (lower, mean, upper)
conf = np.zeros(shape=(n_param, len(y_true), 3))
for i in range(n_param):
conf[i] = Utils.ci(y_true[:, i], y_pred[:, i], alpha=alpha)
self.conf_predicted = conf
return conf
    # Confidence interval for the data with ground truth, computed by sampling; this method is recommended
    @cache
    def conf_int_predicted_sampled(self, alpha=0.05, n_sample=1000, method=Utils.sample_ci):
        result_ = np.copy(self.result_)
        n = self.data.shape[0] - self.window
        n_param = self.data.shape[1]
        # Dimensions: variable index, index within the variable, (lower, mean, upper)
conf = np.zeros(shape=(n_param, n, 3))
sample = np.zeros(shape=(n_sample, n, n_param))
for i in range(n_sample):
print(f"[{i + 1}/{n_sample}] ", end="")
sample[i] = self.predicted(is_random=True)
for i in range(n_param):
conf[i] = method(sample[:, :, i], alpha=alpha)
# conf[i] = Utils.sample_ci(sample[:, :, i], alpha=alpha)
# conf[i] = Utils.bootstrap_ci_sampled(sample[:, :, i], alpha=alpha)
self.conf_predicted_sampled = conf
self.sampled_result_ = conf[:, :, 1].reshape(n_param, n).T
self.result_ = result_
return conf
    # Confidence interval for the forecast, computed by sampling
@cache
def conf_int_sampled(self, n, alpha=0.05, n_sample=1000, method=Utils.sample_ci):
result = np.copy(self.result)
n_param = self.data.shape[1]
sample = np.zeros(shape=(n_sample, n, n_param))
conf = np.zeros(shape=(n_param, n, 3))
for i in range(n_sample):
print(f"[{i + 1}/{n_sample}] ", end="")
sample[i] = self.predict(n, is_random=True)
for i in range(n_param):
conf[i] = method(sample[:, :, i], alpha=alpha)
# conf[i] = Utils.sample_ci(sample[:, :, i], alpha=alpha)
# conf[i] = Utils.bootstrap_ci_sampled(sample[:, :, i], alpha=alpha)
self.conf = conf
self.sampled_result = conf[:, :, 1].reshape(n_param, n).T
self.result = result
return conf
    # Plot the loss curve
    def plot_loss(self):
        if self.losses is None:
            raise ValueError("No loss history found; train the model first")
plt.figure(figsize=(12, 6))
plt.plot(self.losses)
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.show()
def plot_all(self, names=None, index=None, conf_predicted=None, conf_predict=None, conf_alpha=None, result=None, result_=None):
if names is None:
names = [''] * len(self.data[0])
x = np.arange(len(self.data))
x_further = np.arange(len(self.data), len(self.data) + len(self.result))
if index is None:
index = range(len(self.data[0]))
elif isinstance(index, int):
index = [index]
conf_name = '{} Confidence interval' if conf_alpha is None else '{} ' + f'{1 - conf_alpha} Confidence interval'
plt.figure(figsize=(12, 6))
for i in index:
y_true = self.data[:, i: i + 1] if not self.scale else self.scaler.inverse_transform_col(self.data[:, i:i + 1], i)
plt.plot(x, y_true, label=f'{names[i]} True Values')
if result_ is not None:
plt.plot(x[self.window:], result_[:, i:i + 1], label=f'{names[i]} Predictions')
if result is not None:
plt.plot(x_further, result[:, i:i + 1], label=f"{names[i]} Further Predictions")
if conf_predicted is not None:
plt.fill_between(x[self.window:], conf_predicted[i, :, 0], conf_predicted[i, :, 2], alpha=0.2, label=conf_name.format(names[i]))
if conf_predict is not None:
plt.fill_between(x_further, conf_predict[i, :, 0], conf_predict[i, :, 2], alpha=0.2, label=conf_name.format(names[i]))
self.process_x_label()
plt.legend()
plt.show()
    # data may be 2-D, i.e. several variables are predicted; names holds the name of each variable
    def plot(self, names=None):
        if self.result is None:
            raise ValueError("Please call LSTMResultBase.predict first")
        if self.result_ is None:
            raise ValueError("Please call LSTMResultBase.predicted first")
if names is None:
names = [''] * len(self.data[0])
x = np.arange(len(self.data))
x_further = np.arange(len(self.data), len(self.data) + len(self.result))
plt.figure(figsize=(12, 6))
for i in range(len(self.data[0])):
y_true = self.data[:, i: i + 1] if not self.scale else self.scaler.inverse_transform_col(self.data[:, i:i + 1], i)
plt.plot(x, y_true, label=f'{names[i]} True Values')
plt.plot(x[self.window:], self.result_[:, i:i + 1], label=f'{names[i]} Predictions')
plt.plot(x_further, self.result[:, i:i + 1], label=f"{names[i]} Further Predictions")
self.process_x_label()
plt.legend()
plt.show()
    def plot_sampled(self, names=None):
        if self.sampled_result is None:
            raise ValueError("Please call LSTMResultBase.conf_int_sampled first")
        if self.sampled_result_ is None:
            raise ValueError("Please call LSTMResultBase.conf_int_predicted_sampled first")
        if names is None:
            names = [''] * len(self.data[0])
        x = np.arange(len(self.data))
        x_further = np.arange(len(self.data), len(self.data) + len(self.result))
        plt.figure(figsize=(12, 6))
        for i in range(len(self.data[0])):
            y_true = self.data[:, i: i + 1] if not self.scale else self.scaler.inverse_transform_col(self.data[:, i:i + 1], i)
            plt.plot(x, y_true, label=f'{names[i]} True Values')
            plt.plot(x[self.window:], self.sampled_result_[:, i:i + 1], label=f'{names[i]} Predictions')
            plt.plot(x_further, self.sampled_result[:, i:i + 1], label=f"{names[i]} Further Predictions")
self.process_x_label()
plt.legend()
plt.show()
    # Use the dates as x-axis ticks
def process_x_label(self):
if self.date is None:
return
begin = self.date[0]
end = self.date[-1]
between = (end - begin) // (len(self.date) - 1)
n = len(self.result)
length = len(self.data) + n
gap = length // 5
# "%Y-%m-%d %H:%M:%S"
plt.xticks(np.arange(0, length, gap), [time.strftime("%Y-%m-%d", time.localtime(i)) for i in range(begin, end + between * n + 1, between * gap)],
rotation=45)
plt.tight_layout()
    # Plot the confidence interval for a single variable
    def plot_confidence(self, index=0, alpha=0.05, conf_predicted=None):
        if self.result is None:
            raise ValueError("Please call LSTMResultBase.predict first")
        if self.result_ is None:
            raise ValueError("Please call LSTMResultBase.predicted first")
        if conf_predicted is None:
            try:
                conf_predicted = self.conf_predicted_sampled[index]
            except (IndexError, TypeError):
                try:
                    conf_predicted = self.conf_predicted[index]
                except (IndexError, TypeError):
                    raise ValueError("Please call LSTMResultBase.conf_int_predicted_sampled or conf_int_predicted first")
        else:
            conf_predicted = conf_predicted[index]
        try:
            conf = self.conf[index]
        except (IndexError, TypeError):
            raise ValueError("Please call LSTMResultBase.conf_int_sampled first")
plt.figure(figsize=(12, 6))
x = np.arange(len(self.data))
n = len(self.result)
x_further = np.arange(len(self.data), len(self.data) + n)
y_true = self.data[:, index: index + 1] if not self.scale else self.scaler.inverse_transform_col(self.data[:, index: index + 1], index)
plt.plot(x, y_true, label='True Values')
plt.plot(x[self.window:], conf_predicted[:, 1], label='Predictions')
plt.plot(x_further, conf[:, 1], label="Further Predictions")
plt.fill_between(x[self.window:], conf_predicted[:, 0], conf_predicted[:, 2], alpha=0.2, label=f'{1 - alpha} Confidence interval')
plt.fill_between(x_further, conf[:, 0], conf[:, 2], alpha=0.2, label=f'{1 - alpha} Confidence interval')
self.process_x_label()
plt.legend()
plt.show()
    # Print the summary, i.e. r^2 and the other evaluation metrics
    def summary(self):
        if self.result_ is None:
            raise ValueError("Please call LSTMResultBase.predicted first")
data = self.data[self.window:, :]
if self.scale:
data = self.scaler.inverse_transform(data)
print("==========Summary Begin===========")
print("R2 =", score_r2 := Utils.r2(self.result_, data))
print("MSE =", score_mse := Utils.mse(self.result_, data))
print("RMSE =", score_rmse := np.sqrt(score_mse))
print("MAE =", score_mae := Utils.mae(self.result_, data))
print("MAPE =", score_mape := Utils.mape(self.result_, data, replace=self.scale))
print("SMAPE =", score_smape := Utils.smape(self.result_, data))
print(f"Average is {score_r2.mean()} {score_mse.mean()} {score_rmse.mean()} {score_mae.mean()} {score_mape.mean()} {score_smape.mean()}")
print("===========Summary end============")
return score_r2, score_mse, score_rmse, score_mae, score_mape, score_smape
"""
File : Model.py
"""
import torch
import torch.nn as nn
# gpu or cpu
_device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Define the LSTM model
class LSTM(nn.Module):
hidden = None
def __init__(self, input_size, hidden_size, output_size, num_layers):
super().__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
# As batch_first=True, input: (batch_size, sequence_length, input_size)
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
self.lstm2 = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=True)
self.out = nn.Sequential(
nn.Linear(hidden_size, hidden_size),
nn.ReLU(),
nn.Dropout(0.5),
nn.Linear(hidden_size, output_size)
)
        # Optional: comment out the next line to skip the custom weight initialization
self.apply(LSTM.init_weights)
def forward(self, x, left=None):
self.lstm.flatten_parameters()
self.lstm2.flatten_parameters()
        # Detach the hidden state so that loss.backward() does not raise an error
hidden = [each.data for each in self.hidden] if left is None else [each.data[:, :left, :] for each in self.hidden]
x, hidden = self.lstm(x, hidden)
x, self.hidden = self.lstm2(x, hidden)
x = x[:, -1, :]
x = self.out(x)
return x
# (h0, c0)
def init_hidden(self, batch_size):
weight = next(self.parameters()).data
self.hidden = (weight.new(self.num_layers, batch_size, self.hidden_size).zero_().to(_device),
weight.new(self.num_layers, batch_size, self.hidden_size).zero_().to(_device))
@staticmethod
def init_weights(m):
if type(m) == nn.LSTM:
for name, param in m.named_parameters():
if 'weight_ih' in name:
torch.nn.init.orthogonal_(param.data)
elif 'weight_hh' in name:
torch.nn.init.orthogonal_(param.data)
elif 'bias' in name:
param.data.fill_(0)
elif type(m) == nn.Conv1d or type(m) == nn.Linear:
torch.nn.init.orthogonal_(m.weight)
m.bias.data.fill_(0)
"""
File : RunTime.py
"""
import time
from functools import wraps
# Decorator for measuring a function's running time, with ms/s/min/hour/day units
def cal_time(unit='s'):
units = ['ms', 's', 'min', 'hour', 'day']
times = [1000, 1, 1 / 60, 1 / 3600, 1 / 3600 / 24]
try:
time_process = times[units.index(unit.lower())]
except Exception as e:
raise e
def input_func(func):
@wraps(func)
def wrap(*args, **kwargs):
begin_time = time.time()
ans = func(*args, **kwargs)
            print(func.__name__, f"took {(time.time() - begin_time) * time_process} {unit}")
return ans
return wrap
return input_func
"""
File : Utils.py
"""
import numpy as np
import pandas as pd
import scipy.stats as st
from typing import Union, List
# Confidence interval
def ci(y_true: np.ndarray, y_pred: np.ndarray, alpha: float = 0.05, use_t: bool = True) -> np.ndarray:
residuals = y_true - y_pred
n = len(residuals)
if use_t:
dist = st.t
df = n - 1
t_value = dist.ppf(1 - alpha / 2, df)
else:
dist = st.norm
t_value = dist.ppf(1 - alpha / 2)
if y_true.ndim == 1:
std_err = np.std(residuals, ddof=1) / np.sqrt(n)
else:
std_err = np.std(residuals, ddof=1, axis=0) / np.sqrt(n)
err = t_value * std_err
return np.c_[y_pred - err, y_pred, y_pred + err]
# Confidence interval computed from samples
def sample_ci(sample: np.ndarray, alpha: float = 0.05, use_t: bool = True) -> np.ndarray:
n = len(sample)
if use_t:
dist = st.t
df = n - 1
t_value = dist.ppf(1 - alpha / 2, df)
else:
dist = st.norm
t_value = dist.ppf(1 - alpha / 2)
means = sample.mean(axis=0)
stds = sample.std(axis=0, ddof=1)
lower = means - t_value * stds / np.sqrt(n)
upper = means + t_value * stds / np.sqrt(n)
return np.c_[lower, means, upper]
# Bootstrap confidence interval from samples; sample has shape (n_sample, n)
def bootstrap_ci_sampled(sample: np.ndarray, alpha: float = 0.05, n_bootstraps: int = 1000) -> np.ndarray:
n = len(sample[0])
sample_size = len(sample)
lower = np.zeros(shape=(n,))
means = np.zeros(shape=(n,))
upper = np.zeros(shape=(n,))
for i in range(n):
bootstrap_sample = np.random.choice(sample[:, i], size=(sample_size, n_bootstraps), replace=True)
bootstrap_means = np.mean(bootstrap_sample, axis=1)
lower[i] = np.percentile(bootstrap_means, 100 * alpha / 2)
upper[i] = np.percentile(bootstrap_means, 100 * (1 - alpha / 2))
means[i] = np.mean(bootstrap_means)
return np.c_[lower, means, upper]
# Inputs are 2-D numpy arrays with one variable per column; outputs are 1-D numpy arrays of metric values
def r2(y_pred, y_true):
return 1 - ((y_pred - y_true) ** 2).sum(axis=0) / ((y_true.mean(axis=0) - y_true) ** 2).sum(axis=0)
def mse(y_pred, y_true):
return ((y_true - y_pred) ** 2).sum(axis=0) / len(y_pred)
def rmse(y_pred, y_true):
return np.sqrt(((y_true - y_pred) ** 2).sum(axis=0) / len(y_pred))
def mae(y_pred, y_true):
return (np.absolute(y_true - y_pred)).sum(axis=0) / len(y_true)
def mape(y_pred, y_true, replace=False):
    # Guard against zeros in y_true
if replace:
y_true = np.copy(y_true)
y_true[y_true == 0] = 1
return (np.abs((y_pred - y_true) / y_true)).mean(axis=0) * 100
def smape(y_pred, y_true):
return 2.0 * (np.abs(y_pred - y_true) / (np.abs(y_pred) + np.abs(y_true))).mean(axis=0) * 100
# Convenience helper for opening data files
def openfile(name: str, data_col: Union[None, List[str], List[int]] = None, date_col: Union[None, int] = None) -> (np.ndarray, np.ndarray):
file_type = name.split(".")[-1]
if file_type == "csv":
df = pd.read_csv(name, encoding='GBK')
elif file_type == "xlsx" or file_type == "xls":
df = pd.read_excel(name)
else:
        raise TypeError(f"{name} is not a csv, xls or xlsx file")
    # df = df[["column name 1", "column name 2"]]
    # data_col may hold column names or positional indices
    df_data = (df[data_col] if all(isinstance(c, str) for c in data_col) else df.iloc[:, data_col]) if data_col is not None else df
    date = df.iloc[:, date_col].astype('int64') // 1e9 if date_col is not None else None
    return (np.array(date, dtype=int) if date is not None else None), np.array(df_data)
Changelog:
2023-04-10: fixed the bootstrap bug; improved dir(LSTMResultBase); added the customizable plot_all function; fixed warnings not being displayed; separated the sampled and single-run data; replaced the batch_size check by adjusting h0 and c0 instead; moved predicted into the result base class; fixed data loss when saving.