123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310 |
- import torch
- import torch.nn as nn
- import pandas as pd
- import numpy as np
- import matplotlib.pyplot as plt
- from sklearn.preprocessing import StandardScaler
- from sklearn.metrics import mean_squared_error
- from sklearn.model_selection import ParameterGrid
- import matplotlib
- import seaborn as sns
- from sklearn.metrics import mean_squared_error, mean_absolute_error
- from scipy.stats import shapiro
- from statsmodels.graphics.tsaplots import plot_acf
- from scipy.stats import probplot
- from statsmodels.tsa.stattools import adfuller
- # 设置中文字体
- matplotlib.rcParams['font.sans-serif'] = ['Microsoft YaHei'] # Windows下的常见中文字体
- matplotlib.rcParams['axes.unicode_minus'] = False # 正常显示负号
- # 数据预处理
- data = pd.read_excel('./daily_flu_counts.xlsx')
- data = data[['日期', '发病数']].sort_values('日期').fillna(0)
- # 使用线性插值填充缺失值
- data['发病数'] = data['发病数'].interpolate(method='linear')
- # 确保数据非负
- data['发病数'] = data['发病数'].clip(lower=0)
- # 确保数据非负
- data['发病数'] = data['发病数'].clip(lower=0)
- # 确保 '日期' 列为 datetime 类型
- data['日期'] = pd.to_datetime(data['日期'])
- # 绘制原数据的曲线图
- plt.figure(figsize=(12, 6))
- plt.plot(data['日期'], data['发病数'], label='发病数', color='blue')
- plt.title('每日发病数曲线图')
- plt.xlabel('日期')
- plt.ylabel('发病数')
- plt.legend()
- plt.grid(True)
- plt.xticks(rotation=45)
- plt.show()
- # 检查是否有可用的 GPU
- device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
- print(f'Using device: {device}')
- # 读取数据
- data = pd.read_excel('./daily_flu_counts.xlsx')
- data = data[['日期', '发病数']].sort_values('日期').fillna(0)
- # 标准化数据
- scaler = StandardScaler()
- scaled_data = scaler.fit_transform(data['发病数'].values.reshape(-1, 1))
- # 创建时间序列数据集
- def create_dataset(dataset, look_back=1):
- X, Y = [], []
- for i in range(len(dataset) - look_back):
- X.append(dataset[i:(i + look_back), 0])
- Y.append(dataset[i + look_back, 0])
- return np.array(X), np.array(Y)
- look_back = 30 # 滑动窗口长度
- X, Y = create_dataset(scaled_data, look_back)
- # 拆分训练集和测试集
- train_size = int(len(X) * 0.8)
- X_train, X_test = X[:train_size], X[train_size:]
- Y_train, Y_test = Y[:train_size], Y[train_size:]
- # 转换为 PyTorch 张量
- X_train = torch.tensor(X_train, dtype=torch.float32).view(-1, look_back, 1).to(device)
- X_test = torch.tensor(X_test, dtype=torch.float32).view(-1, look_back, 1).to(device)
- Y_train = torch.tensor(Y_train, dtype=torch.float32).view(-1, 1).to(device)
- Y_test = torch.tensor(Y_test, dtype=torch.float32).view(-1, 1).to(device)
- # 定义位置编码
- class PositionalEncoding(nn.Module):
- def __init__(self, d_model, max_len=5000):
- super(PositionalEncoding, self).__init__()
-
- pe = torch.zeros(max_len, d_model)
- position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
- div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
-
- pe[:, 0::2] = torch.sin(position * div_term)
- pe[:, 1::2] = torch.cos(position * div_term)
-
- pe = pe.unsqueeze(0) # [1, max_len, d_model]
- self.register_buffer('pe', pe)
-
- def forward(self, x):
- """
- x: Tensor, shape [batch_size, seq_len, d_model]
- """
- x = x + self.pe[:, :x.size(1), :]
- return x
- # 定义 Transformer 模型
- class EnhancedTransformerModel(nn.Module):
- def __init__(self, input_size=1, d_model=64, nhead=8, num_encoder_layers=3, dim_feedforward=256, output_size=1, dropout=0.1):
- super(EnhancedTransformerModel, self).__init__()
- self.input_linear = nn.Linear(input_size, d_model)
- self.positional_encoding = PositionalEncoding(d_model)
-
- encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead,
- dim_feedforward=dim_feedforward, dropout=dropout)
- self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)
-
- self.fc1 = nn.Linear(d_model, 64)
- self.relu = nn.ReLU()
- self.fc2 = nn.Linear(64, output_size)
-
- def forward(self, x):
- """
- x: Tensor, shape [batch_size, seq_len, input_size]
- """
- x = self.input_linear(x) # [batch_size, seq_len, d_model]
- x = self.positional_encoding(x) # 添加位置编码
- x = x.transpose(0, 1) # [seq_len, batch_size, d_model]
- x = self.transformer_encoder(x) # [seq_len, batch_size, d_model]
- x = x.mean(dim=0) # [batch_size, d_model]
- x = self.fc1(x)
- x = self.relu(x)
- x = self.fc2(x)
- return x
- # 超参数搜索的网格(移除 batch_size)
- param_grid = {
- 'd_model': [64, 128],
- 'nhead': [4, 8],
- 'num_encoder_layers': [2, 3],
- 'dim_feedforward': [256, 512],
- 'learning_rate': [0.0001, 0.0005, 0.001]
- }
- # 评估函数(移除 batch_size)
- def evaluate_model(d_model, nhead, num_encoder_layers, dim_feedforward, learning_rate):
- # 初始化模型
- model = EnhancedTransformerModel(
- input_size=1,
- d_model=d_model,
- nhead=nhead,
- num_encoder_layers=num_encoder_layers,
- dim_feedforward=dim_feedforward
- ).to(device)
- loss_function = nn.MSELoss()
- optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
- # 训练模型
- epochs = 100
- model.train()
- for epoch in range(epochs):
- optimizer.zero_grad()
- y_pred = model(X_train)
- loss = loss_function(y_pred, Y_train)
- loss.backward()
- optimizer.step()
- # 预测和计算损失
- model.eval()
- with torch.no_grad():
- test_predictions = model(X_test).cpu().numpy()
- test_predictions_scaled = scaler.inverse_transform(test_predictions)
- Y_test_scaled = scaler.inverse_transform(Y_test.cpu().numpy())
- mse = mean_squared_error(Y_test_scaled, test_predictions_scaled)
-
- return mse
- # 网格搜索
- best_params = None
- best_mse = float('inf')
- for params in ParameterGrid(param_grid):
- print(f'Evaluating with parameters: {params}')
- mse = evaluate_model(**params)
- print(f'MSE: {mse}')
-
- if mse < best_mse:
- best_mse = mse
- best_params = params
- print(f'Best parameters: {best_params}')
- print(f'Best MSE: {best_mse}')
- # 使用最佳参数实例化模型
- model = EnhancedTransformerModel(
- input_size=1,
- d_model=best_params['d_model'],
- nhead=best_params['nhead'],
- num_encoder_layers=best_params['num_encoder_layers'],
- dim_feedforward=best_params['dim_feedforward']
- ).to(device)
- # 定义损失函数和优化器
- loss_function = nn.MSELoss()
- optimizer = torch.optim.Adam(model.parameters(), lr=best_params['learning_rate'])
- # 训练模型
- epochs = 500
- train_losses = []
- for epoch in range(epochs):
- model.train()
- optimizer.zero_grad()
- y_pred = model(X_train)
- loss = loss_function(y_pred, Y_train)
- loss.backward()
- optimizer.step()
- train_losses.append(loss.item())
- if epoch % 50 == 0:
- print(f'Epoch {epoch}, Loss: {loss.item()}')
- # 绘制训练损失
- plt.plot(train_losses, label="训练损失")
- plt.xlabel("Epochs")
- plt.ylabel("Loss")
- plt.legend()
- plt.show()
- # 测试集预测
- model.eval()
- with torch.no_grad():
- test_predictions = model(X_test).cpu().numpy()
- # 逆归一化
- test_predictions_scaled = scaler.inverse_transform(test_predictions)
- Y_test_scaled = scaler.inverse_transform(Y_test.cpu().numpy())
- # 确保预测值不小于0
- test_predictions_scaled = np.clip(test_predictions_scaled, 0, None)
- # 计算残差并中心化
- residuals = Y_test_scaled.flatten() - test_predictions_scaled.flatten()
- residuals_centered = residuals - residuals.mean()
- # Bootstrap参数
- n_bootstrap = 1000 # 重采样次数
- bootstrap_predictions = []
- np.random.seed(42) # 为了结果可复现
- for _ in range(n_bootstrap):
- # 从中心化的残差中有放回地采样
- sampled_residuals = np.random.choice(residuals_centered, size=len(residuals_centered), replace=True)
- # 生成新的预测样本
- bootstrap_pred = test_predictions_scaled.flatten() + sampled_residuals
- # 确保预测值不小于0
- bootstrap_pred = np.clip(bootstrap_pred, 0, None)
- bootstrap_predictions.append(bootstrap_pred)
- bootstrap_predictions = np.array(bootstrap_predictions)
- # 计算置信区间(例如 95%)
- lower_percentile = 2.5
- upper_percentile = 97.5
- ci_lower = np.percentile(bootstrap_predictions, lower_percentile, axis=0)
- ci_upper = np.percentile(bootstrap_predictions, upper_percentile, axis=0)
- # 预测未来的步骤数(例如 30 天)
- future_steps = 30
- future_predictions = []
- last_sequence = scaled_data[-look_back:].reshape(1, look_back, 1)
- last_sequence = torch.tensor(last_sequence, dtype=torch.float32).to(device)
- model.eval()
- with torch.no_grad():
- for _ in range(future_steps):
- pred_scaled = model(last_sequence).cpu().numpy()
- future_predictions.append(pred_scaled.flatten()[0])
- # 更新序列:移除第一个元素,添加新的预测
- new_sequence = np.append(last_sequence.cpu().numpy().flatten()[1:], pred_scaled)
- last_sequence = torch.tensor(new_sequence.reshape(1, look_back, 1), dtype=torch.float32).to(device)
- # 逆归一化
- future_predictions_scaled = scaler.inverse_transform(np.array(future_predictions).reshape(-1, 1)).flatten()
- future_predictions_scaled = np.clip(future_predictions_scaled, 0, None)
- # Bootstrap 未来预测置信区间
- bootstrap_future_predictions = []
- for _ in range(n_bootstrap):
- # 假设未来预测的残差分布与测试集相似
- sampled_residuals = np.random.choice(residuals_centered, size=future_steps, replace=True)
- bootstrap_future_pred = future_predictions_scaled + sampled_residuals
- # 确保预测值不小于0
- bootstrap_future_pred = np.clip(bootstrap_future_pred, 0, None)
- bootstrap_future_predictions.append(bootstrap_future_pred)
- bootstrap_future_predictions = np.array(bootstrap_future_predictions)
- # 计算置信区间(例如 95%)
- ci_lower_future = np.percentile(bootstrap_future_predictions, lower_percentile, axis=0)
- ci_upper_future = np.percentile(bootstrap_future_predictions, upper_percentile, axis=0)
- # 生成未来的日期
- last_date = data['日期'].iloc[-1]
- # 确保 last_date 是 pandas Timestamp 对象
- if not isinstance(last_date, pd.Timestamp):
- last_date = pd.to_datetime(last_date)
- # 使用 start 和 periods 生成未来的日期,不使用 closed 参数
- future_dates = pd.date_range(start=last_date + pd.Timedelta(days=1), periods=future_steps)
|