# data_preprocess.py

import pandas as pd
import numpy as np
import pyreadstat
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
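# Note: importing enable_iterative_imputer is required even though it looks
# unused; IterativeImputer is still experimental in scikit-learn and the
# import from sklearn.impute fails without it.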
if __name__ == "__main__":
    CHARLS_data = pd.read_csv("CHARLS_data_p_n_m_nd_h.csv")
    covariates = ["last_year_NO2", "before_last_NO2", "last_year_O3", "before_last_O3",
                  "last_year_pm1", "before_last_pm1", "last_year_pm2.5", "before_last_pm2.5",
                  "last_year_pm10", "before_last_pm10", "last_year_SO4", "last_year_NO3",
                  "last_year_NH4", "last_year_OM", "last_year_BC", "before_last_SO4",
                  "before_last_NO3", "before_last_NH4", "before_last_OM", "before_last_BC",
                  "last_year_nl", "before_last_nl"]
    # Select the required columns (.copy() avoids SettingWithCopyWarning on later assignments)
    data = CHARLS_data[["ID", "rgender", "age", "marital_status", "education",
                        "Physical_activity", "Psychiatric_score", "BMI", "ADL", "Smoke", "Drink",
                        "Hypertension", "Dyslipidemia", "Disabetes_or_High_Blood_Sugar",
                        "Cancer_or_Malignant_Tumor", "Chronic_Lung_Diseases",
                        "Liver_Disease", "Heart_Problems", "Stroke", "Kidney_Diease",
                        "Stomach_or_Other_Digestive_Disease",
                        "Emotional_Nervous_or_Psychiatric_Problems", "Memory_Related_Disease",
                        "Arthritis_or_Rheumatism", "Asthma", "wave"] + covariates].copy()
    # Build the multimorbidity state from the 14 chronic-disease indicators
    chronic_cols = ['Hypertension', 'Dyslipidemia', 'Disabetes_or_High_Blood_Sugar',
                    'Cancer_or_Malignant_Tumor', 'Chronic_Lung_Diseases', 'Liver_Disease',
                    'Heart_Problems', 'Stroke', 'Kidney_Diease', 'Stomach_or_Other_Digestive_Disease',
                    'Emotional_Nervous_or_Psychiatric_Problems', 'Memory_Related_Disease',
                    'Arthritis_or_Rheumatism', 'Asthma']
    # skipna=False propagates NaN, so any missing indicator yields a missing state
    data["state"] = data[chronic_cols].sum(axis=1, skipna=False)
    data["state"] = data['state'].apply(lambda x: 1 if x == 0 else 2 if x == 1 else 3 if x >= 2 else np.nan)
    data = data.drop(columns=chronic_cols)
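    # Resulting coding of 'state':
    #   1 = no chronic condition, 2 = exactly one, 3 = two or more,
    #   NaN = at least one disease indicator missing (those users are dropped below);
    #   4 = deceased, assigned later from the exit interviews.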
    # Flag deaths using the exit interviews (deceased records are coded state = 4 below)
    # Read the 2013 death data
    exit_data, meta = pyreadstat.read_dta("/root/r_base/CHARLS/CHARLS2013/Exit_Interview.dta")
    exit_data['ID'] = pd.to_numeric(exit_data['ID'], errors='coerce').astype('Int64')
    exit_data["exit_year"] = exit_data["exb001_1"]
    data = pd.merge(data, exit_data[['ID', "exit_year"]], on="ID", how="left")
    # Read the 2020 death data
    exit_data, meta = pyreadstat.read_dta("/root/r_base/CHARLS/CHARLS2020/Exit_Module.dta")
    exit_data['ID'] = pd.to_numeric(exit_data['ID'], errors='coerce').astype('Int64')
    exit_data["exit_year"] = exit_data["exb001_1"]
    data = pd.merge(data, exit_data[['ID', "exit_year"]], on="ID", how="left")
    # Combine the two waves of death records
    data["exit_year"] = data["exit_year_x"].fillna(data["exit_year_y"])
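    # The two left-merges created exit_year_x (2013 module) and exit_year_y
    # (2020 module); fillna keeps the 2013 value when an ID appears in both.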
    # Follow-up (wave) years
    followup_years = [2011, 2013, 2015, 2018, 2020]
    # Cast 'ID' and 'wave' to int so sorting behaves as expected
    data['ID'] = data['ID'].astype(int)
    data['wave'] = data['wave'].astype(int)
    # Last follow-up record of each deceased individual
    last_followup = data.dropna(subset=['exit_year']).groupby('ID').apply(lambda x: x[x['wave'] == x['wave'].max()])
    # Boolean mask for rows whose 'state' must be set to 4
    mask = last_followup['wave'] > last_followup['exit_year']
    # Collapse the MultiIndex down to the ID level
    last_followup_ids = last_followup[mask].index.get_level_values('ID')
    # Set 'state' to 4 directly in data via boolean indexing
    data.loc[data['ID'].isin(last_followup_ids) & (data['wave'] == data.groupby('ID')['wave'].transform('max')), 'state'] = 4
    # For deaths after the last interview, append a record at the next follow-up year
    new_rows = last_followup[last_followup['wave'] <= last_followup['exit_year']].copy()
    new_rows['wave'] = new_rows['wave'].apply(lambda x: next((year for year in followup_years if year > x), None))
    new_rows['state'] = 4
    # Append the new rows to the original DataFrame
    data = pd.concat([data, new_rows], ignore_index=True).sort_values(by=['ID', 'wave']).reset_index(drop=True)
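    # Two cases are handled above: if the last observed wave already lies past
    # the recorded death year, that row is recoded to state 4 in place;
    # otherwise a synthetic death record is appended at the next follow-up
    # year. Deaths after the 2020 wave get wave = None, since followup_years
    # contains no later year.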
    # Drop the helper columns
    data = data.drop(columns=["exit_year_x", "exit_year_y", "exit_year"])
    # Count unique individuals
    unique_user_count = data['ID'].nunique()
    print(f"Users before dropping missing states: {unique_user_count}")
    # Remove individuals with a missing state
    # Find all IDs that have any missing 'state'
    users_with_na = data[data['state'].isna()]['ID'].unique()
    # Drop every record of those individuals
    data = data[~data['ID'].isin(users_with_na)]
    unique_user_count = data['ID'].nunique()
    print(f"Users after dropping missing states: {unique_user_count}")
    # Keep the longitudinal records of individuals observed in every wave
    # 1. Count the number of waves per individual
    user_counts = data.groupby('ID')['wave'].count().reset_index()
    user_counts.columns = ['ID', 'wave_count']
    # 2. State at each individual's last wave
    max_wave_state = data.loc[data.groupby('ID')['wave'].idxmax()][['ID', 'state']]
    max_wave_state.columns = ['ID', 'max_wave_state']
    # 3. Merge wave counts and last-wave states back into the data
    data = data.merge(user_counts, on='ID').merge(max_wave_state, on='ID')
    # 4. Keep individuals meeting either condition
    condition_1 = (data['wave_count'] == 5)
    condition_2 = (data['max_wave_state'] == 4) & (data['wave_count'] > 1)
    data = data[condition_1 | condition_2]
    # 5. Drop the helper columns
    data = data.drop(columns=['wave_count', 'max_wave_state']).reset_index(drop=True)
    unique_user_count = data['ID'].nunique()
    print(f"Users present in all waves: {unique_user_count}")
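    # Inclusion logic: condition_1 keeps individuals with complete follow-up
    # (all 5 waves); condition_2 keeps individuals who died during follow-up
    # (last state is 4) and contributed at least one interview before death.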
    # Remove individuals younger than 45
    users_with_45_age = data[data['age'] < 45]['ID'].unique()
    # Drop every record of those individuals
    data = data[~data['ID'].isin(users_with_45_age)]
    unique_user_count = data['ID'].nunique()
    print(f"Users after dropping age < 45: {unique_user_count}")
    # Find all IDs with a missing age
    users_with_na_age = data[data['age'].isna()]['ID'].unique()
    # Drop every record of those individuals
    data = data[~data['ID'].isin(users_with_na_age)]
    unique_user_count = data['ID'].nunique()
    print(f"Users after dropping missing age: {unique_user_count}")
    # Find all IDs with missing education
    users_with_na_education = data[data['education'].isna()]['ID'].unique()
    # Drop every record of those individuals
    data = data[~data['ID'].isin(users_with_na_education)]
    unique_user_count = data['ID'].nunique()
    print(f"Users after dropping missing education: {unique_user_count}")
    # Remove implausible BMI values
    users_with_BMI_wr = data[data["BMI"] >= 200]['ID'].unique()
    # Drop every record of those individuals
    data = data[~data['ID'].isin(users_with_BMI_wr)]
    unique_user_count = data['ID'].nunique()
    print(f"Users after dropping abnormal BMI: {unique_user_count}")
    # Multiple imputation
    # Initialize the iterative imputation model
    imputer = IterativeImputer(max_iter=10, random_state=0)
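    # IterativeImputer models each feature with missing values as a function of
    # the other features (chained equations); with no estimator argument it
    # falls back to scikit-learn's default, BayesianRidge. random_state makes
    # the imputation reproducible. Note that every column, including ID, wave
    # and state, is passed to fit_transform, so any remaining NaN in those
    # columns is imputed too.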
    # Run the imputation
    imputed_data = imputer.fit_transform(data)
    # fit_transform returns a float ndarray, so rebuild the DataFrame
    data = pd.DataFrame(imputed_data, columns=data.columns)
    # Round the categorical variables back to integers
    categorical_columns = ['Physical_activity', 'Psychiatric_score', 'ADL', 'Smoke', 'Drink']
    data[categorical_columns] = data[categorical_columns].round().astype(int)
    # Clip negative values: replace everything below 0 with 0
    data[data < 0] = 0
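    # The imputer can extrapolate outside the observed range of a feature,
    # which is why negatives are clipped above. Also note that ID and wave come
    # back from fit_transform as floats; cast them with astype(int) if exact
    # integer IDs are needed downstream (this pipeline leaves them as floats).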
    # Rows are already grouped by ID (sorted by ID and wave after the concat above)
    data.to_csv("paper_data.csv", index=False)
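
    # Optional sanity check (an illustrative sketch, not part of the original
    # pipeline): re-read the output and confirm the final coding survived the
    # round trip through CSV.
    check = pd.read_csv("paper_data.csv")
    assert check['state'].isin([1, 2, 3, 4]).all(), "unexpected state code"
    assert (check['age'] >= 45).all(), "age below inclusion threshold"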