# Time Series Forecasting Practice
**Repository Path**: chuchu01/time-series-practice
## Basic Information
- **Project Name**: Time Series Forecasting Practice (时序预测实践)
- **Description**: Time series forecasting using LSTM and Transformer models
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 10
- **Created**: 2023-07-19
- **Last Updated**: 2023-07-20
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# Time Series Forecasting Practice
## Forecasting with RNN & LSTM
### 1. Defining the RNN & LSTM Model Structures
#### RNN structure
* The RNN module is already implemented in PyTorch; we only need to add a **Linear** layer to represent the output matrix **V**.
```python
# RNN class definition
import torch
from torch import nn

class RNN(nn.Module):
    def __init__(self, series_dim, hidden_size, num_layers):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # Use the RNN module that ships with PyTorch
        self.rnn = nn.RNN(series_dim, hidden_size, num_layers, batch_first=True)
        # Add a linear layer on top of the hidden states to produce the final output
        self.linear = nn.Linear(hidden_size, series_dim)

    def forward(self, input, state):
        hiddens, _ = self.rnn(input, state)
        # hiddens[0] = f (U · input[0] + W · state)
        # hiddens[1] = f (U · input[1] + W · hiddens[0])
        # hiddens[2] = f (U · input[2] + W · hiddens[1])
        # ...
        # hiddens[t] = f (U · input[t] + W · hiddens[t-1])
        return self.linear(hiddens)
        # Returns the array of predictions o for time steps 0..t,
        # where o[t] = V · hiddens[t]; it can be compared with the ground truth y for backpropagation

    def init_state(self, batch_size):
        # Used during training to initialize the model state hidden[0]
        return torch.zeros((self.num_layers, batch_size, self.hidden_size))
```
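A quick shape check may help clarify the tensor layout. The sketch below is only illustrative and assumes the hyper-parameters used later in the training section (4 features, hidden size 60, 1 layer):
```python
# Illustrative shape check (hyper-parameters match the training section below)
rnn = RNN(series_dim=4, hidden_size=60, num_layers=1)
x = torch.randn(32, 50, 4)               # (batch, seq_len, series_dim) because batch_first=True
state = rnn.init_state(batch_size=32)    # (num_layers, batch, hidden_size)
y = rnn(x, state)
print(y.shape)                           # torch.Size([32, 50, 4]): one prediction per time step
```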
#### LSTM structure
* We do not need to implement the LSTM gating (forget) mechanism ourselves; PyTorch already provides it.
* Compared with the RNN, we only need to add the cell-state vector **c** in the `init_state` method.
```python
# LSTM class definition
class LSTM(nn.Module):
    def __init__(self, series_dim, hidden_size, num_layers):
        super(LSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # Same as the RNN: use PyTorch's built-in LSTM and add a linear output layer
        self.lstm = nn.LSTM(series_dim, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, series_dim)

    def forward(self, input, state):
        # The computation follows the same logic as the RNN
        hiddens, _ = self.lstm(input, state)
        return self.linear(hiddens)

    def init_state(self, batch_size):
        # Unlike the RNN, the state is a pair (h, c)
        h = torch.zeros(self.num_layers, batch_size,
                        self.hidden_size, requires_grad=True)
        c = torch.zeros(self.num_layers, batch_size,
                        self.hidden_size, requires_grad=True)
        return (h, c)
```
### 2. Reading the Data
* The dataset consists of the first 10,000 timestamps of the data.
```python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch.utils.data as Data

# 1. Read the CSV data
data = pd.read_csv('data/pollution.csv').values[0:10000]
# 2. Select pollution, dew, temp and press as the series to learn and predict
data = data[:, [1, 2, 3, 4]]
plt.plot(data)
plt.show()
```
* Build samples and labels
```python
# 3. Split into samples and labels
# sample: (x_0, x_1, ..., x_{n-1}) -> label: (x_1, x_2, ..., x_n),
# sample: (x_1, x_2, ..., x_n)     -> label: (x_2, x_3, ..., x_{n+1}), ...
WINDOW_SIZE = 50
samples = []
labels = []
for i in range(len(data) - WINDOW_SIZE - 1):
    samples.append(data[i:i + WINDOW_SIZE])
    labels.append(data[i + 1:i + WINDOW_SIZE + 1])
samples = torch.tensor(np.array(samples), dtype=torch.float32)
labels = torch.tensor(np.array(labels), dtype=torch.float32)
```
* Split into training and test sets, and build the dataset with `DataLoader`
```python
# 4. Split into training and test sets with a 4:1 ratio
train_test_boundary = int(len(data) * 0.8)
train_samples = samples[:train_test_boundary, :]
train_labels = labels[:train_test_boundary, :]
test_samples = samples[train_test_boundary:, :]
# 5. Build the dataset with PyTorch's DataLoader
BATCH_SIZE = 32
train_dataset = Data.TensorDataset(train_samples, train_labels)
train_loader = Data.DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)
```
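As a quick sanity check (illustrative), each batch drawn from `train_loader` is a pair of windows in the layout expected by `batch_first=True`:
```python
# Illustrative: inspect one batch; both samples and labels are 50-step windows over 4 series
X, Y = next(iter(train_loader))
print(X.shape, Y.shape)  # torch.Size([32, 50, 4]) torch.Size([32, 50, 4])
```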
### 3. Training the Model
* Instantiate the LSTM class defined above, and define the loss function and optimizer used to update the model.
```python
# The data is ready; start training
# 6. Instantiate the model; here we use the LSTM (the RNN also works)
TIME_SERIES_DIM = 4
HIDDEN_SIZE = 60
NUM_LAYERS = 1
model = LSTM(TIME_SERIES_DIM, HIDDEN_SIZE, NUM_LAYERS)
# 7. Define the loss function and optimizer
LEARNING_RATE = 0.001
loss_function = nn.MSELoss()
updater = torch.optim.Adam(model.parameters(), LEARNING_RATE)
```
* Start training
```python
# 8. Start training
EPOCH = 10
for epoch in range(EPOCH):
    current_epoch_loss = 0
    samples_per_epoch = 0
    for X, Y in train_loader:
        # Initialize the recurrent network's state h
        # (use the actual batch size, which may be smaller than BATCH_SIZE in the last batch)
        state = model.init_state(X.shape[0])
        # Feed the current state and the current input X into the model
        Y_pred = model(X, state)
        # Update the model through the loss function and optimizer
        updater.zero_grad()
        loss = loss_function(Y_pred, Y)
        current_epoch_loss += float(loss) * X.shape[0]
        samples_per_epoch += X.shape[0]
        loss.backward()
        updater.step()  # gradient descent to update the weights
    print(f"*Current epoch:{epoch} training loss:{current_epoch_loss / samples_per_epoch}")
```
### 4. Prediction
* When predicting on the test set, take the **last point** of each predicted window as the prediction for that step.
* e.g. [0,1,2,3] predicts [1,2,3,4] **=>** we only need 4 as the final result, so the last point is appended to the predicted_data list.
* At test time **batch_size** is set to **1**, so we take `predicted_label[0]`, detach it, and convert it to numpy.
  Putting this together, the value we keep is `predicted_label[0][-1]`; the code is as follows:
```python
# 9. Prediction
predicted_data = []
for test_sample in test_samples:
    state = model.init_state(batch_size=1)
    predicted_label = model(test_sample.reshape(1, test_sample.shape[0], -1), state)
    predicted_data.append(predicted_label[0][-1].detach().numpy())
predicted_data = np.array(predicted_data)
```
**Plotting**
```python
from sklearn.metrics import mean_squared_error, mean_absolute_error

# 10. Compute the errors and plot the predictions
true_data = data[train_test_boundary + WINDOW_SIZE + 1:]
MSE = mean_squared_error(true_data, predicted_data)
MAE = mean_absolute_error(true_data, predicted_data)
print(f"Mean squared error:{round(MSE, 5)} Mean absolute error:{round(MAE, 5)}")
plt.plot(true_data[:, 0])
plt.plot(predicted_data[:, 0])
plt.show()
```
## Forecasting with Transformer
### 1. Transformer Structure

* Uses an Encoder-Decoder structure
* First, define the Positional Encoding
```python
# Positional encoding
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = 1 / (10000 ** ((2 * np.arange(d_model)) / d_model))
        # Define the positional encoding with sin and cos
        pe[:, 0::2] = torch.sin(position * div_term[0::2])
        pe[:, 1::2] = torch.cos(position * div_term[1::2])
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # repeat(a, b, c) tiles the tensor a times along dim 0, b times along dim 1, c times along dim 2;
        # here it replicates the encoding across the batch dimension
        return x + self.pe[:x.size(0), :].repeat(1, x.shape[1], 1)
```
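A short usage sketch (illustrative; the shapes follow the `(seq_len, batch, feature)` layout the Transformer modules below expect):
```python
# Illustrative: the encoding is added along the sequence dimension and broadcast over the batch
pos_enc = PositionalEncoding(d_model=250)
x = torch.zeros(96, 32, 250)   # (seq_len, batch, d_model)
print(pos_enc(x).shape)        # torch.Size([96, 32, 250])
```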
* Definition of the Transformer's Encoder and Decoder (simply built from nn's `TransformerEncoderLayer` and related modules)
```python
class TransAm(nn.Module):
    def __init__(self, series_dim, feature_size=250, num_encoder_layers=1, num_decoder_layers=1, dropout=0.1):
        super(TransAm, self).__init__()
        self.model_type = 'Transformer'
        self.input_embedding = nn.Linear(series_dim, feature_size)
        self.src_mask = None
        # Positional encoding layer, using the PositionalEncoding class defined above
        self.pos_encoder = PositionalEncoding(feature_size)
        # Use PyTorch's Transformer modules for the Encoder and the Decoder
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=feature_size, nhead=10, dropout=dropout)
        self.encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_encoder_layers)
        self.decoder_layer = nn.TransformerDecoderLayer(d_model=feature_size, nhead=10, dropout=dropout)
        self.decoder = nn.TransformerDecoder(self.decoder_layer, num_layers=num_decoder_layers)
        self.linear = nn.Linear(feature_size, series_dim)
```
* Forward pass of `TransAm` (continuing the class above):
```python
def forward(self, src, tgt):
    if self.src_mask is None or self.src_mask.size(0) != len(src):
        device = src.device
        mask = self._generate_square_subsequent_mask(len(src)).to(device)
        self.src_mask = mask
    # Linear transformation (embedding) before the positional encoding
    src = self.input_embedding(src)
    # Apply positional encoding to the encoder input and feed it to the encoder to get memory
    src = self.pos_encoder(src)
    memory = self.encoder(src, self.src_mask)
    # Apply positional encoding to the decoder input and feed it to the decoder together with memory
    tgt = self.input_embedding(tgt)
    tgt = self.pos_encoder(tgt)
    output = self.decoder(tgt, memory)
    # Finally, pass the result through a linear layer
    output = self.linear(output)
    # Discard the leading (known) part and keep only the predicted part
    return output[-PREDICT_SIZE:, :, :]
```
The forward pass above produces the model output.
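The forward pass calls `self._generate_square_subsequent_mask`, which is not shown in this README. A standard causal-mask implementation (assumed here; it mirrors the common PyTorch Transformer example) would look like this:
```python
# Assumed helper (not shown above): causal attention mask for the encoder,
# where position i may only attend to positions <= i; blocked entries are -inf
def _generate_square_subsequent_mask(self, sz):
    mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
    return mask
```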
### 2. Training Process
* Building the dataset
```python
# Build samples and labels
WINDOW_SIZE = 96
LABEL_SIZE = 48
PREDICT_SIZE = 96
samples = []
labels = []
for i in range(len(data) - WINDOW_SIZE - PREDICT_SIZE):
    samples.append(data[i:i + WINDOW_SIZE])
    labels.append(data[i + WINDOW_SIZE - LABEL_SIZE:i + WINDOW_SIZE + PREDICT_SIZE])
samples = torch.tensor(np.array(samples), dtype=torch.float32)
labels = torch.tensor(np.array(labels), dtype=torch.float32)
```
The previous 96 points are used to predict 144 points in total: the 48 overlapping points plus the 96 new points (a concrete index example follows).
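For the first window (i = 0), and assuming the same four-column data as in the earlier sections, the indices work out as follows (illustrative):
```python
# Illustrative index arithmetic for the first window (i = 0):
#   sample -> data[0:96]    (encoder input, WINDOW_SIZE points)
#   label  -> data[48:192]  (LABEL_SIZE overlapping points + PREDICT_SIZE points to predict)
print(samples[0].shape)  # torch.Size([96, 4])
print(labels[0].shape)   # torch.Size([144, 4])
```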
* Training
```python
# Start training
def train():
    for epoch in range(EPOCH):
        current_epoch_loss = 0
        samples_per_epoch = 0
        for X, Y in train_loader:
            # Swap dims 0 and 1, i.e. convert the input and output to the (sequence_length, batch_size, series_dim) layout
            X, Y = X.permute(1, 0, 2), Y.permute(1, 0, 2)
            # Build the decoder input for the Transformer:
            # the LABEL_SIZE known points followed by PREDICT_SIZE zero placeholders
            decoder_input = torch.zeros_like(Y[-PREDICT_SIZE:, :, :]).float()
            decoder_input = torch.cat([Y[:LABEL_SIZE, :, :], decoder_input], dim=0).float().to(DEVICE)
            # *** Feed the encoder input X and the decoder input decoder_input to the model
            Y_pred = model(X, decoder_input)
            # Update the model through the loss function and optimizer
            updater.zero_grad()
            loss = loss_function(Y_pred, Y[-PREDICT_SIZE:, :, :])
            current_epoch_loss += float(loss) * X.shape[1]
            samples_per_epoch += X.shape[1]
            loss.backward()
            updater.step()  # gradient descent to update the weights
        print(f"*Current epoch:{epoch} training loss:{current_epoch_loss / samples_per_epoch}")
```
The decoder input is the concatenation of the 48 overlapping points and 96 zero points (a shape sketch follows).
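A minimal shape sketch of that concatenation, assuming a batch of 32 and the same four series as before (illustrative only):
```python
# Illustrative: after permute, Y has shape (LABEL_SIZE + PREDICT_SIZE, batch, dim);
# the decoder input keeps the 48 known points and zeroes out the 96 points to be predicted
Y_example = torch.randn(LABEL_SIZE + PREDICT_SIZE, 32, 4)
decoder_input_example = torch.cat(
    [Y_example[:LABEL_SIZE], torch.zeros_like(Y_example[-PREDICT_SIZE:])], dim=0)
print(decoder_input_example.shape)  # torch.Size([144, 32, 4])
```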
* Output results
