1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6import time 

7import logging 

8 

9from pyrocko import util 

10from pyrocko.io.io_common import FileLoadError 

11from pyrocko.progress import progress 

12 

13from .backends import \ 

14 mseed, sac, datacube, stationxml, textfiles, virtual, yaml, tdms_idas 

15 

16from ..model import to_kind_ids 

17 

18backend_modules = [ 

19 mseed, sac, datacube, stationxml, textfiles, virtual, yaml, tdms_idas] 

20 

21 

22logger = logging.getLogger('psq.io') 

23 

24 

25def make_task(*args): 

26 return progress.task(*args, logger=logger) 

27 

28 

29def update_format_providers(): 

30 '''Update global mapping from file format to io backend module.''' 

31 

32 global g_format_providers 

33 g_format_providers = {} 

34 for mod in backend_modules: 

35 for format in mod.provided_formats(): 

36 if format not in g_format_providers: 

37 g_format_providers[format] = [] 

38 

39 g_format_providers[format].append(mod) 

40 

41 

42g_format_providers = {} 

43update_format_providers() 

44 

45 

46class FormatDetectionFailed(FileLoadError): 

47 ''' 

48 Exception raised when file format detection fails. 

49 ''' 

50 

51 def __init__(self, path): 

52 FileLoadError.__init__( 

53 self, 'format detection failed for file: %s' % path) 

54 

55 

56class UnknownFormat(Exception): 

57 ''' 

58 Exception raised when user requests an unknown file format. 

59 ''' 

60 

61 def __init__(self, format): 

62 Exception.__init__( 

63 self, 'unknown format: %s' % format) 

64 

65 

66def get_backend(fmt): 

67 ''' 

68 Get squirrel io backend module for a given file format. 

69 

70 :param fmt: 

71 Format identifier. 

72 :type fmt: 

73 str 

74 ''' 

75 

76 try: 

77 return g_format_providers[fmt][0] 

78 except KeyError: 

79 raise UnknownFormat(fmt) 

80 

81 

82def detect_format(path): 

83 ''' 

84 Determine file type from first 512 bytes. 

85 

86 :param path: 

87 Path to file. 

88 :type path: 

89 str 

90 ''' 

91 

92 if path.startswith('virtual:'): 

93 return 'virtual' 

94 

95 try: 

96 with open(path, 'rb') as f: 

97 data = f.read(512) 

98 

99 except (OSError, IOError): 

100 raise FormatDetectionFailed(path) 

101 

102 fmt = None 

103 for mod in backend_modules: 

104 fmt = mod.detect(data) 

105 if fmt is not None: 

106 return fmt 

107 

108 raise FormatDetectionFailed(path) 

109 

110 

111def supported_formats(): 

112 ''' 

113 Get list of file formats supported by Squirrel. 

114 ''' 

115 return sorted(g_format_providers.keys()) 

116 

117 

118g_content_kinds = ['waveform', 'station', 'channel', 'response', 'event'] 

119 

120 

121def supported_content_kinds(): 

122 ''' 

123 Get list of supported content kinds offered through Squirrel. 

124 ''' 

125 return g_content_kinds + ['waveform_promise'] 

126 

127 

128def iload( 

129 paths, 

130 segment=None, 

131 format='detect', 

132 database=None, 

133 check=True, 

134 skip_unchanged=False, 

135 content=g_content_kinds, 

136 show_progress=True, 

137 update_selection=None): 

138 

139 ''' 

140 Iteratively load content or index/reindex meta-information from files. 

141 

142 :param paths: 

143 Iterator yielding file names to load from or a Squirrel selection 

144 object providing the file names. 

145 :type paths: 

146 iterator yielding :py:class:`str` or 

147 :py:class:`~pyrocko.squirrel.selection.Selection` 

148 

149 :param segment: 

150 File-specific segment identifier (can only be used when loading from a 

151 single file). 

152 :type segment: 

153 int 

154 

155 :param format: 

156 File format identifier or ``'detect'`` for autodetection. When loading 

157 from a selection, per-file format assignation is taken from the hint in 

158 the selection and this flag is ignored. 

159 :type format: 

160 str 

161 

162 :param database: 

163 Database to use for meta-information caching. When loading from a 

164 selection, this should be ``None`` and the database from the selection 

165 is used. 

166 :type database: 

167 :py:class:`~pyrocko.squirrel.database.Database` 

168 

169 :param check: 

170 If ``True``, investigate modification time and file sizes of known 

171 files to debunk modified files (pessimistic mode), or ``False`` to 

172 deactivate checks (optimistic mode). 

173 :type check: 

174 bool 

175 

176 :param skip_unchanged: 

177 If ``True``, only yield index nuts for new / modified files. 

178 :type skip_unchanged: 

179 bool 

180 

181 :param content: 

182 Selection of content types to load. 

183 :type content: 

184 :py:class:`list` of :py:class:`str` 

185 

186 This generator yields :py:class:`~pyrocko.squirrel.model.Nut` objects for 

187 individual pieces of information found when reading the given files. Such a 

188 nut may represent a waveform, a station, a channel, an event or other data 

189 type. The nut itself only contains the meta-information. The actual content 

190 information is attached to the nut if requested. All nut meta-information 

191 is stored in the squirrel meta-information database. If possible, this 

192 function avoids accessing the actual disk files and provides the requested 

193 information straight from the database. Modified files are recognized and 

194 reindexed as needed. 

195 ''' 

196 

197 from ..selection import Selection 

198 

199 n_db = 0 

200 n_load = 0 

201 selection = None 

202 kind_ids = to_kind_ids(content) 

203 

204 if isinstance(paths, str): 

205 paths = [paths] 

206 else: 

207 if segment is not None: 

208 raise TypeError( 

209 'iload: segment argument can only be used when loading from ' 

210 'a single file') 

211 

212 if isinstance(paths, Selection): 

213 selection = paths 

214 if database is not None: 

215 raise TypeError( 

216 'iload: database argument must be None when called with a ' 

217 'selection') 

218 

219 database = selection.get_database() 

220 

221 if skip_unchanged and not isinstance(paths, Selection): 

222 raise TypeError( 

223 'iload: need selection when called with "skip_unchanged=True"') 

224 

225 temp_selection = None 

226 transaction = None 

227 if database: 

228 if not selection: 

229 # Avoid creating temporary selection for small batches. 

230 # this is helpful because then, we can avoid locking the database, 

231 # e.g. during loading of content, when the content has not been 

232 # modified. 

233 paths = util.short_to_list(100, paths) 

234 if isinstance(paths, list) and len(paths) == 0: 

235 return 

236 

237 if not (isinstance(paths, list) and len(paths) < 100 

238 and not skip_unchanged): 

239 

240 temp_selection = database.new_selection( 

241 paths, show_progress=show_progress, format=format) 

242 

243 selection = temp_selection 

244 

245 if skip_unchanged: 

246 selection.flag_modified(check) 

247 

248 if selection: 

249 # undig_grouped starts a long select which causes deadlocks 

250 # when transaction is started after starting the select, therefore 

251 # the transaction has to be started before in these cases. 

252 # The db will be locked for a long time in this case. This could be 

253 # solved either by breaking the indexing into smaller blocks in 

254 # the caller or by modifying undig_grouped to allow limit and 

255 # offset and add an outer loop below. 

256 transaction = database.transaction( 

257 'update content index') 

258 transaction.begin() 

259 it = selection.undig_grouped(skip_unchanged=skip_unchanged) 

260 else: 

261 # The list() causes the query to finish, so we don't have to lock, 

262 # and can start a transaction only when encountering a modified/new 

263 # file. 

264 it = list(database.undig_few(paths, format=format)) 

265 

266 else: 

267 it = (((format, path), []) for path in paths) 

268 

269 it = util.short_to_list(100, iter(it)) 

270 

271 try: 

272 n_files_total = len(it) 

273 if n_files_total == 0: 

274 if transaction: 

275 transaction.commit() 

276 transaction.close() 

277 return 

278 

279 except TypeError: 

280 n_files_total = None 

281 

282 task = None 

283 if show_progress: 

284 if not kind_ids: 

285 task = make_task('Indexing files', n_files_total) 

286 else: 

287 task = make_task('Loading files', n_files_total) 

288 

289 n_files = 0 

290 tcommit = time.time() 

291 

292 clean = False 

293 try: 

294 for (format, path), old_nuts in it: 

295 if task is not None: 

296 condition = '(nuts: %i from file, %i from cache)\n %s' % ( 

297 n_load, n_db, path) 

298 task.update(n_files, condition) 

299 

300 n_files += 1 

301 # cannot release when iterating a selection (see above) 

302 if database and transaction and not selection: 

303 tnow = time.time() 

304 if tnow - tcommit > 20. or n_files % 1000 == 0: 

305 transaction.commit() 

306 tcommit = tnow 

307 transaction.begin() 

308 

309 try: 

310 if check and old_nuts and old_nuts[0].file_modified(): 

311 old_nuts = [] 

312 modified = True 

313 else: 

314 modified = False 

315 

316 if segment is not None: 

317 old_nuts = [ 

318 nut for nut in old_nuts if nut.file_segment == segment] 

319 

320 if old_nuts: 

321 db_only_operation = not kind_ids or all( 

322 nut.kind_id in kind_ids and nut.content_in_db 

323 for nut in old_nuts) 

324 

325 if db_only_operation: 

326 # logger.debug('using cached information for file %s, ' 

327 # % path) 

328 

329 for nut in old_nuts: 

330 if nut.kind_id in kind_ids: 

331 database.undig_content(nut) 

332 

333 n_db += 1 

334 yield nut 

335 

336 continue 

337 

338 if format == 'detect': 

339 if old_nuts and not old_nuts[0].file_modified(): 

340 format_this = old_nuts[0].file_format 

341 else: 

342 format_this = detect_format(path) 

343 else: 

344 format_this = format 

345 

346 mod = get_backend(format_this) 

347 mtime, size = mod.get_stats(path) 

348 

349 if segment is not None: 

350 logger.debug( 

351 'Reading file "%s", segment "%s".' % (path, segment)) 

352 else: 

353 logger.debug( 

354 'Reading file "%s".' % path) 

355 

356 nuts = [] 

357 for nut in mod.iload(format_this, path, segment, content): 

358 nut.file_path = path 

359 nut.file_format = format_this 

360 nut.file_mtime = mtime 

361 nut.file_size = size 

362 if nut.content is not None: 

363 nut.content._squirrel_key = nut.key 

364 

365 nuts.append(nut) 

366 n_load += 1 

367 yield nut 

368 

369 if database and nuts != old_nuts: 

370 if old_nuts or modified: 

371 logger.debug( 

372 'File has been modified since last access: %s' 

373 % path) 

374 

375 if segment is not None: 

376 nuts = list(mod.iload(format_this, path, None, [])) 

377 for nut in nuts: 

378 nut.file_path = path 

379 nut.file_format = format_this 

380 nut.file_mtime = mtime 

381 nut.file_size = size 

382 

383 if not transaction: 

384 transaction = database.transaction( 

385 'update content index') 

386 transaction.begin() 

387 

388 database.dig(nuts, transaction=transaction) 

389 if update_selection is not None: 

390 update_selection._set_file_states_force_check( 

391 [path], transaction=transaction) 

392 update_selection._update_nuts(transaction=transaction) 

393 

394 except FileLoadError: 

395 logger.error('Cannot read file: %s' % path) 

396 if database: 

397 if not transaction: 

398 transaction = database.transaction( 

399 'update content index') 

400 transaction.begin() 

401 database.reset(path, transaction=transaction) 

402 

403 clean = True 

404 

405 finally: 

406 if task is not None: 

407 condition = '(nuts: %i from file, %i from cache)' % (n_load, n_db) 

408 task.update(n_files, condition) 

409 if clean: 

410 task.done(condition) 

411 else: 

412 task.fail(condition + ' terminated') 

413 

414 if database and transaction: 

415 transaction.commit() 

416 transaction.close() 

417 

418 if temp_selection: 

419 del temp_selection 

420 

421 

422__all__ = [ 

423 'iload', 

424 'detect_format', 

425 'supported_formats', 

426 'supported_content_kinds', 

427 'get_backend', 

428 'FormatDetectionFailed', 

429 'UnknownFormat', 

430]