# Time Series Forecasting Practice

**Repository Path**: encorew/time-series-practice

## Basic Information

- **Project Name**: Time Series Forecasting Practice
- **Description**: Time series forecasting with LSTM and Transformer models
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 40
- **Forks**: 10
- **Created**: 2023-07-04
- **Last Updated**: 2025-07-19

## Categories & Tags

**Categories**: Uncategorized
**Tags**: None

## README

# Time Series Forecasting Practice

## RNN & LSTM Forecasting

### 1. RNN & LSTM model definitions

#### RNN structure

![image-20230704152304997](README.assets/image-20230704152304997.png)

* The RNN module is already implemented in the PyTorch framework; we only need to add a linear layer (**Linear**) to play the role of **V**.

```python
# RNN model definition
class RNN(nn.Module):
    def __init__(self, series_dim, hidden_size, num_layers):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # Use PyTorch's built-in RNN module
        self.rnn = nn.RNN(series_dim, hidden_size, num_layers, batch_first=True)
        # A linear layer on top of the hidden state produces the final output
        self.linear = nn.Linear(hidden_size, series_dim)

    def forward(self, input, state):
        hiddens, _ = self.rnn(input, state)
        # hiddens[0] = f(U · input[0] + W · state)
        # hiddens[1] = f(U · input[1] + W · hiddens[0])
        # hiddens[2] = f(U · input[2] + W · hiddens[1])
        # ...
        # hiddens[t] = f(U · input[t] + W · hiddens[t-1])
        # Return the array of predictions o for time steps 0..t,
        # where o[t] = V · hiddens[t]; it is compared against the
        # ground truth y for backpropagation
        return self.linear(hiddens)

    def init_state(self, batch_size):
        # Initializes the model state hidden[0] at the start of training
        return torch.zeros((self.num_layers, batch_size, self.hidden_size))
```

#### LSTM structure

![image-20230704160535131](README.assets/image-20230704160535131.png)

* We do not need to implement the LSTM's forgetting mechanism ourselves; PyTorch already provides it.
* Compared with the RNN, we only need to add the cell-state vector **c** in the `init_state` method.

```python
# LSTM model definition
class LSTM(nn.Module):
    def __init__(self, series_dim, hidden_size, num_layers):
        super(LSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # As with the RNN, use PyTorch's built-in LSTM and add one linear layer
        self.lstm = nn.LSTM(series_dim, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, series_dim)

    def forward(self, input, state):
        # Same computation logic as the RNN
        hiddens, _ = self.lstm(input, state)
        return self.linear(hiddens)

    def init_state(self, batch_size):
        # Unlike the RNN state, this is a pair made up of h and c
        h = torch.zeros(self.num_layers, batch_size, self.hidden_size, requires_grad=True)
        c = torch.zeros(self.num_layers, batch_size, self.hidden_size, requires_grad=True)
        return (h, c)
```

### 2. Reading the data

![image-20230704154845211](README.assets/image-20230704154845211.png)

* The dataset is taken from the first 10000 timestamps of the data.

```python
# 1. Read the CSV data
data = pd.read_csv('data/pollution.csv').values[0:10000]
# 2. Select pollution, dew, temp, press as the series to learn and predict
data = data[:, [1, 2, 3, 4]]
plt.plot(data)
plt.show()
```

* Build the samples and labels

```python
# 3. Separate samples and labels
# sample (x_0, x_1, ..., x_{n-1}) -> label (x_1, x_2, ..., x_n),
# sample (x_1, x_2, ..., x_n)     -> label (x_2, x_3, ..., x_{n+1}), ...
WINDOW_SIZE = 50
samples = []
labels = []
for i in range(len(data) - WINDOW_SIZE - 1):
    samples.append(data[i:i + WINDOW_SIZE])
    labels.append(data[i + 1:i + WINDOW_SIZE + 1])
samples = torch.tensor(np.array(samples), dtype=torch.float32)
labels = torch.tensor(np.array(labels), dtype=torch.float32)
```

![image-20230704170336239](README.assets/image-20230704170336239.png)
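The sliding-window construction above can be sanity-checked on a toy series (synthetic values standing in for the pollution data; the names `data`, `samples`, `labels` are reused locally for illustration only):

```python
import numpy as np
import torch

# Toy series: 10 timestamps, 1 feature, values 0..9
data = np.arange(10, dtype=np.float32).reshape(-1, 1)

WINDOW_SIZE = 4
samples, labels = [], []
for i in range(len(data) - WINDOW_SIZE - 1):
    samples.append(data[i:i + WINDOW_SIZE])
    labels.append(data[i + 1:i + WINDOW_SIZE + 1])
samples = torch.tensor(np.array(samples), dtype=torch.float32)
labels = torch.tensor(np.array(labels), dtype=torch.float32)

# Each label window is its sample window shifted one step into the future
print(samples[0].flatten().tolist())  # [0.0, 1.0, 2.0, 3.0]
print(labels[0].flatten().tolist())   # [1.0, 2.0, 3.0, 4.0]
```

Because every sample/label pair differs only by a one-step shift, the recurrent model gets a next-step prediction target at every position in the window.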
* Split into training and test sets, and use `DataLoader` to build the dataset

```python
# 4. Split into training and test sets at a 4:1 ratio
train_test_boundary = int(len(data) * 0.8)
train_samples = samples[:train_test_boundary, :]
train_labels = labels[:train_test_boundary, :]
test_samples = samples[train_test_boundary:, :]
# 5. Build the dataset with PyTorch's DataLoader
BATCH_SIZE = 32
train_dataset = Data.TensorDataset(train_samples, train_labels)
train_loader = Data.DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)
```

### 3. Training the model

* Instantiate the LSTM class defined above, then define the loss function, optimizer, and so on to drive the model updates

```python
# The data is ready; start training
# 6. Instantiate the model; here we use the LSTM (an RNN works as well)
TIME_SERIES_DIM = 4
HIDDEN_SIZE = 60
NUM_LAYERS = 1
model = LSTM(TIME_SERIES_DIM, HIDDEN_SIZE, NUM_LAYERS)
# 7. Define the loss function and optimizer
LEARNING_RATE = 0.001
loss_function = nn.MSELoss()
updater = torch.optim.Adam(model.parameters(), LEARNING_RATE)
```

* Start training

```python
# 8. Training loop
EPOCH = 10
for epoch in range(EPOCH):
    current_epoch_loss = 0
    batches_per_epoch = 0
    for X, Y in train_loader:
        # Initialize the recurrent network's state h
        state = model.init_state(BATCH_SIZE)
        # Feed the current state and the current input X into the model
        Y_pred = model(X, state)
        # Update through the loss function and optimizer
        updater.zero_grad()
        loss = loss_function(Y_pred, Y)
        current_epoch_loss += float(loss) * BATCH_SIZE
        batches_per_epoch += BATCH_SIZE
        loss.backward()
        updater.step()  # gradient descent updates the weights
    print(f"*Current epoch:{epoch} training loss:{current_epoch_loss / batches_per_epoch}")
```

### 4. Prediction

![image-20230704171511751](README.assets/image-20230704171511751.png)

* When predicting on the test set, take the **last point** of each predicted vector (the green point inside the green brace in the figure)
  * e.g. [0,1,2,3] predicts [1,2,3,4] **=>** we only need 4 as the final result, so only the last point is appended to the `predicted_data` list
* At test time **batch_size** is set to **1**, so we take `predicted_label[0]`, then detach it and convert it to numpy

Putting it together, the point we need is `predicted_label[0][-1]`; the code is:

```python
# 9. Prediction
predicted_data = []
for test_sample in test_samples:
    state = model.init_state(batch_size=1)
    predicted_label = model(test_sample.reshape(1, test_sample.shape[0], -1), state)
    predicted_data.append(predicted_label[0][-1].detach().numpy())
predicted_data = np.array(predicted_data)
```
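The indexing `predicted_label[0][-1]` can be illustrated with a dummy tensor shaped like the model output (a random stand-in replaces the trained model here, purely to show the shapes):

```python
import torch

WINDOW_SIZE, SERIES_DIM = 50, 4
# Stand-in for the model output with batch_size=1:
# shape (batch_size, WINDOW_SIZE, SERIES_DIM)
predicted_label = torch.randn(1, WINDOW_SIZE, SERIES_DIM)

# [0] drops the batch dimension -> (WINDOW_SIZE, SERIES_DIM)
# [-1] keeps only the last time step -> (SERIES_DIM,)
last_point = predicted_label[0][-1].detach().numpy()
print(last_point.shape)  # (4,)
```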
**Plotting**

```python
# 10. Compute the errors and plot the predictions
true_data = data[train_test_boundary + WINDOW_SIZE + 1:]
MSE = mean_squared_error(true_data, predicted_data)
MAE = mean_absolute_error(true_data, predicted_data)
print(f"Mean squared error:{round(MSE, 5)} Mean absolute error:{round(MAE, 5)}")
plt.plot(true_data[:, 0])
plt.plot(predicted_data[:, 0])
plt.show()
```

## Transformer Forecasting

### 1. Transformer structure

![image-20230706130608372](README.assets/image-20230706130608372.png)

* Uses an encoder-decoder structure
* First, define the positional encoding

```python
# Positional encoding
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = 1 / (10000 ** ((2 * np.arange(d_model)) / d_model))
        # Define the positional encoding through sin and cos
        pe[:, 0::2] = torch.sin(position * div_term[0::2])
        pe[:, 1::2] = torch.cos(position * div_term[1::2])
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # repeat(x, y, z) tiles the tensor x times along dim 0 (channels),
        # y times along dim 1 (rows), and z times along dim 2 (columns)
        return x + self.pe[:x.size(0), :].repeat(1, x.shape[1], 1)
```

* Encoder and decoder definitions (simply compose nn's `TransformerEncoderLayer` and related modules)

```python
class TransAm(nn.Module):
    def __init__(self, series_dim, feature_size=250, num_encoder_layers=1, num_decoder_layers=1, dropout=0.1):
        super(TransAm, self).__init__()
        self.model_type = 'Transformer'
        self.input_embedding = nn.Linear(series_dim, feature_size)
        self.src_mask = None
        # Positional-encoding layer built from the PositionalEncoding class defined above
        self.pos_encoder = PositionalEncoding(feature_size)
        # Use PyTorch's transformer modules for the encoder and decoder
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=feature_size, nhead=10, dropout=dropout)
        self.encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_encoder_layers)
        self.decoder_layer = nn.TransformerDecoderLayer(d_model=feature_size, nhead=10, dropout=dropout)
        self.decoder = nn.TransformerDecoder(self.decoder_layer, num_layers=num_decoder_layers)
        self.linear = nn.Linear(feature_size, series_dim)
```

* The forward pass produces the output:

```python
    def forward(self, src, tgt):
        if self.src_mask is None or self.src_mask.size(0) != len(src):
            device = src.device
            mask = self._generate_square_subsequent_mask(len(src)).to(device)
            self.src_mask = mask
        # Linear transformation before the positional embedding
        src = self.input_embedding(src)
        # Positionally encode the encoder input, feed it to the encoder, and get memory
        src = self.pos_encoder(src)
        memory = self.encoder(src, self.src_mask)
        # Positionally encode the decoder input and feed it, together with memory, to the decoder
        tgt = self.input_embedding(tgt)
        tgt = self.pos_encoder(tgt)
        output = self.decoder(tgt, memory)
        # A final linear layer on the result
        output = self.linear(output)
        # Drop the leading part and keep only the predicted part
        return output[-PREDICT_SIZE:, :, :]
```

### 2. Training process

* Building the dataset

```python
# Build the samples and labels
WINDOW_SIZE = 96
LABEL_SIZE = 48
PREDICT_SIZE = 96
samples = []
labels = []
for i in range(len(data) - WINDOW_SIZE - PREDICT_SIZE):
    samples.append(data[i:i + WINDOW_SIZE])
    labels.append(data[i + WINDOW_SIZE - LABEL_SIZE:i + WINDOW_SIZE + PREDICT_SIZE])
samples = torch.tensor(np.array(samples), dtype=torch.float32)
labels = torch.tensor(np.array(labels), dtype=torch.float32)
```

![image-20230706131124989](README.assets/image-20230706131124989.png)

The first 96 points are used to predict 48 + 96 = 144 points, including the overlap.

* Training

```python
# Training loop
def train():
    for epoch in range(EPOCH):
        current_epoch_loss = 0
        batches_per_epoch = 0
        for X, Y in train_loader:
            # Swap dims 0 and 1 so input and output have shape (sequence_length, batch_size, series_dim)
            X, Y = X.permute(1, 0, 2), Y.permute(1, 0, 2)
            # Build the decoder input for the Transformer
            decoder_input = torch.zeros_like(Y[-PREDICT_SIZE:, :, :]).float()
            decoder_input = torch.cat([Y[:LABEL_SIZE, :, :], decoder_input], dim=0).float().to(DEVICE)
            # *** Feed the encoder input X and the decoder input decoder_input to the model
            Y_pred = model(X, decoder_input)
            # Update through the loss function and optimizer
            updater.zero_grad()
            loss = loss_function(Y_pred, Y[-PREDICT_SIZE:, :, :])
            current_epoch_loss += float(loss) * BATCH_SIZE
            batches_per_epoch += BATCH_SIZE
            loss.backward()
            updater.step()  # gradient descent updates the weights
        print(f"*Current epoch:{epoch} training loss:{current_epoch_loss / batches_per_epoch}")
```
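The decoder-input construction in the loop above can be checked in isolation with dummy tensors (shapes only; the `.to(DEVICE)` call is omitted here):

```python
import torch

LABEL_SIZE, PREDICT_SIZE, BATCH_SIZE, SERIES_DIM = 48, 96, 32, 4

# Dummy label batch in (sequence_length, batch_size, series_dim) layout
Y = torch.randn(LABEL_SIZE + PREDICT_SIZE, BATCH_SIZE, SERIES_DIM)

# Zeros where the prediction will go, prefixed with the 48 known overlap points
decoder_input = torch.zeros_like(Y[-PREDICT_SIZE:, :, :])
decoder_input = torch.cat([Y[:LABEL_SIZE], decoder_input], dim=0)

print(decoder_input.shape)  # torch.Size([144, 32, 4])
```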
The decoder input is the concatenation of the 48 overlapping points with 96 zero points.

![image-20230706131424637](README.assets/image-20230706131424637.png)

* Output results

![image-20230706132456426](README.assets/image-20230706132456426.png)