Coverage for /usr/local/lib/python3.13/dist-packages/pyrocko/squirrel/client/catalog.py: 79%

208 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-12-04 10:41 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Squirrel online earthquake catalog client. 

8''' 

9 

10import os.path as op 

11import logging 

12import time 

13try: 

14 import cPickle as pickle 

15except ImportError: 

16 import pickle 

17 

18from pyrocko import util, progress 

19from pyrocko.guts import String, Dict, Duration, dump_all 

20 

21from .base import Source 

22from ..model import ehash, g_tmin, g_tmax 

23from ..lock import LockDir 

24 

# Module-level prefix used by pyrocko.guts for the types defined in this
# module (NOTE(review): presumably yields type tags like 'squirrel.*').
guts_prefix = 'squirrel'

# Logger shared by all functions and classes of this module.
logger = logging.getLogger(name='psq.client.catalog')

28 

29 

class Link:
    '''
    One segment of the cached catalog chain.

    A link covers the time span from ``tmin`` to ``tmax`` and records the
    time ``tmodified`` at which it was created/retrieved, the number of
    events found in it (``nevents``, ``-1`` when unknown) and the
    ``content_id`` of the events file holding them (``None`` if there is
    no such file).
    '''

    def __init__(self, tmin, tmax, tmodified, nevents=-1, content_id=None):
        self.tmin, self.tmax = tmin, tmax
        self.tmodified = tmodified
        self.nevents, self.content_id = nevents, content_id

    def __str__(self):
        tts = util.tts
        fields = (
            tts(self.tmin),
            tts(self.tmax),
            tts(self.tmodified),
            self.nevents)

        return 'span %s - %s, access %s, nevents %i' % fields

44 

45 

class NoSuchCatalog(Exception):
    '''
    Raised by :py:func:`get_catalog` when asked for an unsupported catalog
    name. The offending name is passed as the exception argument.
    '''

48 

49 

def get_catalog(name):
    '''
    Instantiate the online catalog client registered under ``name``.

    Supported names are ``'geofon'``, ``'gcmt'`` and ``'isc'``. The client
    module is only imported when its catalog is actually requested.

    :param name: Catalog name.
    :returns: New client instance for the requested catalog.
    :raises NoSuchCatalog: If ``name`` is not one of the supported names.
    '''
    if name == 'geofon':
        from pyrocko.client.geofon import Geofon as catalog_class
    elif name == 'gcmt':
        from pyrocko.client.globalcmt import GlobalCMT as catalog_class
    elif name == 'isc':
        from pyrocko.client.isc import ISC as catalog_class
    else:
        raise NoSuchCatalog(name)

    return catalog_class()

62 

63 

class CatalogSource(Source):
    '''
    Squirrel data-source to transparently access online earthquake catalogs.

    The catalog source maintains and synchronizes a partial copy of the online
    catalog, e.g. of all events above a certain magnitude. The time span for
    which the local copy of the catalog should be up to date is maintained
    automatically by Squirrel. Data is loaded and updated in chunks as
    needed in a just-in-time fashion. Data validity can optionally expire after
    a given period of time and new data can be treated to be preliminary.
    In both cases information will be refreshed as needed.
    '''

    catalog = String.T(
        help='Catalog name.')

    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common arguments, which are appended to all queries, e.g. to '
             'constrain location, depth or magnitude ranges.')

    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed, i.e. queried again.')

    anxious = Duration.T(
        optional=True,
        help='Anxiety period [s]. Information will be treated as preliminary '
             'if it was younger than this at the time of its retrieval. '
             'Preliminary information is refreshed on each query relevant '
             'to it.')

    cache_path = String.T(
        optional=True,
        help='Directory path where the partial local copy of the catalog is '
             "kept. By default the Squirrel environment's cache directory is "
             'used.')

    def __init__(self, catalog, query_args=None, **kwargs):
        Source.__init__(self, catalog=catalog, query_args=query_args, **kwargs)

        self._hash = self.make_hash()
        # Desired number of events per single catalog request; the query
        # window length (self._tquery) is adapted towards it in _iquery.
        self._nevents_query_hint = 1000
        # Desired number of events per cached events file.
        self._nevents_chunk_hint = 5000
        # Current query window length [s] and its (min, max) limits [s].
        self._tquery = 3600.*24.
        self._tquery_limits = (3600., 3600.*24.*365.)
        self._cache_path = None

    def get_cache_path(self):
        '''
        Get the directory holding the local catalog copy.

        :returns: Path determined in :py:meth:`setup`, ``None`` before that.
        '''
        return self._cache_path

    def describe(self):
        '''Get a short textual identifier of this source.'''
        return 'catalog:%s:%s' % (self.catalog, self.get_hash())

    def setup(self, squirrel, check=True, upgrade=False):
        '''
        Prepare the source for use with ``squirrel``.

        Instantiates the catalog client, creates the cache directory, loads
        the cached chain and adds already cached events to ``squirrel``.
        The ``check`` and ``upgrade`` arguments are not used here.
        '''
        self._force_query_age_max = self.anxious
        self._catalog = get_catalog(self.catalog)

        self._cache_path = op.join(
            self.cache_path or squirrel._cache_path,
            'catalog',
            self.get_hash())

        util.ensuredir(self._cache_path)

        with LockDir(self._cache_path):
            self._load_chain()
            self._add_events_to_squirrel(squirrel)

    def make_hash(self):
        '''
        Compute a hash identifying catalog name and query arguments.

        The hash names the cache subdirectory and seeds the events file
        names, so the exact string layout (including the missing separator
        after the catalog name) must be kept stable to stay compatible with
        existing caches.
        '''
        s = self.catalog
        if self.query_args is not None:
            s += ','.join(
                '%s:%s' % (k, self.query_args[k])
                for k in sorted(self.query_args.keys()))
        else:
            s += 'noqueryargs'

        return ehash(s)

    def get_hash(self):
        '''Get the hash computed by :py:meth:`make_hash` at construction.'''
        return self._hash

    def update_event_inventory(self, squirrel, constraint=None):
        '''
        Make sure the local catalog copy covers the requested time span.

        Gaps and outdated parts of the cached chain overlapping the time span
        are (re-)queried from the online catalog. The updated chain is
        persisted, superseded events files are removed from ``squirrel`` and
        all cached events files are (re-)added to it.

        :param squirrel: Squirrel instance to update.
        :param constraint: Object with ``tmin`` and ``tmax`` attributes.
            If it is ``None``, or either bound is ``None``, the time span of
            the data currently held by ``squirrel`` is used instead.
        '''

        with LockDir(self._cache_path):
            self._load_chain()

            # Previously an ``assert``, which left tmin/tmax undefined when
            # running with -O; fall back to the squirrel's time span instead.
            if constraint is not None:
                tmin, tmax = constraint.tmin, constraint.tmax
            else:
                tmin = tmax = None

            tmin_sq, tmax_sq = squirrel.get_time_span(dummy_limits=False)

            if tmin is None:
                tmin = tmin_sq

            if tmax is None:
                tmax = tmax_sq

            if tmin == g_tmin:
                tmin = None

            if tmax == g_tmax:
                tmax = None

            if tmin is None or tmax is None:
                logger.warning(
                    'Cannot query catalog source "%s" without time '
                    'constraint. Could not determine appropriate time '
                    'constraint from current data holdings (no data?).'
                    % self.catalog)

                return

            if tmin >= tmax:
                tmax = tmin

            tnow = time.time()
            modified = False

            # Never query into the future.
            if tmax > tnow:
                tmax = tnow

            # Pass 1: fill gaps in the chain up to tmax with placeholder
            # links (nevents == -1), which _outdated() reports as outdated.
            chain = []
            this_tmin = tmin
            for link in self._chain:
                if this_tmin < link.tmin and tmax > this_tmin:
                    chain.append(Link(this_tmin, min(tmax, link.tmin), tnow))
                    modified = True

                chain.append(link)
                this_tmin = link.tmax

            if this_tmin < tmax:
                chain.append(Link(this_tmin, tmax, tnow))
                modified = True

            if modified:
                self._chain = chain

            # Pass 2: re-query the outdated links overlapping [tmin, tmax].
            chain = []
            remove = []
            for link in self._chain:
                if tmin < link.tmax and link.tmin < tmax \
                        and self._outdated(link, tnow):

                    if link.content_id:
                        remove.append(
                            self._get_events_file_path(link.content_id))

                    tmin_query = max(link.tmin, tmin)
                    tmax_query = min(link.tmax, tmax)

                    # Part of the outdated link before the query span is
                    # kept as a placeholder.
                    if link.tmin < tmin_query:
                        chain.append(Link(link.tmin, tmin_query, tnow))

                    # Do not rebind ``link`` here: the check below must
                    # refer to the outdated link, not to the last newly
                    # queried one (whose tmax always equals tmax_query).
                    if tmin_query < tmax_query:
                        chain.extend(
                            self._iquery(tmin_query, tmax_query, tnow))

                    # Part of the outdated link after the query span is
                    # kept as a placeholder.
                    if tmax_query < link.tmax:
                        chain.append(Link(tmax_query, link.tmax, tnow))

                    modified = True

                else:
                    chain.append(link)

            if modified:
                self._chain = chain
                self._dump_chain()
                squirrel.remove(remove)

            self._add_events_to_squirrel(squirrel)

    def _add_events_to_squirrel(self, squirrel):
        '''Add the events files of all non-empty chain links to squirrel.'''
        add = [
            self._get_events_file_path(link.content_id)
            for link in self._chain
            if link.content_id]

        squirrel.add(add, kinds=['event'], format='yaml')

    def _iquery(self, tmin, tmax, tmodified):
        '''
        Query the catalog for ``[tmin, tmax]`` and yield new chain links.

        The span is queried in consecutive windows whose length is adapted
        so that each request returns roughly ``_nevents_query_hint`` events.
        Accumulated events are written out in chunks of
        ``_nevents_chunk_hint`` events. If ``anxious`` is set, the remainder
        is split at ``tmodified - anxious``, so that the preliminary part
        gets its own link.

        :returns: Generator of :py:class:`Link` objects covering
            ``[tmin, tmax]`` contiguously.
        '''

        nwant = self._nevents_query_hint
        nchunk = self._nevents_chunk_hint
        tlim = self._tquery_limits

        t = tmin
        tpack_min = tmin

        events = []
        with progress.task(
                'Querying %s' % self.catalog, 100, logger=logger) as task:

            while t < tmax:
                tmin_query = t
                tmax_query = min(t + self._tquery, tmax)

                events_new = self._query(tmin_query, tmax_query)
                nevents_new = len(events_new)
                # The chunking below uses events[nchunk].time as chunk
                # boundary, so events must be ordered by time.
                events.extend(sorted(events_new, key=lambda ev: ev.time))
                while len(events) > int(nchunk * 1.5):
                    tpack_max = events[nchunk].time
                    yield self._pack(
                        events[:nchunk],
                        tpack_min, tpack_max, tmodified)

                    tpack_min = tpack_max
                    # Drop exactly the events just packed (previously only
                    # _nevents_query_hint were dropped, duplicating the rest
                    # into the next chunk).
                    del events[:nchunk]

                t += self._tquery

                # Adapt the window length towards ~nwant events per request.
                if tmax_query != tmax:
                    if nevents_new < 5:
                        self._tquery *= 10.0

                    elif not (nwant // 2 < nevents_new < nwant * 2):
                        self._tquery /= float(nevents_new) / float(nwant)

                    self._tquery = max(tlim[0], min(self._tquery, tlim[1]))

                # t may overshoot tmax on the last window; keep <= 100 %.
                task.update(int(round(100. * (min(t, tmax)-tmin)/(tmax-tmin))))

            if self._force_query_age_max is not None:
                tsplit = tmodified - self._force_query_age_max
                if tpack_min < tsplit < tmax:
                    events_older = []
                    events_newer = []
                    for ev in events:
                        if ev.time < tsplit:
                            events_older.append(ev)
                        else:
                            events_newer.append(ev)

                    yield self._pack(
                        events_older, tpack_min, tsplit, tmodified)
                    yield self._pack(
                        events_newer, tsplit, tmax, tmodified)
                    return

            yield self._pack(events, tpack_min, tmax, tmodified)

    def _pack(self, events, tmin, tmax, tmodified):
        '''
        Write ``events`` to the cache and create a link describing them.

        No file is written for an empty event list; the link's
        ``content_id`` is ``None`` then.
        '''
        if events:
            content_id = ehash(
                self.get_hash() + ' %r %r %r' % (tmin, tmax, tmodified))
            path = self._get_events_file_path(content_id)
            dump_all(events, filename=path)
        else:
            content_id = None

        return Link(tmin, tmax, tmodified, len(events), content_id)

    def query_args_typed(self):
        '''
        Get the query arguments converted to their expected Python types.

        Known numeric keys are converted to ``float``, all others to ``str``.

        :returns: New dict, empty if no query arguments are set.
        '''
        if self.query_args is None:
            return {}
        else:
            type_map = {
                'magmin': float,
                'magmax': float,
                'latmin': float,
                'latmax': float,
                'lonmin': float,
                'lonmax': float,
                'depthmin': float,
                'depthmax': float}

            return {
                k: type_map.get(k, str)(v)
                for (k, v) in self.query_args.items()}

    def _query(self, tmin, tmax):
        '''Run a single catalog request for the time span [tmin, tmax].'''
        logger.info(
            'Querying catalog "%s" for time span %s - %s.',
            self.catalog, util.tts(tmin), util.tts(tmax))

        return self._catalog.get_events(
            (tmin, tmax),
            **self.query_args_typed())

    def _outdated(self, link, tnow):
        '''
        Check whether ``link`` needs to be (re-)queried.

        This is the case if its content is unknown (``nevents == -1``), if
        it was preliminary at retrieval (its end was younger than
        ``anxious`` when it was retrieved) or if it was retrieved longer
        than ``expires`` before ``tnow``.
        '''
        if link.nevents == -1:
            return True

        if self._force_query_age_max \
                and link.tmax + self._force_query_age_max > link.tmodified:

            return True

        if self.expires is not None \
                and link.tmodified < tnow - self.expires:

            return True

        return False

    def _get_events_file_path(self, fhash):
        '''Get path of the cached events file with content id ``fhash``.'''
        return op.join(self._cache_path, fhash + '.pf')

    def _get_chain_file_path(self):
        '''Get path of the file in which the chain is persisted.'''
        return op.join(self._cache_path, 'chain.pickle')

    def _load_chain(self):
        '''
        Load the persisted chain, or start with an empty one if none exists.

        NOTE: this unpickles a file from the cache directory, which is only
        safe as long as that directory is trusted.
        '''
        path = self._get_chain_file_path()
        if op.exists(path):
            with open(path, 'rb') as f:
                self._chain = pickle.load(f)
        else:
            self._chain = []

    def _dump_chain(self):
        '''
        Persist the chain to the cache directory.

        The pickle is written to a temporary file first and then moved into
        place, so that an interrupted write cannot leave a truncated chain
        file behind, which would likely make later loads fail.
        '''
        import os

        path = self._get_chain_file_path()
        path_tmp = path + '.tmp'
        with open(path_tmp, 'wb') as f:
            pickle.dump(self._chain, f, protocol=2)

        os.replace(path_tmp, path)

379 

380 

# Public API of this module.
__all__ = ['CatalogSource']