import pandas as pd
import numpy as np
import pyreadstat
# enable_iterative_imputer must be imported before IterativeImputer (experimental API)
from sklearn.experimental import enable_iterative_imputer  # noqa: F401
from sklearn.impute import IterativeImputer

if __name__ == "__main__":
    CHARLS_data = pd.read_csv("CHARLS_data_pollutants_p_n_m_nd_h.csv")

    covariates = [
        "last_year_NO2", "before_last_NO2", "last_year_O3", "before_last_O3",
        "last_year_pm1", "before_last_pm1", "last_year_pm2.5", "before_last_pm2.5",
        "last_year_pm10", "before_last_pm10",
        "last_year_SO4", "last_year_NO3", "last_year_NH4", "last_year_OM", "last_year_BC",
        "before_last_SO4", "before_last_NO3", "before_last_NH4", "before_last_OM", "before_last_BC",
        "last_year_nl", "before_last_nl",
    ]

    # Select the fields needed for the analysis
    disease_columns = [
        "Hypertension", "Dyslipidemia", "Disabetes_or_High_Blood_Sugar",
        "Cancer_or_Malignant_Tumor", "Chronic_Lung_Diseases", "Liver_Disease",
        "Heart_Problems", "Stroke", "Kidney_Diease",
        "Stomach_or_Other_Digestive_Disease", "Emotional_Nervous_or_Psychiatric_Problems",
        "Memory_Related_Disease", "Arthritis_or_Rheumatism", "Asthma",
    ]
    data = CHARLS_data[
        ["ID", "rgender", "age", "marital_status", "education", "Physical_activity",
         "Psychiatric_score", "BMI", "ADL", "Smoke", "Drink", "wave"]
        + disease_columns
        + covariates
    ].copy()

    # Derive the multimorbidity state from the number of chronic conditions:
    # 1 = none, 2 = exactly one, 3 = two or more; NaN if any condition is missing
    data["state"] = data[disease_columns].sum(axis=1, skipna=False)
    data["state"] = data["state"].apply(
        lambda x: 1 if x == 0 else 2 if x == 1 else 3 if x >= 2 else np.nan
    )
    data = data.drop(columns=disease_columns)

    # Add death status from the exit interviews: deceased respondents are
    # assigned state = 4 at the follow-up wave after their death.

    # Read the 2013 exit (death) interview
    exit_2013, meta = pyreadstat.read_dta("/root/r_base/CHARLS/CHARLS2013/Exit_Interview.dta")
    exit_2013["ID"] = pd.to_numeric(exit_2013["ID"], errors="coerce").astype("Int64")
    exit_2013["exit_year"] = exit_2013["exb001_1"]
    data = pd.merge(data, exit_2013[["ID", "exit_year"]], on="ID", how="left")

    # Read the 2020 exit (death) interview
    exit_2020, meta = pyreadstat.read_dta("/root/r_base/CHARLS/CHARLS2020/Exit_Module.dta")
    exit_2020["ID"] = pd.to_numeric(exit_2020["ID"], errors="coerce").astype("Int64")
    exit_2020["exit_year"] = exit_2020["exb001_1"]
    data = pd.merge(data, exit_2020[["ID", "exit_year"]], on="ID", how="left")

    # Combine the two death records (the 2013 record wins where both exist)
    data["exit_year"] = data["exit_year_x"].fillna(data["exit_year_y"])

    # Follow-up years of the CHARLS waves
    followup_years = [2011, 2013, 2015, 2018, 2020]

    # Cast ID and wave to integers so sorting and comparisons behave correctly
    data["ID"] = data["ID"].astype(int)
    data["wave"] = data["wave"].astype(int)

    # Last follow-up record of every deceased respondent
    last_followup = (
        data.dropna(subset=["exit_year"])
        .groupby("ID")
        .apply(lambda x: x[x["wave"] == x["wave"].max()])
    )

    # Boolean mask for rows whose state should be changed to 4:
    # the last observed wave already lies past the recorded death year
    mask = last_followup["wave"] > last_followup["exit_year"]

    # Collapse the MultiIndex to plain IDs
    last_followup_ids = last_followup[mask].index.get_level_values("ID")

    # Set state = 4 directly on the matching rows of data via boolean indexing
    data.loc[
        data["ID"].isin(last_followup_ids)
        & (data["wave"] == data.groupby("ID")["wave"].transform("max")),
        "state",
    ] = 4

    # For deaths after the last observed wave, create a new record
    # at the next follow-up year
    new_rows = last_followup[last_followup["wave"] <= last_followup["exit_year"]].copy()
    new_rows["wave"] = new_rows["wave"].apply(
        lambda x: next((year for year in followup_years if year > x), None)
    )
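    # Guard added as an assumption (not part of the original pipeline): a death
    # recorded at or after the final follow-up year (2020) has no "next" wave,
    # so the next() call above returns None. Dropping those rows keeps NaN
    # waves out of the concat below; assigning them to a hypothetical later
    # wave would be an alternative choice.
    new_rows = new_rows.dropna(subset=["wave"])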
    new_rows["state"] = 4

    # Append the new death rows and keep records of the same ID together
    data = (
        pd.concat([data, new_rows], ignore_index=True)
        .sort_values(by=["ID", "wave"])
        .reset_index(drop=True)
    )

    # Drop the helper columns
    data = data.drop(columns=["exit_year_x", "exit_year_y", "exit_year"])

    # Count unique respondents
    unique_user_count = data["ID"].nunique()
    print(f"Respondents before dropping missing states: {unique_user_count}")

    # Drop all records of any respondent with a missing state
    users_with_na = data[data["state"].isna()]["ID"].unique()
    data = data[~data["ID"].isin(users_with_na)]
    unique_user_count = data["ID"].nunique()
    print(f"Respondents after dropping missing states: {unique_user_count}")

    # Keep the longitudinal sample: respondents observed in all five waves,
    # or respondents who exit through death (state 4) with more than one record
    # 1. Number of waves per respondent
    user_counts = data.groupby("ID")["wave"].count().reset_index()
    user_counts.columns = ["ID", "wave_count"]
    # 2. State at each respondent's last wave
    max_wave_state = data.loc[data.groupby("ID")["wave"].idxmax()][["ID", "state"]]
    max_wave_state.columns = ["ID", "max_wave_state"]
    # 3. Merge the wave counts and last-wave states back onto the data
    data = data.merge(user_counts, on="ID").merge(max_wave_state, on="ID")
    # 4. Filter respondents meeting either condition
    condition_1 = data["wave_count"] == 5
    condition_2 = (data["max_wave_state"] == 4) & (data["wave_count"] > 1)
    data = data[condition_1 | condition_2]
    # 5. Drop the helper columns
    data = data.drop(columns=["wave_count", "max_wave_state"]).reset_index(drop=True)
    unique_user_count = data["ID"].nunique()
    print(f"Respondents present in all waves: {unique_user_count}")

    # Drop respondents younger than 45
    users_with_45_age = data[data["age"] < 45]["ID"].unique()
    data = data[~data["ID"].isin(users_with_45_age)]
    unique_user_count = data["ID"].nunique()
    print(f"Respondents after dropping under-45s: {unique_user_count}")

    # Drop respondents with a missing age
    users_with_na_age = data[data["age"].isna()]["ID"].unique()
    data = data[~data["ID"].isin(users_with_na_age)]
    unique_user_count = data["ID"].nunique()
    print(f"Respondents after dropping missing ages: {unique_user_count}")

    # Drop respondents with missing education
    users_with_na_education = data[data["education"].isna()]["ID"].unique()
    data = data[~data["ID"].isin(users_with_na_education)]
    unique_user_count = data["ID"].nunique()
    print(f"Respondents after dropping missing education: {unique_user_count}")

    # Drop respondents with implausible BMI values
    users_with_BMI_wr = data[data["BMI"] >= 200]["ID"].unique()
    data = data[~data["ID"].isin(users_with_BMI_wr)]
    unique_user_count = data["ID"].nunique()
    print(f"Respondents after dropping implausible BMI: {unique_user_count}")

    # Impute the remaining missing values with iterative (MICE-style) imputation
    imputer = IterativeImputer(max_iter=10, random_state=0)
    imputed_data = imputer.fit_transform(data)
    data = pd.DataFrame(imputed_data, columns=data.columns)

    # Round imputed categorical variables back to integer codes
    categorical_columns = ["Physical_activity", "Psychiatric_score", "ADL", "Smoke", "Drink"]
    data[categorical_columns] = data[categorical_columns].round().astype(int)

    # Replace negative imputed values with 0 (a floor of 0 across all columns)
    data[data < 0] = 0

    # Same-ID records were grouped together by the earlier sort; write out
    data.to_csv("paper_data.csv", index=False)
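    # A quick post-hoc sanity check: a sketch added for illustration, assuming
    # the file written above and the column names used in this script (ID, wave,
    # and state come back as floats from the imputer, which is harmless here).
    # It prints how many records each respondent has and the distribution of
    # the multimorbidity/death states.
    check = pd.read_csv("paper_data.csv")
    print(check.groupby("ID")["wave"].count().value_counts().sort_index())
    print(check["state"].value_counts().sort_index())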