"""Fill a CHARLS analysis table with values from the Harmonized CHARLS file.

For every variable of interest the Harmonized value (one column per wave) is
looked up by respondent ``ID`` and survey ``wave``; it takes priority over the
value already present in the analysis table, which is only used as a fallback.
Time-invariant fields (gender, birth year/month, education) are then made
consistent within each respondent and the age at interview is recomputed.
"""

import pandas as pd
import numpy as np
import pyreadstat
from datetime import date
from lunarcalendar import Converter, Lunar

HARMONIZED_PATH = "/root/r_base/CHARLS/Harmonized_CHARLS/H_CHARLS_D_Data.dta"
INPUT_PATH = "CHARLS_data_pollutants_p_n_m_nd.csv"
OUTPUT_PATH = "CHARLS_data_pollutants_p_n_m_nd_h.csv"

# Survey year of wave 1, 2, 3 and 4 (the wave number is the position + 1).
WAVE_YEARS = (2011, 2013, 2015, 2018)

# Suffix marking a temporary column that holds the Harmonized value.
STAGING_SUFFIX = "_m"

# Marital status recode:
#   1 married or partnered
#   0 other marital status (separated, divorced, unmarried, or widowed)
MSTAT_GROUPS = ((1, (1, 3)), (0, (4, 5, 7, 8)))

# Education recode:
#   0 below high school
#   1 high school
#   2 college or above
EDUCATION_GROUPS = ((1, (6, 7)), (2, (8, 9, 10)), (0, (1, 2, 3, 4, 5)))

# (staging column, Harmonized column pattern, number of waves used, merge_data flag)
# Every staging column listed here later overwrites the column of the same
# name without the "_m" suffix, when that column exists in the analysis table.
FILL_SOURCES = (
    ("marital_status_m", "r{}mstat", 4, "mstat"),
    # Physical measurements: only waves 1-3 are used.
    ("Height_m", "r{}mheight", 3, "other"),
    ("Weight_m", "r{}mweight", 3, "other"),
    ("waist_m", "r{}mwaist", 3, "other"),
    ("Systolic_m", "r{}systo", 3, "other"),
    ("Diastolic_m", "r{}diasto", 3, "other"),
    # Physical activity: 2 vigorous, 1 moderate, 0 inactive (see update_physical).
    ("Physical_activity_m", "r{}phys", 4, "phys"),
    # Smoking / drinking: 1 ever, 0 never.
    ("Smoke_m", "r{}smokev", 4, "other"),
    ("Drink_m", "r{}drinkev", 4, "other"),
    # Chronic diseases.
    ("Hypertension_m", "r{}hibpe", 4, "other"),
    ("Disabetes_or_High_Blood_Sugar_m", "r{}diabe", 4, "other"),
    ("Cancer_or_Malignant_Tumor_m", "r{}cancre", 4, "other"),
    ("Chronic_Lung_Diseases_m", "r{}lunge", 4, "other"),
    ("Heart_Problems_m", "r{}hearte", 4, "other"),
    ("Emotional_Nervous_or_Psychiatric_Problems_m", "r{}psyche", 4, "other"),
    ("Stroke_m", "r{}stroke", 4, "other"),
    ("Arthritis_or_Rheumatism_m", "r{}arthre", 4, "other"),
    ("Dyslipidemia_m", "r{}dyslipe", 4, "other"),
    ("Liver_Disease_m", "r{}livere", 4, "other"),
    ("Kidney_Diease_m", "r{}kidneye", 4, "other"),
    ("Stomach_or_Other_Digestive_Disease_m", "r{}digeste", 4, "other"),
    ("Asthma_m", "r{}asthmae", 4, "other"),
    ("Memory_Related_Disease_m", "r{}memrye", 4, "other"),
    # Psychological score.
    # NOTE(review): this column and the ADL column below use an "s" prefix
    # while every other source column uses "r" -- confirm that the "s"
    # variables are really the ones intended here.
    ("Psychiatric_score_m", "s{}cesd10", 4, "other"),
    # Sleep state.
    ("sleep_state_m", "r{}sleeprl", 4, "other"),
    # ADL.
    ("ADL_m", "s{}adlab_c", 4, "other"),
    # Age.
    ("age_m", "r{}agey", 4, "other"),
)

# BMI is staged like the columns above but handled separately, because the
# fallback value is recomputed from weight and height first.
BMI_SOURCE = ("BMI_m", "r{}mbmi", 3, "other")

# Components of the cognition score (temporary column, Harmonized pattern).
COGNITION_PARTS = (
    ("Date_Naming", "r{}orient"),
    ("Immediate_Word_Recall", "r{}imrc"),
    ("Delayed_Word_Recall", "r{}dlrc"),
    ("Serial_7", "r{}ser7"),
    ("Drawing_Picture", "r{}draw"),
)

# Time-invariant Harmonized column -> analysis-table column.
STATIC_COLUMNS = (
    ("ragender", "rgender"),
    ("rabyear", "birth_year"),
    ("raeduc_c", "education"),
)

CHRONIC_DISEASES = [
    "Hypertension", "Dyslipidemia", "Disabetes_or_High_Blood_Sugar",
    "Cancer_or_Malignant_Tumor", "Chronic_Lung_Diseases", "Liver_Disease",
    "Heart_Problems", "Stroke", "Kidney_Diease",
    "Stomach_or_Other_Digestive_Disease",
    "Emotional_Nervous_or_Psychiatric_Problems", "Memory_Related_Disease",
    "Arthritis_or_Rheumatism", "Asthma",
]

# Columns forced to a single per-respondent value (the within-ID mode).
MODE_FILLED_COLUMNS = ("rgender", "birth_year", "birth_month", "education")


def _recode(values, groups):
    """Return an array with each value replaced by the code of its group.

    ``groups`` is a sequence of ``(code, members)`` pairs. Values that belong
    to no group (including missing values) become NaN, so the result is a
    float array.
    """
    conditions = [values.isin(members) for _, members in groups]
    codes = [code for code, _ in groups]
    return np.select(conditions, codes, default=np.nan)


def update_mstat(harmonized, col_name):
    """Recode the marital-status column ``col_name`` of ``harmonized`` in place.

    Codes 1 and 3 become 1, codes 4, 5, 7 and 8 become 0, anything else NaN.
    The recode is not idempotent: applied a second time, the 0 produced by the
    first pass matches neither group and turns into NaN.
    """
    harmonized[col_name] = _recode(harmonized[col_name], MSTAT_GROUPS)


def update_physical(harmonized):
    """Add the activity-level columns ``r1phys`` .. ``r4phys`` to ``harmonized``.

    For each wave the level is
        2  if vigorous activity is reported (``r{w}vgact_c == 1``),
        1  else if moderate activity is reported (``r{w}mdact_c == 1``),
        0  else if light activity is reported or all three flags are 0,
        NaN otherwise (some flag missing and none of the above applies).
    """
    for wave_no in range(1, len(WAVE_YEARS) + 1):
        vigorous = harmonized[f"r{wave_no}vgact_c"]
        moderate = harmonized[f"r{wave_no}mdact_c"]
        light = harmonized[f"r{wave_no}ltact_c"]
        all_zero = (vigorous == 0) & (moderate == 0) & (light == 0)
        # np.select takes the first matching condition, which reproduces the
        # vigorous > moderate > inactive priority.
        harmonized[f"r{wave_no}phys"] = np.select(
            [vigorous == 1, moderate == 1, (light == 1) | all_zero],
            [2, 1, 0],
            default=np.nan,
        )


def merge_data(harmonized, waves, flag="other", data=None):
    """Look up one Harmonized column per wave for the rows of that wave.

    Parameters
    ----------
    harmonized : DataFrame
        Harmonized table with an ``ID`` column and the requested columns.
        It is modified in place when ``flag`` is ``"mstat"`` (the requested
        columns are recoded) or ``"phys"`` (``r1phys`` .. ``r4phys`` are added).
    waves : iterable of (wave, column name)
        Value of the ``wave`` column in ``data`` and the Harmonized column
        that holds the variable for that wave.
    flag : str
        ``"mstat"``, ``"phys"`` or anything else for a plain lookup.
    data : DataFrame, optional
        Analysis table with ``ID`` and ``wave`` columns. When omitted, the
        module-level ``CHARLS_data`` is used, as older callers expect.

    Returns
    -------
    list of Series
        One Series per entry of ``waves``, in the same order. Each Series has
        one value per row of ``data`` belonging to that wave and is indexed by
        the row labels of ``data``, so the result can be aligned back without
        assuming any particular row order.

    Raises
    ------
    pandas.errors.MergeError
        If ``harmonized`` contains the same ``ID`` more than once.
    """
    if data is None:
        data = globals().get("CHARLS_data")
        if data is None:
            raise NameError(
                "merge_data needs `data` or a module-level CHARLS_data"
            )

    waves = list(waves)
    if flag == "phys" and waves:
        # The derived columns cover all waves at once; compute them one time
        # instead of once per loop iteration.
        update_physical(harmonized)

    merged_data = []
    for wave, col_name in waves:
        if flag == "mstat":
            update_mstat(harmonized, col_name)
        wave_rows = data.loc[data["wave"] == wave, ["ID"]]
        # many_to_one guarantees exactly one output row per input row, in the
        # same order, so the original row labels can be restored afterwards.
        matched = wave_rows.merge(
            harmonized[["ID", col_name]],
            on="ID",
            how="left",
            validate="many_to_one",
        )
        values = matched[col_name]
        values.index = wave_rows.index
        merged_data.append(values)
    return merged_data


def fill_gender(group, col):
    """Return ``group`` with column ``col`` set to its most frequent value.

    Used to enforce one value per respondent (majority vote); despite the
    name it works for any column. When the column has no non-missing value
    the group is returned unchanged. The input frame is not modified.
    """
    modes = group[col].mode()
    if modes.empty:
        return group
    filled = group.copy()
    # mode() is sorted, so ties resolve to the smallest value.
    filled[col] = modes.iloc[0]
    return filled


def calculate_age(row):
    """Return the age in completed years at the interview month.

    Reads ``birth_year``, ``birth_month``, ``iyear``, ``imonth`` and ``ba003``
    from ``row``. Returns NaN when any of the first four is missing.
    """
    required = ("birth_year", "birth_month", "iyear", "imonth")
    if any(pd.isnull(row[key]) for key in required):
        return np.nan  # age cannot be computed

    birth_year = int(row["birth_year"])
    birth_month = int(row["birth_month"])
    if birth_month == 0:
        # Month code 0 is replaced by mid-year.
        birth_month = 6

    if row["ba003"] == 1:
        # Birth date given in the Gregorian calendar.
        birth_date = date(birth_year, birth_month, 1)
    else:
        # Anything else (including a missing ba003) is treated as a lunar
        # calendar date and converted to Gregorian.
        lunar = Lunar(birth_year, birth_month, 1, isleap=False)
        birth_date = Converter.Lunar2Solar(lunar)

    followup_date = date(int(row["iyear"]), int(row["imonth"]), 1)

    # Subtract one year if the birthday has not been reached yet.
    birthday_pending = (followup_date.month, followup_date.day) < (
        birth_date.month,
        birth_date.day,
    )
    return followup_date.year - birth_date.year - birthday_pending


def _wave_columns(pattern, n_waves=len(WAVE_YEARS)):
    """Return ``[(year, column), ...]`` for the first ``n_waves`` waves."""
    return [
        (year, pattern.format(wave_no))
        for wave_no, year in enumerate(WAVE_YEARS[:n_waves], start=1)
    ]


def _harmonized_column(data, harmonized, waves, flag="other"):
    """Return the Harmonized values for ``waves`` aligned to the rows of ``data``.

    Rows whose wave is not listed in ``waves`` get NaN.
    """
    pieces = merge_data(harmonized, waves, flag, data=data)
    if not pieces:
        return pd.Series(np.nan, index=data.index)
    return pd.concat(pieces).reindex(data.index)


def _strip_staging_suffix(name):
    """Return ``name`` without a trailing "_m" (only the suffix is removed)."""
    if name.endswith(STAGING_SUFFIX):
        return name[: -len(STAGING_SUFFIX)]
    return name


def _most_frequent(values):
    """Return the smallest most-frequent non-missing value, or NaN if none."""
    modes = values.mode()
    return modes.iloc[0] if not modes.empty else np.nan


def main():
    """Run the whole fill-in pipeline and write the resulting CSV."""
    harmonized, _meta = pyreadstat.read_dta(HARMONIZED_PATH)
    charls = pd.read_csv(INPUT_PATH)

    # Compare IDs as strings on both sides.
    # NOTE(review): if the CSV stores IDs with leading zeros, read_csv may
    # already have parsed them as numbers before this cast -- confirm.
    harmonized["ID"] = harmonized["ID"].astype(str)
    charls["ID"] = charls["ID"].astype(str)

    # Stage one "<name>_m" column per variable with the Harmonized value.
    for target, pattern, n_waves, flag in FILL_SOURCES + (BMI_SOURCE,):
        charls[target] = _harmonized_column(
            charls, harmonized, _wave_columns(pattern, n_waves), flag
        )

    # Cognition score: sum of the five component scores; a missing component
    # makes the total missing.
    for target, pattern in COGNITION_PARTS:
        charls[target] = _harmonized_column(
            charls, harmonized, _wave_columns(pattern)
        )
    part_names = [target for target, _ in COGNITION_PARTS]
    charls["Cognition_score_m"] = (
        charls["Date_Naming"]
        + charls["Immediate_Word_Recall"]
        + charls["Delayed_Word_Recall"]
        + charls["Serial_7"]
        + charls["Drawing_Picture"]
    )

    # Time-invariant fields merged once for all waves: gender, birth year,
    # education.
    harmonized["raeduc_c"] = _recode(harmonized["raeduc_c"], EDUCATION_GROUPS)
    static_sources = [source for source, _ in STATIC_COLUMNS]
    charls = pd.merge(
        charls,
        harmonized[["ID"] + static_sources],
        on="ID",
        how="left",
        validate="many_to_one",
    )

    merge_list = [target for target, _, _, _ in FILL_SOURCES]
    merge_list.append("Cognition_score_m")

    # Normalise the existing height first: values above 3 are taken to be
    # centimetres and converted to metres, everything else is kept.
    height = charls["Height"]
    charls["Height"] = height.mask(height > 3, height / 100)

    # Harmonized value first, existing value as fallback.
    for staged in merge_list:
        col = _strip_staging_suffix(staged)
        if col in charls.columns and staged in charls.columns:
            charls[col] = charls[staged].fillna(charls[col])

    # BMI: Harmonized value first, otherwise weight / height^2.
    charls["BMI"] = charls["Weight"] / (charls["Height"] * charls["Height"])
    charls["BMI"] = charls["BMI_m"].fillna(charls["BMI"])

    # The two sources code chronic diseases differently: turn 2 into 0.
    charls[CHRONIC_DISEASES] = charls[CHRONIC_DISEASES].replace(2, 0)

    for source, col in STATIC_COLUMNS:
        if col in charls.columns and source in charls.columns:
            charls[col] = charls[source].fillna(charls[col])

    charls = charls.drop(
        columns=part_names + [BMI_SOURCE[0]] + merge_list + static_sources
    )

    # One value per respondent for gender, birth year/month and education:
    # the most frequent value within each ID wins. transform keeps every row
    # (including the ID column) in its original position.
    by_id = charls.groupby("ID", sort=False)
    for col in MODE_FILLED_COLUMNS:
        charls[col] = by_id[col].transform(_most_frequent)

    # Recompute the age from the cleaned birth date.
    charls["age"] = charls.apply(calculate_age, axis=1)
    charls.to_csv(OUTPUT_PATH, index=False)


if __name__ == "__main__":
    main()