@@ -0,0 +1,310 @@
+import torch
+import torch.nn as nn
+import pandas as pd
+import numpy as np
+import matplotlib
+import matplotlib.pyplot as plt
+import seaborn as sns
+from sklearn.preprocessing import StandardScaler
+from sklearn.metrics import mean_squared_error, mean_absolute_error
+from sklearn.model_selection import ParameterGrid
+from scipy.stats import shapiro, probplot
+from statsmodels.graphics.tsaplots import plot_acf
+from statsmodels.tsa.stattools import adfuller
+
+# Configure a font that can render the Chinese labels
+matplotlib.rcParams['font.sans-serif'] = ['Microsoft YaHei']  # a common Chinese font on Windows
+matplotlib.rcParams['axes.unicode_minus'] = False  # display minus signs correctly
+
+# Data preprocessing
+data = pd.read_excel('./daily_flu_counts.xlsx')
+data = data[['日期', '发病数']]
+# Make sure the '日期' column is of datetime type, then sort chronologically
+data['日期'] = pd.to_datetime(data['日期'])
+data = data.sort_values('日期')
+
+# Fill missing values by linear interpolation
+data['发病数'] = data['发病数'].interpolate(method='linear')
+# Ensure the counts are non-negative
+data['发病数'] = data['发病数'].clip(lower=0)
+
+# Plot the raw series
+plt.figure(figsize=(12, 6))
+plt.plot(data['日期'], data['发病数'], label='发病数', color='blue')
+
+plt.title('每日发病数曲线图')
+plt.xlabel('日期')
+plt.ylabel('发病数')
+plt.legend()
+plt.grid(True)
+plt.xticks(rotation=45)
+plt.show()
+
+# Use the GPU if one is available
+device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
+print(f'Using device: {device}')
+
+# Standardize the preprocessed series (reusing the data loaded and cleaned above)
+scaler = StandardScaler()
+scaled_data = scaler.fit_transform(data['发病数'].values.reshape(-1, 1))
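+# The fitted scaler stores the series mean and standard deviation, so model outputs can
+# later be mapped back to case counts with scaler.inverse_transform.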
+
+# Build a supervised dataset with a sliding window
+def create_dataset(dataset, look_back=1):
+    X, Y = [], []
+    for i in range(len(dataset) - look_back):
+        X.append(dataset[i:(i + look_back), 0])
+        Y.append(dataset[i + look_back, 0])
+    return np.array(X), np.array(Y)
+
+look_back = 30  # sliding-window length
+X, Y = create_dataset(scaled_data, look_back)
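+# X has shape (n_samples, look_back) and Y holds the value immediately following each
+# window, so the model learns a one-step-ahead mapping.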
+
+# Split into training and test sets
+train_size = int(len(X) * 0.8)
+X_train, X_test = X[:train_size], X[train_size:]
+Y_train, Y_test = Y[:train_size], Y[train_size:]
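+# The split keeps the original time order (no shuffling), which avoids leaking future
+# information into training.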
+
+# Convert to PyTorch tensors of shape (batch, seq_len, 1)
+X_train = torch.tensor(X_train, dtype=torch.float32).view(-1, look_back, 1).to(device)
+X_test = torch.tensor(X_test, dtype=torch.float32).view(-1, look_back, 1).to(device)
+Y_train = torch.tensor(Y_train, dtype=torch.float32).view(-1, 1).to(device)
+Y_test = torch.tensor(Y_test, dtype=torch.float32).view(-1, 1).to(device)
+
+# Positional encoding
+class PositionalEncoding(nn.Module):
+    def __init__(self, d_model, max_len=5000):
+        super(PositionalEncoding, self).__init__()
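+
+        # Standard sinusoidal encoding: PE(pos, 2i) = sin(pos / 10000^(2i/d_model)) and
+        # PE(pos, 2i+1) = cos(pos / 10000^(2i/d_model)), following "Attention Is All You Need".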
+        pe = torch.zeros(max_len, d_model)
+        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
+        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
+
+        pe[:, 0::2] = torch.sin(position * div_term)
+        pe[:, 1::2] = torch.cos(position * div_term)
+
+        pe = pe.unsqueeze(0)  # [1, max_len, d_model]
+        self.register_buffer('pe', pe)
+
+    def forward(self, x):
+        """
+        x: Tensor, shape [batch_size, seq_len, d_model]
+        """
+        x = x + self.pe[:, :x.size(1), :]
+        return x
+
+# Transformer encoder model for one-step-ahead forecasting
+class EnhancedTransformerModel(nn.Module):
+    def __init__(self, input_size=1, d_model=64, nhead=8, num_encoder_layers=3, dim_feedforward=256, output_size=1, dropout=0.1):
+        super(EnhancedTransformerModel, self).__init__()
+        self.input_linear = nn.Linear(input_size, d_model)
+        self.positional_encoding = PositionalEncoding(d_model)
+
+        encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead,
+                                                   dim_feedforward=dim_feedforward, dropout=dropout)
+        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)
+
+        self.fc1 = nn.Linear(d_model, 64)
+        self.relu = nn.ReLU()
+        self.fc2 = nn.Linear(64, output_size)
+
+    def forward(self, x):
+        """
+        x: Tensor, shape [batch_size, seq_len, input_size]
+        """
+        x = self.input_linear(x)          # [batch_size, seq_len, d_model]
+        x = self.positional_encoding(x)   # add positional encoding
+        x = x.transpose(0, 1)             # [seq_len, batch_size, d_model]
+        x = self.transformer_encoder(x)   # [seq_len, batch_size, d_model]
+        x = x.mean(dim=0)                 # mean-pool over the sequence -> [batch_size, d_model]
+        x = self.fc1(x)
+        x = self.relu(x)
+        x = self.fc2(x)
+        return x
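+
+# Optional sanity check (illustrative): with default arguments, a dummy batch of 4 windows
+# should map to an output of shape (4, 1):
+#   EnhancedTransformerModel()(torch.zeros(4, look_back, 1)).shape  # torch.Size([4, 1])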
+
+# Hyperparameter search grid (batch_size removed)
+param_grid = {
+    'd_model': [64, 128],
+    'nhead': [4, 8],
+    'num_encoder_layers': [2, 3],
+    'dim_feedforward': [256, 512],
+    'learning_rate': [0.0001, 0.0005, 0.001]
+}
+
+# Evaluation function for one hyperparameter combination (batch_size removed)
+def evaluate_model(d_model, nhead, num_encoder_layers, dim_feedforward, learning_rate):
+    # Initialize the model
+    model = EnhancedTransformerModel(
+        input_size=1,
+        d_model=d_model,
+        nhead=nhead,
+        num_encoder_layers=num_encoder_layers,
+        dim_feedforward=dim_feedforward
+    ).to(device)
+    loss_function = nn.MSELoss()
+    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
+
+    # Train the model
+    epochs = 100
+    model.train()
+    for epoch in range(epochs):
+        optimizer.zero_grad()
+        y_pred = model(X_train)
+        loss = loss_function(y_pred, Y_train)
+        loss.backward()
+        optimizer.step()
+
+    # Predict and compute the loss
+    model.eval()
+    with torch.no_grad():
+        test_predictions = model(X_test).cpu().numpy()
+    test_predictions_scaled = scaler.inverse_transform(test_predictions)
+    Y_test_scaled = scaler.inverse_transform(Y_test.cpu().numpy())
+    mse = mean_squared_error(Y_test_scaled, test_predictions_scaled)
+
+    return mse
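+
+# Note: each candidate is trained full-batch for 100 epochs and scored by MSE on the
+# held-out test split defined above.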
+
+# Grid search
+best_params = None
+best_mse = float('inf')
+
+for params in ParameterGrid(param_grid):
+    print(f'Evaluating with parameters: {params}')
+    mse = evaluate_model(**params)
+    print(f'MSE: {mse}')
+
+    if mse < best_mse:
+        best_mse = mse
+        best_params = params
+
+print(f'Best parameters: {best_params}')
+print(f'Best MSE: {best_mse}')
+
+# Instantiate the model with the best parameters
+model = EnhancedTransformerModel(
+    input_size=1,
+    d_model=best_params['d_model'],
+    nhead=best_params['nhead'],
+    num_encoder_layers=best_params['num_encoder_layers'],
+    dim_feedforward=best_params['dim_feedforward']
+).to(device)
+
+# Loss function and optimizer
+loss_function = nn.MSELoss()
+optimizer = torch.optim.Adam(model.parameters(), lr=best_params['learning_rate'])
+
+# Train the final model
+epochs = 500
+train_losses = []
+for epoch in range(epochs):
+    model.train()
+    optimizer.zero_grad()
+    y_pred = model(X_train)
+    loss = loss_function(y_pred, Y_train)
+    loss.backward()
+    optimizer.step()
+
+    train_losses.append(loss.item())
+    if epoch % 50 == 0:
+        print(f'Epoch {epoch}, Loss: {loss.item()}')
+
+# Plot the training loss
+plt.plot(train_losses, label="训练损失")
+plt.xlabel("Epochs")
+plt.ylabel("Loss")
+plt.legend()
+plt.show()
+
+# Predictions on the test set
+model.eval()
+with torch.no_grad():
+    test_predictions = model(X_test).cpu().numpy()
+
+# Invert the standardization
+test_predictions_scaled = scaler.inverse_transform(test_predictions)
+Y_test_scaled = scaler.inverse_transform(Y_test.cpu().numpy())
+
+# Ensure predictions are non-negative
+test_predictions_scaled = np.clip(test_predictions_scaled, 0, None)
+
+# Compute and center the residuals
+residuals = Y_test_scaled.flatten() - test_predictions_scaled.flatten()
+residuals_centered = residuals - residuals.mean()
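+
+# Residual bootstrap: treating the centered test residuals as roughly i.i.d., resampling
+# them with replacement around each point forecast yields an empirical predictive
+# distribution from which percentile confidence intervals are read off.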
+# Bootstrap parameters
+n_bootstrap = 1000  # number of resamples
+bootstrap_predictions = []
+
+np.random.seed(42)  # for reproducibility
+
+for _ in range(n_bootstrap):
+    # Sample the centered residuals with replacement
+    sampled_residuals = np.random.choice(residuals_centered, size=len(residuals_centered), replace=True)
+    # Generate a new prediction sample
+    bootstrap_pred = test_predictions_scaled.flatten() + sampled_residuals
+    # Ensure predictions are non-negative
+    bootstrap_pred = np.clip(bootstrap_pred, 0, None)
+    bootstrap_predictions.append(bootstrap_pred)
+
+bootstrap_predictions = np.array(bootstrap_predictions)
+
+# Compute the confidence interval (95%)
+lower_percentile = 2.5
+upper_percentile = 97.5
+ci_lower = np.percentile(bootstrap_predictions, lower_percentile, axis=0)
+ci_upper = np.percentile(bootstrap_predictions, upper_percentile, axis=0)
+
+# Number of future steps to forecast (e.g. 30 days)
+future_steps = 30
+future_predictions = []
+last_sequence = scaled_data[-look_back:].reshape(1, look_back, 1)
+last_sequence = torch.tensor(last_sequence, dtype=torch.float32).to(device)
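+
+# Recursive one-step-ahead forecasting: each new prediction is appended to the input
+# window and the oldest value is dropped, so forecast errors can compound over the horizon.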
+model.eval()
+with torch.no_grad():
+    for _ in range(future_steps):
+        pred_scaled = model(last_sequence).cpu().numpy()
+        future_predictions.append(pred_scaled.flatten()[0])
+        # Update the window: drop the oldest value and append the new prediction
+        new_sequence = np.append(last_sequence.cpu().numpy().flatten()[1:], pred_scaled)
+        last_sequence = torch.tensor(new_sequence.reshape(1, look_back, 1), dtype=torch.float32).to(device)
+
+# Invert the standardization
+future_predictions_scaled = scaler.inverse_transform(np.array(future_predictions).reshape(-1, 1)).flatten()
+future_predictions_scaled = np.clip(future_predictions_scaled, 0, None)
+
+# Bootstrap confidence intervals for the future forecasts
+bootstrap_future_predictions = []
+
+for _ in range(n_bootstrap):
+    # Assume future residuals follow a similar distribution to the test residuals
+    sampled_residuals = np.random.choice(residuals_centered, size=future_steps, replace=True)
+    bootstrap_future_pred = future_predictions_scaled + sampled_residuals
+    # Ensure predictions are non-negative
+    bootstrap_future_pred = np.clip(bootstrap_future_pred, 0, None)
+    bootstrap_future_predictions.append(bootstrap_future_pred)
+
+bootstrap_future_predictions = np.array(bootstrap_future_predictions)
+
+# Compute the confidence interval (95%)
+ci_lower_future = np.percentile(bootstrap_future_predictions, lower_percentile, axis=0)
+ci_upper_future = np.percentile(bootstrap_future_predictions, upper_percentile, axis=0)
+
+# Generate the future dates
+last_date = data['日期'].iloc[-1]
+
+# Make sure last_date is a pandas Timestamp
+if not isinstance(last_date, pd.Timestamp):
+    last_date = pd.to_datetime(last_date)
+
+# Build the future dates from start and periods, without the closed argument
+future_dates = pd.date_range(start=last_date + pd.Timedelta(days=1), periods=future_steps)
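+
+# Illustrative only: the future forecast and its bootstrap band could be drawn with
+# something like
+#   plt.plot(future_dates, future_predictions_scaled)
+#   plt.fill_between(future_dates, ci_lower_future, ci_upper_future, alpha=0.3)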