# nc2geotiff.py

#;+
#; :Author: Dr. Jing Wei (Email: weijing_rs@163.com)
#;-
import os
from time import sleep
# import gdal
import netCDF4 as nc
import numpy as np
from glob import glob
import requests
import json
import pandas as pd
import concurrent.futures
# from osgeo import osr

# Define work and output paths
WorkPath = r'/root/r_base/O3'
OutPath = WorkPath

# Define air pollutant type
# e.g., PM1, PM2.5, PM10, O3, NO2, SO2, CO, etc.
AP = 'O3'

# Define spatial resolution
# e.g., 1 km ≈ 0.01 degree
SP = 0.01  # degrees

if not os.path.exists(OutPath):
    os.makedirs(OutPath)

path = glob(os.path.join(WorkPath, '*.nc'))
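
# The *.nc file names are assumed to carry the year as the fifth
# underscore-separated token (see year = file.split("_")[4] below); a
# hypothetical example of a matching name would be
# "CHAP_O3_1K_DAILY_2020_V4.nc", where split("_")[4] yields "2020".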
# Worker function executed by each thread; reads the module-level lon/lat
# arrays that the main loop fills in before the threads are started
def thread_work(vll_work, start, end):
    result_work_array = []
    for value_work, lat_ind_work, lon_ind_work in vll_work:
        params = {
            "lng": "{:.6f}".format(lon[lon_ind_work]),
            "lat": "{:.6f}".format(lat[lat_ind_work])
        }
        # Retry until the request succeeds
        flag = True
        while flag:
            try:
                response_work = requests.get(url="http://103.116.120.27:9527/queryPoint", params=params)
                flag = False
            except Exception as e:
                print(f"Request failed once: {e}")
                sleep(10)
        res_json_work = json.loads(response_work.text)
        # The coordinate lies inside China
        list_local_work = res_json_work['v']['list']
        if len(list_local_work) > 0:
            try:
                if len(list_local_work) == 1:
                    # Only one administrative level returned: use it as both province and city
                    province_city_work = [local_work['ext_path'] for local_work in list_local_work if local_work['deep'] == '0']
                    tmp_result_work = [province_city_work[0], province_city_work[0], value_work]
                    result_work_array.append(tmp_result_work)
                else:
                    # Multiple levels returned: 'ext_path' holds "province city", so split it
                    province_city_work = [local_work['ext_path'] for local_work in list_local_work if local_work['deep'] == '1']
                    tmp_result_work = [province_city_work[0].split(" ")[0], province_city_work[0].split(" ")[1], value_work]
                    result_work_array.append(tmp_result_work)
            except Exception as e:
                print("Exception occurred: " + json.dumps(list_local_work))
        else:
            print(f"Empty coordinate: {lon[lon_ind_work]},{lat[lat_ind_work]}\n")
        # if len(result_work_array) % 100 == 0:
        #     print(f"Thread range start {start}, end {end}; processed so far: {len(result_work_array)}\n")
    return result_work_array
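
# For reference, thread_work assumes the queryPoint endpoint returns JSON
# shaped roughly like the following (a hypothetical example inferred from the
# parsing code above, not documented API output):
#   {"v": {"list": [
#       {"deep": "0", "ext_path": "Beijing"},
#       {"deep": "1", "ext_path": "Beijing Beijing"}
#   ]}}
# where 'deep' is the administrative depth and 'ext_path' is a space-separated
# "province city" path.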
for file in path:
    # CSV accumulating the results across all years
    file_path = "result.csv"
    # Extract the year from the file name
    year = file.split("_")[4]
    print(f"Processing year {year}")
    f = nc.Dataset(file)
    # Read SDS data (cast to float so missing values can be set to NaN)
    data = np.array(f[AP][:], dtype=float)
    # Define missing value: NaN or -999
    data[data == 65535] = np.nan  # -999
    # Read longitude and latitude information
    lon = np.array(f['lon'][:])
    lat = np.array(f['lat'][:])
    # Indices of the non-missing cells
    indices = np.where(~np.isnan(data))
    # Non-missing values
    values = data[indices]
    # Zip into (value, lat_index, lon_index) tuples
    vll = list(zip(values, indices[0], indices[1]))
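
    # Each vll entry pairs one concentration value with its grid indices, e.g.
    # (82.5, 1203, 2910) means data[1203, 2910] == 82.5 (illustrative numbers,
    # not taken from a real file).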
    # Checkpoint file recording how far into vll the previous run got
    index_path = "index_" + year + ".txt"
    # Try to open the checkpoint file for reading
    # (handle named idx_file to avoid shadowing the loop variable 'file')
    try:
        with open(index_path, 'r') as idx_file:
            # If the file exists, read the saved resume index
            index = idx_file.readline()
    # If the file does not exist, start from the beginning
    except FileNotFoundError:
        index = 0
    vll = vll[int(index):]
    # Size of the chunks one year's data is split into
    batch_size = 100000
    total_len = len(vll)
    if total_len == 0:
        continue
    batch_start = 0
    # Number of threads
    max_workers = 10
    with concurrent.futures.ThreadPoolExecutor(max_workers) as executor:
        for _ in range(total_len // batch_size + 1):
            # Try to load the accumulated results if the CSV already exists
            try:
                result_all = pd.read_csv(file_path)
            # If it does not exist yet, start with an empty container
            except FileNotFoundError:
                result_all = []
            batch_end = min(batch_start + batch_size, total_len)
            vll_one = vll[batch_start:batch_end]
            batch_start = batch_end
            result_array = []
            # Call the geocoding API in parallel to map each coordinate to a city
            start = 0
            avg = len(vll_one) // max_workers
            remainder = len(vll_one) % max_workers
            all_task = []
            for i in range(max_workers):
                # The first 'remainder' workers take one extra item
                if i < remainder:
                    end = start + avg + 1
                else:
                    end = start + avg
                all_task.append(executor.submit(thread_work, vll_one[start:end], start, end))
                start = end
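
            # Example of the split above (illustrative numbers): with
            # len(vll_one) == 23 and max_workers == 10, avg == 2 and
            # remainder == 3, so the chunk sizes are 3,3,3,2,2,2,2,2,2,2
            # and every item is assigned to exactly one worker.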
            for future in concurrent.futures.as_completed(all_task):
                # (named part to avoid clobbering the netCDF data array)
                part = future.result()
                result_array = result_array + part
            # Average values within the same region
            columns = ['province', 'city', year]
            result_df = pd.DataFrame(result_array, columns=columns)
            if len(result_all) == 0:
                result_all = result_df.groupby(['province', 'city']).mean().reset_index()
            else:
                result_one = result_df.groupby(['province', 'city']).mean().reset_index()
                # Merge into the accumulated results
                if year in result_all.columns:
                    print("============ Newly added data ================")
                    print(result_one)
                    concatenated_df = pd.concat([result_all[['province', 'city', year]], result_one])
                    # Aggregate with groupby
                    grouped_df = concatenated_df.groupby(['province', 'city']).mean().reset_index()
                    result_all = pd.merge(result_all, grouped_df, on=['province', 'city'], how='outer', suffixes=('', '_total'))
                    # Replace the old column with the combined average
                    result_all[year] = result_all[year + "_total"]
                    result_all = result_all.drop([year + "_total"], axis=1)
                else:
                    result_all = pd.merge(result_all, result_one, on=['province', 'city'], how="outer")
            print("============ Merged data ================")
            print(result_all.head())
            result_all.to_csv(file_path, index=False)
            # Record how many entries of this year have been processed so far
            with open(index_path, 'w') as idx_file:
                idx_file.write(str(batch_end + int(index)))
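
            # The accumulated result.csv ends up with one row per
            # (province, city) pair and one column per processed year, e.g.
            # (hypothetical values):
            #   province,city,2019,2020
            #   Beijing,Beijing,85.2,80.1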

    # LonMin, LatMax, LonMax, LatMin = lon.min(), lat.max(), lon.max(), lat.min()
    # N_Lat = len(lat)
    # N_Lon = len(lon)
    # Lon_Res = SP  # round((LonMax-LonMin)/(float(N_Lon)-1),2)
    # Lat_Res = SP  # round((LatMax-LatMin)/(float(N_Lat)-1),2)
    # # Define output file
    # fname = os.path.basename(file).split('.nc')[0]
    # outfile = OutPath + '/{}.tif'.format(fname)
    # # Write GeoTIFF
    # driver = gdal.GetDriverByName('GTiff')
    # outRaster = driver.Create(outfile, N_Lon, N_Lat, 1, gdal.GDT_Float32)
    # outRaster.SetGeoTransform([LonMin-Lon_Res/2, Lon_Res, 0, LatMax+Lat_Res/2, 0, -Lat_Res])
    # sr = osr.SpatialReference()
    # sr.SetWellKnownGeogCS('WGS84')
    # outRaster.SetProjection(sr.ExportToWkt())
    # outRaster.GetRasterBand(1).WriteArray(data)
    # print(fname + '.tif', ' Finished')
    # # release memory
    # del outRaster
    f.close()

# result_all.reset_index(inplace=True)
# print(result_all.head())
# result_all.to_csv("result.csv", index=False)