import pandas as pd
import numpy as np
import pyreadstat
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
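# Note: importing sklearn.experimental.enable_iterative_imputer before
# IterativeImputer is required; the estimator is still experimental in
# scikit-learn and the import enables it as a side effect.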
if __name__ == "__main__":
    CHARLS_data = pd.read_csv("CHARLS_data_pollutants_p_n_m_nd_h.csv")
    covariates = ["last_year_NO2", "before_last_NO2", "last_year_O3", "before_last_O3",
                  "last_year_pm1", "before_last_pm1", "last_year_pm2.5", "before_last_pm2.5",
                  "last_year_pm10", "before_last_pm10", "last_year_SO4", "last_year_NO3",
                  "last_year_NH4", "last_year_OM", "last_year_BC", "before_last_SO4",
                  "before_last_NO3", "before_last_NH4", "before_last_OM", "before_last_BC",
                  "last_year_nl", "before_last_nl"]
    # Select the required columns (names keep the source file's spelling,
    # e.g. 'Disabetes_or_High_Blood_Sugar' and 'Kidney_Diease')
    data = CHARLS_data[["ID", "rgender", "age", "marital_status", "education",
                        "Physical_activity", "Psychiatric_score", "BMI", "ADL", "Smoke", "Drink",
                        'Hypertension', 'Dyslipidemia', 'Disabetes_or_High_Blood_Sugar',
                        'Cancer_or_Malignant_Tumor', 'Chronic_Lung_Diseases', 'Liver_Disease',
                        'Heart_Problems', 'Stroke', 'Kidney_Diease', 'Stomach_or_Other_Digestive_Disease',
                        'Emotional_Nervous_or_Psychiatric_Problems', 'Memory_Related_Disease',
                        'Arthritis_or_Rheumatism', 'Asthma', 'wave'] + covariates]
    # Derive the comorbidity state
    data["state"] = (data['Hypertension'] + data['Dyslipidemia'] + data['Disabetes_or_High_Blood_Sugar']
                     + data['Cancer_or_Malignant_Tumor'] + data['Chronic_Lung_Diseases'] + data['Liver_Disease']
                     + data['Heart_Problems'] + data['Stroke'] + data['Kidney_Diease']
                     + data['Stomach_or_Other_Digestive_Disease'] + data['Emotional_Nervous_or_Psychiatric_Problems']
                     + data['Memory_Related_Disease'] + data['Arthritis_or_Rheumatism'] + data['Asthma'])
    data["state"] = data['state'].apply(lambda x: 1 if x == 0 else 2 if x == 1 else 3 if x >= 2 else np.nan)
    data = data.drop(columns=['Hypertension', 'Dyslipidemia', 'Disabetes_or_High_Blood_Sugar',
                              'Cancer_or_Malignant_Tumor', 'Chronic_Lung_Diseases', 'Liver_Disease',
                              'Heart_Problems', 'Stroke', 'Kidney_Diease', 'Stomach_or_Other_Digestive_Disease',
                              'Emotional_Nervous_or_Psychiatric_Problems', 'Memory_Related_Disease',
                              'Arthritis_or_Rheumatism', 'Asthma'])
    # Incorporate death status (deceased respondents are assigned state 4 below)
    # Read the 2013 exit (death) interview data
    exit_df, meta = pyreadstat.read_dta("/root/r_base/CHARLS/CHARLS2013/Exit_Interview.dta")
    exit_df['ID'] = pd.to_numeric(exit_df['ID'], errors='coerce').astype('Int64')
    exit_df["exit_year"] = exit_df["exb001_1"]
    data = pd.merge(data, exit_df[['ID', "exit_year"]], on="ID", how="left")
    # Read the 2020 exit data
    exit_df, meta = pyreadstat.read_dta("/root/r_base/CHARLS/CHARLS2020/Exit_Module.dta")
    exit_df['ID'] = pd.to_numeric(exit_df['ID'], errors='coerce').astype('Int64')
    exit_df["exit_year"] = exit_df["exb001_1"]
    data = pd.merge(data, exit_df[['ID', "exit_year"]], on="ID", how="left")
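    # Both merges are left joins on ID, so respondents without an exit
    # interview keep NaN exit years; the suffixed _x/_y columns created by
    # the second merge are reconciled next.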
    # Combine the two sources of exit years
    data["exit_year"] = data["exit_year_x"].fillna(data["exit_year_y"])
    # Follow-up wave years
    followup_years = [2011, 2013, 2015, 2018, 2020]
    # Cast 'ID' and 'wave' to int so sorting and comparisons behave
    data['ID'] = data['ID'].astype(int)
    data['wave'] = data['wave'].astype(int)
    # Last follow-up record of each deceased respondent
    last_followup = data.dropna(subset=['exit_year']).groupby('ID').apply(lambda x: x[x['wave'] == x['wave'].max()])
    # Boolean mask marking rows whose 'state' should be set to 4 (death)
    mask = last_followup['wave'] > last_followup['exit_year']
    # Collapse the MultiIndex down to plain ID values
    last_followup_ids = last_followup[mask].index.get_level_values('ID')
    # Set 'state' to 4 directly in data via boolean indexing
    data.loc[data['ID'].isin(last_followup_ids) & (data['wave'] == data.groupby('ID')['wave'].transform('max')), 'state'] = 4
    # Create a new record for each remaining death, dated at the next follow-up year
    new_rows = last_followup[last_followup['wave'] <= last_followup['exit_year']].copy()
    new_rows['wave'] = new_rows['wave'].apply(lambda x: next((year for year in followup_years if year > x), None))
    new_rows['state'] = 4
    # Append the new rows to the original DataFrame
    data = pd.concat([data, new_rows], ignore_index=True).sort_values(by=['ID', 'wave']).reset_index(drop=True)
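    # Two observations (added, not in the original): the mask split above is
    # exhaustive, so each deceased ID should end up with exactly one state-4
    # row; and if a death falls after the final 2020 wave, next() returns
    # None and the appended row's wave is left missing. Optional check:
    # print((data[data['state'] == 4].groupby('ID').size() != 1).sum())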
    # Drop the helper columns
    data = data.drop(columns=["exit_year_x", "exit_year_y", "exit_year"])
    # Count unique respondents
    unique_user_count = data['ID'].nunique()
    print(f"Respondents before dropping missing states: {unique_user_count}")
    # Remove respondents whose state is missing
    # Find every ID that has a missing state in any wave
    users_with_na = data[data['state'].isna()]['ID'].unique()
    # Drop all records of those respondents
    data = data[~data['ID'].isin(users_with_na)]
    unique_user_count = data['ID'].nunique()
    print(f"Respondents after dropping missing states: {unique_user_count}")
    # Keep the longitudinal records of respondents observed in every wave
    # 1. Count the waves each respondent appears in
    user_counts = data.groupby('ID')['wave'].count().reset_index()
    user_counts.columns = ['ID', 'wave_count']
    # 2. State at each respondent's last wave
    max_wave_state = data.loc[data.groupby('ID')['wave'].idxmax()][['ID', 'state']]
    max_wave_state.columns = ['ID', 'max_wave_state']
    # 3. Merge the wave counts and last-wave states back into the data
    data = data.merge(user_counts, on='ID').merge(max_wave_state, on='ID')
    # 4. Keep respondents satisfying either condition
    condition_1 = (data['wave_count'] == 5)
    condition_2 = (data['max_wave_state'] == 4) & (data['wave_count'] > 1)
    data = data[condition_1 | condition_2]
    # 5. Drop the helper columns
    data = data.drop(columns=['wave_count', 'max_wave_state']).reset_index(drop=True)
    unique_user_count = data['ID'].nunique()
    print(f"Respondents present in all waves: {unique_user_count}")
    # Drop respondents younger than 45
    users_with_45_age = data[data['age'] < 45]['ID'].unique()
    # Drop all records of those respondents
    data = data[~data['ID'].isin(users_with_45_age)]
    unique_user_count = data['ID'].nunique()
    print(f"Respondents after the under-45 exclusion: {unique_user_count}")
    # Find every ID with a missing age
    users_with_na_age = data[data['age'].isna()]['ID'].unique()
    # Drop all records of those respondents
    data = data[~data['ID'].isin(users_with_na_age)]
    unique_user_count = data['ID'].nunique()
    print(f"Respondents after dropping missing ages: {unique_user_count}")
    # Find every ID with missing education
    users_with_na_education = data[data['education'].isna()]['ID'].unique()
    # Drop all records of those respondents
    data = data[~data['ID'].isin(users_with_na_education)]
    unique_user_count = data['ID'].nunique()
    print(f"Respondents after dropping missing education: {unique_user_count}")
    # Drop implausible BMI values
    users_with_BMI_wr = data[data["BMI"] >= 200]['ID'].unique()
    # Drop all records of those respondents
    data = data[~data['ID'].isin(users_with_BMI_wr)]
    unique_user_count = data['ID'].nunique()
    print(f"Respondents after the BMI exclusion: {unique_user_count}")
    # Multiple imputation
    # Initialize the iterative (MICE-style) imputer
    imputer = IterativeImputer(max_iter=10, random_state=0)
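    # IterativeImputer only accepts a numeric matrix; this guard (added here,
    # assuming every remaining column is numeric) fails fast if one is not.
    assert data.select_dtypes(exclude="number").empty, "non-numeric columns remain"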
    # Impute the missing values
    imputed_data = imputer.fit_transform(data)
    # Convert the imputed array back into a DataFrame
    data = pd.DataFrame(imputed_data, columns=data.columns)
    # Round the categorical columns back to integer codes
    categorical_columns = ['Physical_activity', 'Psychiatric_score', 'ADL', 'Smoke', 'Drink']  # categorical column names
    data[categorical_columns] = data[categorical_columns].round().astype(int)
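    # Rounding continuous imputations back to integer codes is a common
    # shortcut: IterativeImputer has no native categorical support, so the
    # script rounds rather than using a dedicated categorical imputer.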
    # Fix negatives: replace every value below 0 with 0
    data[data < 0] = 0
    # Rows are already sorted by ID and wave (see the concat step above), so
    # records for the same respondent sit together in the output
    data.to_csv("paper_data.csv", index=False)