Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/dataset/geonames.py: 85%

95 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-10-04 08:54 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Access to the cities and population data of 

8`GeoNames <https://www.geonames.org/>`_. 

9 

10The GeoNames geographical database covers all countries and contains over 

11eleven million placenames. 

12 

13This module provides quick access to a subset of GeoNames containing the cities 

14with a population size exceeding 1000. 

15''' 

16 

17import logging 

18import os 

19from collections import namedtuple 

20import os.path as op 

21 

22import numpy as num 

23 

24from pyrocko import util, config 

25from pyrocko import orthodrome as od 

26from pyrocko.util import urlopen 

27 

28logger = logging.getLogger('pyrocko.dataset.geonames') 

29 

30GeoName = namedtuple('GeoName', ''' 

31geonameid 

32name 

33asciiname 

34alt_names 

35lat 

36lon 

37feature_class 

38feature_code 

39country_code 

40alt_country_code 

41admin1_code 

42admin2_code 

43admin3_code 

44admin4_code 

45population 

46elevation 

47dem 

48timezone 

49modification_date'''.split()) 

50 

51GeoName2 = namedtuple('GeoName2', ''' 

52name 

53asciiname 

54lat 

55lon 

56population 

57feature_code 

58'''.split()) 

59 

60base_url = 'https://mirror.pyrocko.org/download.geonames.org/export/dump/' 

61 

62 

63def download_file(fn, dirpath): 

64 url = base_url + '/' + fn 

65 fpath = op.join(dirpath, fn) 

66 logger.info('starting download of %s' % url) 

67 

68 util.ensuredirs(fpath) 

69 f = urlopen(url) 

70 fpath_tmp = fpath + '.%i.temp' % os.getpid() 

71 g = open(fpath_tmp, 'wb') 

72 while True: 

73 data = f.read(1024) 

74 if not data: 

75 break 

76 g.write(data) 

77 

78 g.close() 

79 f.close() 

80 

81 os.rename(fpath_tmp, fpath) 

82 

83 logger.info('finished download of %s' % url) 

84 

85 

86def positive_region(region): 

87 west, east, south, north = [float(x) for x in region] 

88 

89 assert -180. - 360. <= west < 180. 

90 assert -180. < east <= 180. + 360. 

91 assert -90. <= south < 90. 

92 assert -90. < north <= 90. 

93 

94 if east < west: 

95 east += 360. 

96 

97 if west < -180.: 

98 west += 360. 

99 east += 360. 

100 

101 return (west, east, south, north) 

102 

103 

104def ascii_str(u): 

105 return u.encode('ascii', 'replace') 

106 

107 

108def load(zfn, fn, minpop=1000000, region=None): 

109 geonames_dir = config.config().geonames_dir 

110 filepath = op.join(geonames_dir, zfn or fn) 

111 if not os.path.exists(filepath): 

112 download_file(zfn or fn, geonames_dir) 

113 

114 if region: 

115 w, e, s, n = positive_region(region) 

116 

117 if zfn is not None: 

118 import zipfile 

119 z = zipfile.ZipFile(filepath, 'r') 

120 f = z.open(fn, 'r') 

121 else: 

122 z = None 

123 f = open(filepath, 'rb') 

124 

125 for line in f: 

126 t = line.split(b'\t') 

127 pop = int(t[14]) 

128 if minpop <= pop: 

129 lat = float(t[4]) 

130 lon = float(t[5]) 

131 feature_code = str(t[7].decode('ascii')) 

132 if not region or ( 

133 (w <= lon <= e or w <= lon + 360. <= e) 

134 and (s <= lat <= n)): 

135 

136 yield GeoName2( 

137 t[1].decode('utf8'), 

138 str(t[2].decode('utf8').encode('ascii', 'replace') 

139 .decode('ascii')), 

140 lat, lon, pop, feature_code) 

141 

142 f.close() 

143 if z is not None: 

144 z.close() 

145 

146 

147g_cities = {} 

148 

149 

150def load_all_keep(zfn, fn, minpop=1000000, region=None, exclude=()): 

151 if (zfn, fn) not in g_cities: 

152 g_cities[zfn, fn] = list(load(zfn, fn, minpop=0)) 

153 

154 if region: 

155 w, e, s, n = positive_region(region) 

156 return [c for c in g_cities[zfn, fn] 

157 if (minpop <= c.population and 

158 ((w <= c.lon <= e or w <= c.lon + 360. <= e) 

159 and (s <= c.lat <= n)) and 

160 c.feature_code not in exclude)] 

161 

162 else: 

163 return [c for c in g_cities[zfn, fn] if ( 

164 minpop <= c.population and c.feature_code not in exclude)] 

165 

166 

167def get_cities_region(region=None, minpop=0): 

168 return load_all_keep( 

169 'cities1000.zip', 'cities1000.txt', 

170 region=region, 

171 minpop=minpop, 

172 exclude=('PPLX',)) 

173 

174 

175def get_cities_by_name(name): 

176 ''' 

177 Lookup city by name. 

178 

179 The comparison is done case-insensitive. 

180 

181 :param name: 

182 Name of the city to look for. 

183 :type name: 

184 str 

185 

186 :returns: 

187 Zero or more matching city entries. 

188 :rtype: 

189 :py:class:`list` of :py:class:`GeoName2` 

190 ''' 

191 cities = get_cities_region() 

192 candidates = [] 

193 for c in cities: 

194 if c.asciiname.lower() == name.lower(): 

195 candidates.append(c) 

196 

197 return candidates 

198 

199 

200def get_cities(lat, lon, radius, minpop=0): 

201 ''' 

202 Get cities in a given circular area. 

203 

204 :param lat: 

205 Latitude [deg]. 

206 :type lat: 

207 float 

208 

209 :param lon: 

210 Longitude [deg]. 

211 :type lon: 

212 float 

213 

214 :param radius: 

215 Search radius [m]. 

216 :type radius: 

217 float 

218 

219 :param minpop: 

220 Skip entries with population lower than this. 

221 :type minpop: 

222 int 

223 

224 :returns: 

225 Matching city entries. 

226 :rtype: 

227 :py:class:`list` of :py:class:`GeoName2` 

228 ''' 

229 region = od.radius_to_region(lat, lon, radius) 

230 cities = get_cities_region(region, minpop=minpop) 

231 

232 clats = num.array([c.lat for c in cities]) 

233 clons = num.array([c.lon for c in cities]) 

234 

235 dists = od.distance_accurate50m_numpy(lat, lon, clats, clons) 

236 order = num.argsort(dists) 

237 cities_sorted = [cities[i] for i in order if dists[i] < radius] 

238 return cities_sorted