Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/squirrel/client/catalog.py: 81%

193 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-10-04 09:52 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Squirrel online earthquake catalog client. 

8''' 

9 

10import os.path as op 

11import logging 

12import time 

13try: 

14 import cPickle as pickle 

15except ImportError: 

16 import pickle 

17 

18from pyrocko import util 

19from pyrocko.guts import String, Dict, Duration, dump_all 

20 

21from .base import Source 

22from ..model import ehash 

23from ..lock import LockDir 

24 

# Prefix under which the guts types declared in this module are registered.
guts_prefix = 'squirrel'

# Module-level logger shared by all catalog client code below.
logger = logging.getLogger('psq.client.catalog')

28 

29 

class Link(object):
    '''
    Book-keeping record for one time segment of a locally cached catalog.

    :param tmin: Start time of the segment.
    :param tmax: End time of the segment.
    :param tmodified: Time stamp stored with the segment (printed as
        "access" time by :py:meth:`__str__`).
    :param nevents: Number of events in the segment, ``-1`` by default.
    :param content_id: Identifier of the stored events, ``None`` by default.
    '''

    def __init__(self, tmin, tmax, tmodified, nevents=-1, content_id=None):
        # Attribute names are part of the pickled state; keep them stable.
        self.tmin, self.tmax = tmin, tmax
        self.tmodified = tmodified
        self.nevents = nevents
        self.content_id = content_id

    def __str__(self):
        stamps = tuple(
            util.tts(t) for t in (self.tmin, self.tmax, self.tmodified))

        return 'span %s - %s, access %s, nevents %i' % (
            stamps + (self.nevents,))

44 

45 

class NoSuchCatalog(Exception):
    '''
    Raised when a catalog name is not known to :py:func:`get_catalog`.
    '''

48 

49 

def get_catalog(name):
    '''
    Create a client object for the online catalog called ``name``.

    Supported names are ``'geofon'`` and ``'gcmt'``. The client module is
    imported only when the corresponding catalog is requested.

    :raises NoSuchCatalog: If ``name`` is not one of the supported names.
    '''
    if name == 'geofon':
        from pyrocko.client.geofon import Geofon as catalog_class
    elif name == 'gcmt':
        from pyrocko.client.globalcmt import GlobalCMT as catalog_class
    else:
        raise NoSuchCatalog(name)

    return catalog_class()

59 

60 

class CatalogSource(Source):
    '''
    Squirrel data-source to transparently access online earthquake catalogs.

    The catalog source maintains and synchronizes a partial copy of the online
    catalog, e.g. of all events above a certain magnitude. The time span for
    which the local copy of the catalog should be up-to date is maintained
    automatically by Squirrel. Data is loaded and updated in chunks as
    needed in a just-in-time fashion. Data validity can optionally expire after
    a given period of time and new data can be treated to be preliminary.
    In both cases information will be refreshed as needed.
    '''

    catalog = String.T(
        help='Catalog name.')

    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common arguments, which are appended to all queries, e.g. to '
             'constrain location, depth or magnitude ranges.')

    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed, i.e. queried again.')

    anxious = Duration.T(
        optional=True,
        help='Anxiety period [s]. Information will be treated as preliminary '
             'if it was younger than this at the time of its retrieval. '
             'Preliminary information is refreshed on each query relevant '
             'to it.')

    cache_path = String.T(
        optional=True,
        help='Directory path where the partial local copy of the catalog is '
             "kept. By default the Squirrel environment's cache directory is "
             'used.')

    def __init__(self, catalog, query_args=None, **kwargs):
        Source.__init__(self, catalog=catalog, query_args=query_args, **kwargs)

        self._hash = self.make_hash()

        # Target number of events per catalog request and per stored chunk.
        self._nevents_query_hint = 1000
        self._nevents_chunk_hint = 5000

        # Current length [s] of a single query window and its allowed range;
        # the length is adapted in _iquery().
        self._tquery = 3600.*24.
        self._tquery_limits = (3600., 3600.*24.*365.)

    def describe(self):
        '''
        Get a short identifier string built from catalog name and hash.
        '''
        return 'catalog:%s:%s' % (self.catalog, self.get_hash())

    def setup(self, squirrel, check=True):
        '''
        Prepare the source: create the catalog client and cache directory,
        load the stored chain and add cached event files to ``squirrel``.

        :param squirrel: Squirrel instance the cached events are added to.
        :param check: Not used by this implementation.
        :raises NoSuchCatalog: If :py:gattr:`catalog` is not a known name.
        '''
        self._force_query_age_max = self.anxious
        self._catalog = get_catalog(self.catalog)

        self._cache_path = op.join(
            self.cache_path or squirrel._cache_path,
            'catalog',
            self.get_hash())

        util.ensuredir(self._cache_path)

        with LockDir(self._cache_path):
            self._load_chain()
            self._add_events_to_squirrel(squirrel)

    def make_hash(self):
        '''
        Compute a hash from the catalog name and the sorted query arguments.
        '''
        s = self.catalog
        if self.query_args is not None:
            s += ','.join(
                '%s:%s' % (k, self.query_args[k])
                for k in sorted(self.query_args.keys()))
        else:
            s += 'noqueryargs'

        return ehash(s)

    def get_hash(self):
        '''
        Get the hash computed at construction time.
        '''
        return self._hash

    def update_event_inventory(self, squirrel, constraint=None):
        '''
        Bring the local catalog copy up to date for a time span.

        The time span is taken from ``constraint.tmin`` and
        ``constraint.tmax``; limits which are ``None`` are replaced by the
        time span reported by ``squirrel``. Outdated or not yet queried parts
        of the chain overlapping the span are queried again, the chain file
        is rewritten, stale event files are removed from ``squirrel`` and all
        cached event files are (re-)added to it.

        :param squirrel: Squirrel instance to be updated.
        :param constraint: Object with ``tmin`` and ``tmax`` attributes. Must
            not be ``None``.
        '''
        with LockDir(self._cache_path):
            self._load_chain()

            assert constraint is not None
            if constraint is not None:
                tmin, tmax = constraint.tmin, constraint.tmax

                tmin_sq, tmax_sq = squirrel.get_time_span(dummy_limits=False)

                if tmin is None:
                    tmin = tmin_sq

                if tmax is None:
                    tmax = tmax_sq

                if tmin is None or tmax is None:
                    logger.warning(
                        'Cannot query catalog source "%s" without time '
                        'constraint. Could not determine appropriate time '
                        'constraint from current data holdings (no data?).'
                        % self.catalog)

                    return

                tnow = time.time()
                modified = False

                # Clamp to the present *before* checking for an empty span,
                # so that a window lying entirely in the future cannot
                # produce an inverted link (tmin > tmax) in the chain.
                if tmax > tnow:
                    tmax = tnow

                if tmin >= tmax:
                    return

                if not self._chain:
                    self._chain = [Link(tmin, tmax, tnow)]
                    modified = True
                else:
                    # Extend the chain with not-yet-queried links so that it
                    # covers the requested span.
                    if tmin < self._chain[0].tmin:
                        self._chain[0:0] = [
                            Link(tmin, self._chain[0].tmin, tnow)]
                        modified = True
                    if self._chain[-1].tmax < tmax:
                        self._chain.append(
                            Link(self._chain[-1].tmax, tmax, tnow))
                        modified = True

                chain = []
                remove = []
                for link in self._chain:
                    if tmin < link.tmax and link.tmin < tmax \
                            and self._outdated(link, tnow):

                        if link.content_id:
                            remove.append(
                                self._get_events_file_path(link.content_id))

                        tmin_query = max(link.tmin, tmin)
                        tmax_query = min(link.tmax, tmax)

                        # Part of the old link before the requested span:
                        # keep as a not-yet-queried link.
                        if link.tmin < tmin_query:
                            chain.append(Link(link.tmin, tmin_query, tnow))

                        # The loop variable ``link`` must not be rebound here,
                        # it is still needed for the tail check below.
                        if tmin_query < tmax_query:
                            chain.extend(
                                self._iquery(tmin_query, tmax_query, tnow))

                        # Part of the old link after the requested span.
                        if tmax_query < link.tmax:
                            chain.append(Link(tmax_query, link.tmax, tnow))

                        modified = True

                    else:
                        chain.append(link)

                if modified:
                    self._chain = chain
                    self._dump_chain()
                    squirrel.remove(remove)

                self._add_events_to_squirrel(squirrel)

    def _add_events_to_squirrel(self, squirrel):
        '''
        Add the event files of all links having content to ``squirrel``.
        '''
        add = [
            self._get_events_file_path(link.content_id)
            for link in self._chain
            if link.content_id]

        squirrel.add(add, kinds=['event'], format='yaml')

    def _iquery(self, tmin, tmax, tmodified):
        '''
        Query the catalog for a time span and yield the results as links.

        The span is queried in consecutive windows of length ``_tquery``.
        After each window which was not cut off at ``tmax``, the window
        length is adapted to the number of events received and kept within
        ``_tquery_limits``. Whenever more than 1.5 times
        ``_nevents_chunk_hint`` events are buffered, a chunk of
        ``_nevents_chunk_hint`` events is packed and yielded. If an anxiety
        period is set, the remaining events are split at ``tmodified`` minus
        that period into two links.

        :param tmin: Start of the time span to query.
        :param tmax: End of the time span to query.
        :param tmodified: Time stamp stored in the yielded links.
        :returns: Generator yielding :py:class:`Link` objects.
        '''
        nwant = self._nevents_query_hint
        nchunk = self._nevents_chunk_hint
        tlim = self._tquery_limits

        t = tmin
        tpack_min = tmin

        events = []
        while t < tmax:
            tmin_query = t
            tmax_query = min(t + self._tquery, tmax)

            events_new = self._query(tmin_query, tmax_query)
            nevents_new = len(events_new)
            events.extend(events_new)
            while len(events) > int(nchunk * 1.5):
                tpack_max = events[nchunk].time
                yield self._pack(
                    events[:nchunk],
                    tpack_min, tpack_max, tmodified)

                tpack_min = tpack_max

                # Drop exactly the events which have just been packed, so
                # none of them ends up in the next chunk again.
                del events[:nchunk]

            t += self._tquery

            # Only adapt the window length if the window was not truncated
            # at tmax, otherwise the event count is not representative.
            if tmax_query != tmax:
                if nevents_new < 5:
                    self._tquery *= 10.0

                elif not (nwant // 2 < nevents_new < nwant * 2):
                    self._tquery /= float(nevents_new) / float(nwant)

                self._tquery = max(tlim[0], min(self._tquery, tlim[1]))

        if self._force_query_age_max is not None:
            tsplit = tmodified - self._force_query_age_max
            if tpack_min < tsplit < tmax:
                events_older = []
                events_newer = []
                for ev in events:
                    if ev.time < tsplit:
                        events_older.append(ev)
                    else:
                        events_newer.append(ev)

                yield self._pack(events_older, tpack_min, tsplit, tmodified)
                yield self._pack(events_newer, tsplit, tmax, tmodified)
                return

        yield self._pack(events, tpack_min, tmax, tmodified)

    def _pack(self, events, tmin, tmax, tmodified):
        '''
        Store ``events`` in the cache directory and return a describing link.

        Nothing is written for an empty event list; the returned link then
        has ``content_id`` set to ``None``.
        '''
        if events:
            content_id = ehash(
                self.get_hash() + ' %r %r %r' % (tmin, tmax, tmodified))
            path = self._get_events_file_path(content_id)
            dump_all(events, filename=path)
        else:
            content_id = None

        return Link(tmin, tmax, tmodified, len(events), content_id)

    def query_args_typed(self):
        '''
        Get the query arguments with known numeric arguments cast to float.

        Arguments not listed in the internal type map are passed through
        ``str``. Returns an empty dict if no query arguments are set.
        '''
        if self.query_args is None:
            return {}
        else:
            type_map = {
                'magmin': float,
                'magmax': float,
                'latmin': float,
                'latmax': float,
                'lonmin': float,
                'lonmax': float,
                'depthmin': float,
                'depthmax': float}

            return dict(
                (k, type_map.get(k, str)(v))
                for (k, v) in self.query_args.items())

    def _query(self, tmin, tmax):
        '''
        Request the events in the time span ``(tmin, tmax)`` from the catalog.
        '''
        logger.info('Querying catalog "%s" for time span %s - %s.' % (
            self.catalog, util.tts(tmin), util.tts(tmax)))

        return self._catalog.get_events(
            (tmin, tmax),
            **self.query_args_typed())

    def _outdated(self, link, tnow):
        '''
        Check whether the information of ``link`` has to be queried again.

        This is the case if the link has never been queried (``nevents`` is
        ``-1``), if its end lies within the anxiety period before its
        modification time, or if it is older than the expiration time.
        '''
        if link.nevents == -1:
            return True

        if self._force_query_age_max \
                and link.tmax + self._force_query_age_max > link.tmodified:

            return True

        if self.expires is not None \
                and link.tmodified < tnow - self.expires:

            return True

        return False

    def _get_events_file_path(self, fhash):
        '''
        Get the path of the events file with content id ``fhash``.
        '''
        return op.join(self._cache_path, fhash + '.pf')

    def _get_chain_file_path(self):
        '''
        Get the path of the file in which the chain is stored.
        '''
        return op.join(self._cache_path, 'chain.pickle')

    def _load_chain(self):
        '''
        Load the chain from the cache directory, or start with an empty one.
        '''
        path = self._get_chain_file_path()
        if op.exists(path):
            # NOTE(review): unpickling executes arbitrary code from the file;
            # the cache directory must only be writable by trusted users.
            with open(path, 'rb') as f:
                self._chain = pickle.load(f)
        else:
            self._chain = []

    def _dump_chain(self):
        '''
        Write the current chain to the cache directory.
        '''
        with open(self._get_chain_file_path(), 'wb') as f:
            pickle.dump(self._chain, f, protocol=2)

354 

355 

356__all__ = [ 

357 'CatalogSource' 

358]