transformer.py 10 KB


  1. import torch
  2. import torch.nn as nn
  3. import pandas as pd
  4. import numpy as np
  5. import matplotlib.pyplot as plt
  6. from sklearn.preprocessing import StandardScaler
  7. from sklearn.metrics import mean_squared_error
  8. from sklearn.model_selection import ParameterGrid
  9. import matplotlib
  10. import seaborn as sns
  11. from sklearn.metrics import mean_squared_error, mean_absolute_error
  12. from scipy.stats import shapiro
  13. from statsmodels.graphics.tsaplots import plot_acf
  14. from scipy.stats import probplot
  15. from statsmodels.tsa.stattools import adfuller
  16. # 设置中文字体
  17. matplotlib.rcParams['font.sans-serif'] = ['Microsoft YaHei'] # Windows下的常见中文字体
  18. matplotlib.rcParams['axes.unicode_minus'] = False # 正常显示负号
  19. # 数据预处理
  20. data = pd.read_excel('./daily_flu_counts.xlsx')
  21. data = data[['日期', '发病数']].sort_values('日期').fillna(0)
  22. # 使用线性插值填充缺失值
  23. data['发病数'] = data['发病数'].interpolate(method='linear')
  24. # 确保数据非负
  25. data['发病数'] = data['发病数'].clip(lower=0)
  26. # 确保数据非负
  27. data['发病数'] = data['发病数'].clip(lower=0)
  28. # 确保 '日期' 列为 datetime 类型
  29. data['日期'] = pd.to_datetime(data['日期'])
  30. # 绘制原数据的曲线图
  31. plt.figure(figsize=(12, 6))
  32. plt.plot(data['日期'], data['发病数'], label='发病数', color='blue')
  33. plt.title('每日发病数曲线图')
  34. plt.xlabel('日期')
  35. plt.ylabel('发病数')
  36. plt.legend()
  37. plt.grid(True)
  38. plt.xticks(rotation=45)
  39. plt.show()
  40. # 检查是否有可用的 GPU
  41. device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
  42. print(f'Using device: {device}')
  43. # 读取数据
  44. data = pd.read_excel('./daily_flu_counts.xlsx')
  45. data = data[['日期', '发病数']].sort_values('日期').fillna(0)
  46. # 标准化数据
  47. scaler = StandardScaler()
  48. scaled_data = scaler.fit_transform(data['发病数'].values.reshape(-1, 1))
  49. # 创建时间序列数据集
  50. def create_dataset(dataset, look_back=1):
  51. X, Y = [], []
  52. for i in range(len(dataset) - look_back):
  53. X.append(dataset[i:(i + look_back), 0])
  54. Y.append(dataset[i + look_back, 0])
  55. return np.array(X), np.array(Y)
  56. look_back = 30 # 滑动窗口长度
  57. X, Y = create_dataset(scaled_data, look_back)
  58. # 拆分训练集和测试集
  59. train_size = int(len(X) * 0.8)
  60. X_train, X_test = X[:train_size], X[train_size:]
  61. Y_train, Y_test = Y[:train_size], Y[train_size:]
  62. # 转换为 PyTorch 张量
  63. X_train = torch.tensor(X_train, dtype=torch.float32).view(-1, look_back, 1).to(device)
  64. X_test = torch.tensor(X_test, dtype=torch.float32).view(-1, look_back, 1).to(device)
  65. Y_train = torch.tensor(Y_train, dtype=torch.float32).view(-1, 1).to(device)
  66. Y_test = torch.tensor(Y_test, dtype=torch.float32).view(-1, 1).to(device)
  67. # 定义位置编码
  68. class PositionalEncoding(nn.Module):
  69. def __init__(self, d_model, max_len=5000):
  70. super(PositionalEncoding, self).__init__()
  71. pe = torch.zeros(max_len, d_model)
  72. position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
  73. div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
  74. pe[:, 0::2] = torch.sin(position * div_term)
  75. pe[:, 1::2] = torch.cos(position * div_term)
  76. pe = pe.unsqueeze(0) # [1, max_len, d_model]
  77. self.register_buffer('pe', pe)
  78. def forward(self, x):
  79. """
  80. x: Tensor, shape [batch_size, seq_len, d_model]
  81. """
  82. x = x + self.pe[:, :x.size(1), :]
  83. return x
  84. # 定义 Transformer 模型
  85. class EnhancedTransformerModel(nn.Module):
  86. def __init__(self, input_size=1, d_model=64, nhead=8, num_encoder_layers=3, dim_feedforward=256, output_size=1, dropout=0.1):
  87. super(EnhancedTransformerModel, self).__init__()
  88. self.input_linear = nn.Linear(input_size, d_model)
  89. self.positional_encoding = PositionalEncoding(d_model)
  90. encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead,
  91. dim_feedforward=dim_feedforward, dropout=dropout)
  92. self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)
  93. self.fc1 = nn.Linear(d_model, 64)
  94. self.relu = nn.ReLU()
  95. self.fc2 = nn.Linear(64, output_size)
  96. def forward(self, x):
  97. """
  98. x: Tensor, shape [batch_size, seq_len, input_size]
  99. """
  100. x = self.input_linear(x) # [batch_size, seq_len, d_model]
  101. x = self.positional_encoding(x) # 添加位置编码
  102. x = x.transpose(0, 1) # [seq_len, batch_size, d_model]
  103. x = self.transformer_encoder(x) # [seq_len, batch_size, d_model]
  104. x = x.mean(dim=0) # [batch_size, d_model]
  105. x = self.fc1(x)
  106. x = self.relu(x)
  107. x = self.fc2(x)
  108. return x
  109. # 超参数搜索的网格(移除 batch_size)
  110. param_grid = {
  111. 'd_model': [64, 128],
  112. 'nhead': [4, 8],
  113. 'num_encoder_layers': [2, 3],
  114. 'dim_feedforward': [256, 512],
  115. 'learning_rate': [0.0001, 0.0005, 0.001]
  116. }
  117. # 评估函数(移除 batch_size)
  118. def evaluate_model(d_model, nhead, num_encoder_layers, dim_feedforward, learning_rate):
  119. # 初始化模型
  120. model = EnhancedTransformerModel(
  121. input_size=1,
  122. d_model=d_model,
  123. nhead=nhead,
  124. num_encoder_layers=num_encoder_layers,
  125. dim_feedforward=dim_feedforward
  126. ).to(device)
  127. loss_function = nn.MSELoss()
  128. optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
  129. # 训练模型
  130. epochs = 100
  131. model.train()
  132. for epoch in range(epochs):
  133. optimizer.zero_grad()
  134. y_pred = model(X_train)
  135. loss = loss_function(y_pred, Y_train)
  136. loss.backward()
  137. optimizer.step()
  138. # 预测和计算损失
  139. model.eval()
  140. with torch.no_grad():
  141. test_predictions = model(X_test).cpu().numpy()
  142. test_predictions_scaled = scaler.inverse_transform(test_predictions)
  143. Y_test_scaled = scaler.inverse_transform(Y_test.cpu().numpy())
  144. mse = mean_squared_error(Y_test_scaled, test_predictions_scaled)
  145. return mse
  146. # 网格搜索
  147. best_params = None
  148. best_mse = float('inf')
  149. for params in ParameterGrid(param_grid):
  150. print(f'Evaluating with parameters: {params}')
  151. mse = evaluate_model(**params)
  152. print(f'MSE: {mse}')
  153. if mse < best_mse:
  154. best_mse = mse
  155. best_params = params
  156. print(f'Best parameters: {best_params}')
  157. print(f'Best MSE: {best_mse}')
  158. # 使用最佳参数实例化模型
  159. model = EnhancedTransformerModel(
  160. input_size=1,
  161. d_model=best_params['d_model'],
  162. nhead=best_params['nhead'],
  163. num_encoder_layers=best_params['num_encoder_layers'],
  164. dim_feedforward=best_params['dim_feedforward']
  165. ).to(device)
  166. # 定义损失函数和优化器
  167. loss_function = nn.MSELoss()
  168. optimizer = torch.optim.Adam(model.parameters(), lr=best_params['learning_rate'])
  169. # 训练模型
  170. epochs = 500
  171. train_losses = []
  172. for epoch in range(epochs):
  173. model.train()
  174. optimizer.zero_grad()
  175. y_pred = model(X_train)
  176. loss = loss_function(y_pred, Y_train)
  177. loss.backward()
  178. optimizer.step()
  179. train_losses.append(loss.item())
  180. if epoch % 50 == 0:
  181. print(f'Epoch {epoch}, Loss: {loss.item()}')
  182. # 绘制训练损失
  183. plt.plot(train_losses, label="训练损失")
  184. plt.xlabel("Epochs")
  185. plt.ylabel("Loss")
  186. plt.legend()
  187. plt.show()
  188. # 测试集预测
  189. model.eval()
  190. with torch.no_grad():
  191. test_predictions = model(X_test).cpu().numpy()
  192. # 逆归一化
  193. test_predictions_scaled = scaler.inverse_transform(test_predictions)
  194. Y_test_scaled = scaler.inverse_transform(Y_test.cpu().numpy())
  195. # 确保预测值不小于0
  196. test_predictions_scaled = np.clip(test_predictions_scaled, 0, None)
  197. # 计算残差并中心化
  198. residuals = Y_test_scaled.flatten() - test_predictions_scaled.flatten()
  199. residuals_centered = residuals - residuals.mean()
  200. # Bootstrap参数
  201. n_bootstrap = 1000 # 重采样次数
  202. bootstrap_predictions = []
  203. np.random.seed(42) # 为了结果可复现
  204. for _ in range(n_bootstrap):
  205. # 从中心化的残差中有放回地采样
  206. sampled_residuals = np.random.choice(residuals_centered, size=len(residuals_centered), replace=True)
  207. # 生成新的预测样本
  208. bootstrap_pred = test_predictions_scaled.flatten() + sampled_residuals
  209. # 确保预测值不小于0
  210. bootstrap_pred = np.clip(bootstrap_pred, 0, None)
  211. bootstrap_predictions.append(bootstrap_pred)
  212. bootstrap_predictions = np.array(bootstrap_predictions)
  213. # 计算置信区间(例如 95%)
  214. lower_percentile = 2.5
  215. upper_percentile = 97.5
  216. ci_lower = np.percentile(bootstrap_predictions, lower_percentile, axis=0)
  217. ci_upper = np.percentile(bootstrap_predictions, upper_percentile, axis=0)
  218. # 预测未来的步骤数(例如 30 天)
  219. future_steps = 30
  220. future_predictions = []
  221. last_sequence = scaled_data[-look_back:].reshape(1, look_back, 1)
  222. last_sequence = torch.tensor(last_sequence, dtype=torch.float32).to(device)
  223. model.eval()
  224. with torch.no_grad():
  225. for _ in range(future_steps):
  226. pred_scaled = model(last_sequence).cpu().numpy()
  227. future_predictions.append(pred_scaled.flatten()[0])
  228. # 更新序列:移除第一个元素,添加新的预测
  229. new_sequence = np.append(last_sequence.cpu().numpy().flatten()[1:], pred_scaled)
  230. last_sequence = torch.tensor(new_sequence.reshape(1, look_back, 1), dtype=torch.float32).to(device)
  231. # 逆归一化
  232. future_predictions_scaled = scaler.inverse_transform(np.array(future_predictions).reshape(-1, 1)).flatten()
  233. future_predictions_scaled = np.clip(future_predictions_scaled, 0, None)
  234. # Bootstrap 未来预测置信区间
  235. bootstrap_future_predictions = []
  236. for _ in range(n_bootstrap):
  237. # 假设未来预测的残差分布与测试集相似
  238. sampled_residuals = np.random.choice(residuals_centered, size=future_steps, replace=True)
  239. bootstrap_future_pred = future_predictions_scaled + sampled_residuals
  240. # 确保预测值不小于0
  241. bootstrap_future_pred = np.clip(bootstrap_future_pred, 0, None)
  242. bootstrap_future_predictions.append(bootstrap_future_pred)
  243. bootstrap_future_predictions = np.array(bootstrap_future_predictions)
  244. # 计算置信区间(例如 95%)
  245. ci_lower_future = np.percentile(bootstrap_future_predictions, lower_percentile, axis=0)
  246. ci_upper_future = np.percentile(bootstrap_future_predictions, upper_percentile, axis=0)
  247. # 生成未来的日期
  248. last_date = data['日期'].iloc[-1]
  249. # 确保 last_date 是 pandas Timestamp 对象
  250. if not isinstance(last_date, pd.Timestamp):
  251. last_date = pd.to_datetime(last_date)
  252. # 使用 start 和 periods 生成未来的日期,不使用 closed 参数
  253. future_dates = pd.date_range(start=last_date + pd.Timedelta(days=1), periods=future_steps)