# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------

import os.path as op
import logging
import time
try:
    import cPickle as pickle
except ImportError:
    import pickle

from pyrocko import util
from pyrocko.guts import String, Dict, Duration, dump_all

from .base import Source
from ..model import ehash
from ..lock import LockDir

guts_prefix = 'squirrel'

logger = logging.getLogger('psq.client.catalog')


class Link(object):
    def __init__(self, tmin, tmax, tmodified, nevents=-1, content_id=None):
        self.tmin = tmin
        self.tmax = tmax
        self.tmodified = tmodified
        self.nevents = nevents
        self.content_id = content_id

    def __str__(self):
        return 'span %s - %s, access %s, nevents %i' % (
            util.tts(self.tmin),
            util.tts(self.tmax),
            util.tts(self.tmodified),
            self.nevents)


class NoSuchCatalog(Exception):
    pass


def get_catalog(name):
    if name == 'geofon':
        from pyrocko.client.geofon import Geofon
        return Geofon()
    elif name == 'gcmt':
        from pyrocko.client.globalcmt import GlobalCMT
        return GlobalCMT()
    else:
        raise NoSuchCatalog(name)


class CatalogSource(Source):
    '''
    Squirrel data-source to transparently access online earthquake catalogs.

    The catalog source maintains and synchronizes a partial copy of the
    online catalog, e.g. of all events above a certain magnitude. The time
    span for which the local copy of the catalog should be kept up to date
    is maintained automatically by Squirrel. Data is loaded and updated in
    chunks, as needed, in a just-in-time fashion. Data validity can
    optionally expire after a given period of time, and new data can be
    treated as preliminary. In both cases, the affected information is
    refreshed as needed.
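
    Example (a minimal usage sketch, assuming a default Squirrel
    environment and network access to the Global CMT catalog)::

        from pyrocko import util
        from pyrocko.squirrel import Squirrel

        sq = Squirrel()
        sq.add_source(CatalogSource('gcmt', query_args={'magmin': '6.0'}))

        tmin = util.str_to_time('2011-01-01 00:00:00')
        tmax = util.str_to_time('2011-02-01 00:00:00')

        sq.update(tmin=tmin, tmax=tmax)  # synchronize the local copy
        for event in sq.get_events(tmin=tmin, tmax=tmax):
            print(event)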

    '''

    catalog = String.T(
        help='Catalog name.')

    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common arguments, which are appended to all queries, e.g. to '
             'constrain location, depth or magnitude ranges.')

    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed, i.e. queried again.')

    anxious = Duration.T(
        optional=True,
        help='Anxiety period [s]. Information will be treated as preliminary '
             'if it was younger than this at the time of its retrieval. '
             'Preliminary information is refreshed on each query relevant '
             'to it.')

    cache_path = String.T(
        optional=True,
        help='Directory path where the partial local copy of the catalog is '
             "kept. By default the Squirrel environment's cache directory is "
             'used.')

    def __init__(self, catalog, query_args=None, **kwargs):
        Source.__init__(
            self, catalog=catalog, query_args=query_args, **kwargs)

        self._hash = self.make_hash()
        self._nevents_query_hint = 1000
        self._nevents_chunk_hint = 5000
        self._tquery = 3600.*24.
        self._tquery_limits = (3600., 3600.*24.*365.)

    def describe(self):
        return 'catalog:%s:%s' % (self.catalog, self.get_hash())

    def setup(self, squirrel, check=True):
        self._force_query_age_max = self.anxious
        self._catalog = get_catalog(self.catalog)

        self._cache_path = op.join(
            self.cache_path or squirrel._cache_path,
            'catalog',
            self.get_hash())

        util.ensuredir(self._cache_path)

        with LockDir(self._cache_path):
            self._load_chain()
            self._add_events_to_squirrel(squirrel)

    def make_hash(self):
        s = self.catalog
        if self.query_args is not None:
            s += ','.join(
                '%s:%s' % (k, self.query_args[k])
                for k in sorted(self.query_args.keys()))
        else:
            s += 'noqueryargs'

        return ehash(s)

    def get_hash(self):
        return self._hash

    def update_event_inventory(self, squirrel, constraint=None):

        with LockDir(self._cache_path):
            self._load_chain()

            assert constraint is not None
            if constraint is not None:
                tmin, tmax = constraint.tmin, constraint.tmax

                tmin_sq, tmax_sq = squirrel.get_time_span(dummy_limits=False)

                if tmin is None:
                    tmin = tmin_sq

                if tmax is None:
                    tmax = tmax_sq

                if tmin is None or tmax is None:
                    logger.warning(
                        'Cannot query catalog source "%s" without time '
                        'constraint. Could not determine appropriate time '
                        'constraint from current data holdings (no data?).'
                        % self.catalog)

                    return

                if tmin >= tmax:
                    return

                tnow = time.time()
                modified = False

                if tmax > tnow:
                    tmax = tnow

                if not self._chain:
                    self._chain = [Link(tmin, tmax, tnow)]
                    modified = True
                else:
                    if tmin < self._chain[0].tmin:
                        self._chain[0:0] = [
                            Link(tmin, self._chain[0].tmin, tnow)]
                        modified = True
                    if self._chain[-1].tmax < tmax:
                        self._chain.append(
                            Link(self._chain[-1].tmax, tmax, tnow))
                        modified = True

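                # Walk the chain and re-query every link that overlaps the
                # requested span and is outdated. The overlapping part is
                # replaced by freshly queried links; non-overlapping parts
                # of the old link are kept, so the chain remains a
                # contiguous, time-ordered partition of the synchronized
                # time span.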
                chain = []
                remove = []
                for link in self._chain:
                    if tmin < link.tmax and link.tmin < tmax \
                            and self._outdated(link, tnow):

                        if link.content_id:
                            remove.append(
                                self._get_events_file_path(link.content_id))

                        tmin_query = max(link.tmin, tmin)
                        tmax_query = min(link.tmax, tmax)

                        if link.tmin < tmin_query:
                            chain.append(Link(link.tmin, tmin_query, tnow))

                        if tmin_query < tmax_query:
                            # Use a distinct loop variable here: reusing
                            # 'link' would clobber the outer loop variable
                            # and break the gap check below.
                            for link_new in self._iquery(
                                    tmin_query, tmax_query, tnow):
                                chain.append(link_new)

                        if tmax_query < link.tmax:
                            chain.append(Link(tmax_query, link.tmax, tnow))

                        modified = True

                    else:
                        chain.append(link)

                if modified:
                    self._chain = chain
                    self._dump_chain()
                    squirrel.remove(remove)

            self._add_events_to_squirrel(squirrel)

    def _add_events_to_squirrel(self, squirrel):
        add = []
        for link in self._chain:
            if link.content_id:
                add.append(self._get_events_file_path(link.content_id))

        squirrel.add(add, kinds=['event'], format='yaml')

    def _iquery(self, tmin, tmax, tmodified):
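        # Query the catalog in windows of the current length self._tquery,
        # adapting the window length so that a single query returns roughly
        # self._nevents_query_hint events, and yield the collected events
        # packed into links of roughly self._nevents_chunk_hint events.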

        nwant = self._nevents_query_hint
        tlim = self._tquery_limits

        t = tmin
        tpack_min = tmin

        events = []
        while t < tmax:
            tmin_query = t
            tmax_query = min(t + self._tquery, tmax)

            events_new = self._query(tmin_query, tmax_query)
            nevents_new = len(events_new)
            events.extend(events_new)
            while len(events) > int(self._nevents_chunk_hint * 1.5):
                tpack_max = events[self._nevents_chunk_hint].time
                yield self._pack(
                    events[:self._nevents_chunk_hint],
                    tpack_min, tpack_max, tmodified)

                tpack_min = tpack_max
                # Drop exactly the events which were packed above. Dropping
                # only self._nevents_query_hint events here would yield some
                # events twice.
                events[:self._nevents_chunk_hint] = []

            t += self._tquery

            if tmax_query != tmax:
                if nevents_new < 5:
                    self._tquery *= 10.0

                elif not (nwant // 2 < nevents_new < nwant * 2):
                    self._tquery /= float(nevents_new) / float(nwant)

                self._tquery = max(tlim[0], min(self._tquery, tlim[1]))

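        # If an anxiety period is configured, split the tail of the queried
        # span at the preliminarity threshold, so that the young,
        # preliminary part ends up in its own link, which _outdated() will
        # refresh on the next relevant query.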
        if self._force_query_age_max is not None:
            tsplit = tmodified - self._force_query_age_max
            if tpack_min < tsplit < tmax:
                events_older = []
                events_newer = []
                for ev in events:
                    if ev.time < tsplit:
                        events_older.append(ev)
                    else:
                        events_newer.append(ev)

                yield self._pack(events_older, tpack_min, tsplit, tmodified)
                yield self._pack(events_newer, tsplit, tmax, tmodified)
                return

        yield self._pack(events, tpack_min, tmax, tmodified)

    def _pack(self, events, tmin, tmax, tmodified):
        if events:
            content_id = ehash(
                self.get_hash() + ' %r %r %r' % (tmin, tmax, tmodified))
            path = self._get_events_file_path(content_id)
            dump_all(events, filename=path)
        else:
            content_id = None

        return Link(tmin, tmax, tmodified, len(events), content_id)

    def query_args_typed(self):
        if self.query_args is None:
            return {}
        else:
            # Keys not listed here are passed through as strings.
            type_map = {
                'magmin': float,
                'magmax': float,
                'latmin': float,
                'latmax': float,
                'lonmin': float,
                'lonmax': float,
                'depthmin': float,
                'depthmax': float}

            return dict(
                (k, type_map.get(k, str)(v))
                for (k, v) in self.query_args.items())

    def _query(self, tmin, tmax):
        logger.info('Querying catalog "%s" for time span %s - %s.' % (
            self.catalog, util.tts(tmin), util.tts(tmax)))

        return self._catalog.get_events(
            (tmin, tmax),
            **self.query_args_typed())

    def _outdated(self, link, tnow):
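        # A link needs to be re-queried if (1) it has never been filled,
        # (2) it was preliminary at the time of its retrieval (anxiety
        # period), or (3) its information has expired.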

        if link.nevents == -1:
            return True

        if self._force_query_age_max \
                and link.tmax + self._force_query_age_max > link.tmodified:

            return True

        if self.expires is not None \
                and link.tmodified < tnow - self.expires:

            return True

        return False

    def _get_events_file_path(self, fhash):
        return op.join(self._cache_path, fhash + '.pf')

    def _get_chain_file_path(self):
        return op.join(self._cache_path, 'chain.pickle')

    def _load_chain(self):
        path = self._get_chain_file_path()
        if op.exists(path):
            with open(path, 'rb') as f:
                self._chain = pickle.load(f)
        else:
            self._chain = []

    def _dump_chain(self):
        with open(self._get_chain_file_path(), 'wb') as f:
            # Protocol 2 keeps the chain file readable from Python 2.
            pickle.dump(self._chain, f, protocol=2)


__all__ = [
    'CatalogSource',
]