1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6from __future__ import absolute_import, print_function 

7 

8import time 

9import logging 

10from builtins import str as newstr 

11 

12from pyrocko import util 

13from pyrocko.io.io_common import FileLoadError 

14from pyrocko.progress import progress 

15 

16from .backends import \ 

17 mseed, sac, datacube, stationxml, textfiles, virtual, yaml, tdms_idas 

18 

19from ..model import to_kind_ids 

20 

# Backend modules known to this io layer. The order is significant: format
# detection asks the modules in this order and the first match wins, and the
# first module registered for a format is the one returned by get_backend().
backend_modules = [
    mseed, sac, datacube, stationxml, textfiles, virtual, yaml, tdms_idas]

23 

24 

logger = logging.getLogger('psq.io')


def make_task(*args):
    '''
    Create a progress task which reports through this module's logger.

    :param args:
        Positional arguments, handed over unchanged to ``progress.task``.

    :returns:
        Whatever ``progress.task`` returns for the given arguments.
    '''
    task = progress.task(*args, logger=logger)
    return task

30 

31 

def update_format_providers():
    '''
    Rebuild the global mapping from file format to io backend modules.

    The global registry is reset to an empty dict and then refilled by asking
    every module in ``backend_modules`` for its provided formats. Modules
    providing the same format are collected in a list, in the order of
    ``backend_modules``.
    '''

    global g_format_providers
    g_format_providers = {}
    for backend in backend_modules:
        for fmt in backend.provided_formats():
            # Create the provider list on first sight of a format.
            g_format_providers.setdefault(fmt, []).append(backend)

43 

44 

# Registry mapping format identifier -> list of backend modules providing it.
# It is filled once here, at import time.
g_format_providers = {}
update_format_providers()

47 

48 

class FormatDetectionFailed(FileLoadError):
    '''
    Raised when the format of a file could not be determined.
    '''

    def __init__(self, path):
        # The offending path becomes part of the error message.
        message = 'format detection failed for file: %s' % path
        FileLoadError.__init__(self, message)

57 

58 

class UnknownFormat(Exception):
    '''
    Raised when a file format is requested which no backend provides.
    '''

    def __init__(self, format):
        # The requested format identifier becomes part of the error message.
        message = 'unknown format: %s' % format
        Exception.__init__(self, message)

67 

68 

def get_backend(fmt):
    '''
    Look up the squirrel io backend module handling a given file format.

    If several modules provide the format, the one registered first is
    returned.

    :param fmt:
        Format identifier.
    :type fmt:
        str

    :returns:
        Backend module providing the format.

    :raises:
        :py:exc:`UnknownFormat` if no backend is registered for ``fmt``.
    '''

    try:
        providers = g_format_providers[fmt]
    except KeyError:
        raise UnknownFormat(fmt)

    return providers[0]

83 

84 

def detect_format(path):
    '''
    Determine the file type by inspecting the first 512 bytes of a file.

    Paths starting with ``'virtual:'`` are not opened; they are reported as
    format ``'virtual'``. Otherwise the backends in ``backend_modules`` are
    asked in order and the first format reported is returned.

    :param path:
        Path to file.
    :type path:
        str

    :returns:
        Format identifier.

    :raises:
        :py:exc:`FormatDetectionFailed` if the file cannot be read or no
        backend recognizes its header.
    '''

    if path.startswith('virtual:'):
        return 'virtual'

    try:
        with open(path, 'rb') as f:
            header = f.read(512)

    except (OSError, IOError):
        raise FormatDetectionFailed(path)

    for backend in backend_modules:
        detected = backend.detect(header)
        if detected is not None:
            return detected

    raise FormatDetectionFailed(path)

112 

113 

def supported_formats():
    '''
    Get sorted list of the file format identifiers supported by Squirrel.

    :returns:
        New list with the keys of the format registry, in sorted order.
    '''
    # Iterating a dict yields its keys.
    return sorted(g_format_providers)

119 

120 

g_content_kinds = ['waveform', 'station', 'channel', 'response', 'event']


def supported_content_kinds():
    '''
    Get list of supported content kinds offered through Squirrel.

    :returns:
        A new list with the names of the supported content kinds.
    :rtype:
        :py:class:`list` of :py:class:`str`
    '''
    # Hand out a copy: the module-level list is shared state, so callers
    # modifying the returned list must not be able to alter it.
    return list(g_content_kinds)

129 

130 

def iload(
        paths,
        segment=None,
        format='detect',
        database=None,
        check=True,
        skip_unchanged=False,
        content=g_content_kinds,
        show_progress=True,
        update_selection=None):

    '''
    Iteratively load content or index/reindex meta-information from files.

    :param paths:
        Iterator yielding file names to load from or a Squirrel selection
        object providing the file names. A single file name given as a string
        is also accepted.
    :type paths:
        iterator yielding :py:class:`str` or
        :py:class:`~pyrocko.squirrel.selection.Selection`

    :param segment:
        File-specific segment identifier (can only be used when loading from a
        single file).
    :type segment:
        int

    :param format:
        File format identifier or ``'detect'`` for autodetection. When loading
        from a selection, per-file format assignation is taken from the hint in
        the selection and this flag is ignored.
    :type format:
        str

    :param database:
        Database to use for meta-information caching. When loading from a
        selection, this must be ``None`` and the database from the selection
        is used.
    :type database:
        :py:class:`~pyrocko.squirrel.database.Database`

    :param check:
        If ``True``, investigate modification time and file sizes of known
        files to detect modified files (pessimistic mode), or ``False`` to
        deactivate checks (optimistic mode).
    :type check:
        bool

    :param skip_unchanged:
        If ``True``, only yield index nuts for new / modified files. Requires
        ``paths`` to be a selection.
    :type skip_unchanged:
        bool

    :param content:
        Selection of content types to load.
    :type content:
        :py:class:`list` of :py:class:`str`

    :param show_progress:
        If ``True``, report progress through a progress task. The task is
        titled ``'Indexing files'`` when ``content`` translates to no kind
        ids and ``'Loading files'`` otherwise.
    :type show_progress:
        bool

    :param update_selection:
        If not ``None``, this object is notified after freshly read nuts have
        been stored in the database: the state of the file just read is
        re-checked and the object's nuts are updated, within the same
        transaction. NOTE(review): presumably a selection-like object - confirm
        against callers.

    :raises:
        :py:exc:`TypeError` if ``segment`` is given but ``paths`` is not a
        single file name, if ``database`` is given together with a selection,
        or if ``skip_unchanged`` is set but ``paths`` is not a selection.

    This generator yields :py:class:`~pyrocko.squirrel.model.Nut` objects for
    individual pieces of information found when reading the given files. Such a
    nut may represent a waveform, a station, a channel, an event or other data
    type. The nut itself only contains the meta-information. The actual content
    information is attached to the nut if requested. All nut meta-information
    is stored in the squirrel meta-information database. If possible, this
    function avoids accessing the actual disk files and provides the requested
    information straight from the database. Modified files are recognized and
    reindexed as needed. Files which cannot be read are logged as errors and
    their entries are reset in the database; they do not abort the iteration.
    '''

    # Imported locally rather than at module level.
    from ..selection import Selection

    n_db = 0    # number of nuts served from the database
    n_load = 0  # number of nuts read from files
    selection = None
    kind_ids = to_kind_ids(content)

    if isinstance(paths, (str, newstr)):
        # A single file name: wrap it so it can be treated like an iterable.
        paths = [paths]
    else:
        if segment is not None:
            raise TypeError(
                'iload: segment argument can only be used when loading from '
                'a single file')

        if isinstance(paths, Selection):
            selection = paths
            if database is not None:
                raise TypeError(
                    'iload: database argument must be None when called with a '
                    'selection')

            database = selection.get_database()

    if skip_unchanged and not isinstance(paths, Selection):
        raise TypeError(
            'iload: need selection when called with "skip_unchanged=True"')

    temp_selection = None
    transaction = None
    if database:
        if not selection:
            # Avoid creating temporary selection for small batches.
            # this is helpful because then, we can avoid locking the database,
            # e.g. during loading of content, when the content has not been
            # modified.
            paths = util.short_to_list(100, paths)
            if not (isinstance(paths, list) and len(paths) < 100
                    and not skip_unchanged):

                temp_selection = database.new_selection(
                    paths, show_progress=show_progress, format=format)

                selection = temp_selection

        if skip_unchanged:
            selection.flag_modified(check)

        if selection:
            # undig_grouped starts a long select which causes deadlocks
            # when transaction is started after starting the select, therefore
            # the transaction has to be started before in these cases.
            # The db will be locked for a long time in this case. This could be
            # solved either by breaking the indexing into smaller blocks in
            # the caller or by modifying undig_grouped to allow limit and
            # offset and add an outer loop below.
            transaction = database.transaction(
                'update content index')
            transaction.begin()
            it = selection.undig_grouped(skip_unchanged=skip_unchanged)
        else:
            # The list() causes the query to finish, so we don't have to lock,
            # and can start a transaction only when encountering a modified/new
            # file.
            it = list(database.undig_few(paths, format=format))

    else:
        # No database: nothing is cached, every file comes with an empty list
        # of previously known nuts.
        it = (((format, path), []) for path in paths)

    # The total number of files is only known if ``it`` supports len().
    try:
        n_files_total = len(it)
    except TypeError:
        n_files_total = None

    task = None
    if show_progress:
        if not kind_ids:
            task = make_task('Indexing files', n_files_total)
        else:
            task = make_task('Loading files', n_files_total)

    n_files = 0
    tcommit = time.time()

    # Set to True only if the loop below runs to completion; used in the
    # finally clause to report the task as done or as failed.
    clean = False
    try:
        # Note: the loop variable ``format`` rebinds the ``format`` argument.
        for (format, path), old_nuts in it:
            if task is not None:
                condition = '(nuts: %i from file, %i from cache)\n %s' % (
                    n_load, n_db, path)
                task.update(n_files, condition)

            n_files += 1
            if database and transaction:
                # Commit and restart the running transaction when more than
                # 20 s have passed since the last commit or on every 1000th
                # file.
                tnow = time.time()
                if tnow - tcommit > 20. or n_files % 1000 == 0:
                    transaction.commit()
                    tcommit = tnow
                    transaction.begin()

            try:
                # Cached nuts of a modified file are discarded.
                if check and old_nuts and old_nuts[0].file_modified():
                    old_nuts = []
                    modified = True
                else:
                    modified = False

                if segment is not None:
                    old_nuts = [
                        nut for nut in old_nuts if nut.file_segment == segment]

                if old_nuts:
                    # The file need not be touched if no content is requested
                    # or if all cached nuts are of a requested kind and have
                    # their content available in the database.
                    db_only_operation = not kind_ids or all(
                        nut.kind_id in kind_ids and nut.content_in_db
                        for nut in old_nuts)

                    if db_only_operation:
                        # Serve the cached nuts, attaching content from the
                        # database where it has been requested.

                        for nut in old_nuts:
                            if nut.kind_id in kind_ids:
                                database.undig_content(nut)

                            n_db += 1
                            yield nut

                        continue

                if format == 'detect':
                    # Reuse the format stored with the cached nuts if the file
                    # is unmodified, otherwise inspect the file.
                    if old_nuts and not old_nuts[0].file_modified():
                        format_this = old_nuts[0].file_format
                    else:
                        format_this = detect_format(path)
                else:
                    format_this = format

                mod = get_backend(format_this)
                mtime, size = mod.get_stats(path)

                if segment is not None:
                    logger.debug(
                        'Reading file "%s", segment "%s".' % (path, segment))
                else:
                    logger.debug(
                        'Reading file "%s".' % path)

                nuts = []
                for nut in mod.iload(format_this, path, segment, content):
                    # Complete the nut with the file-level meta-information.
                    nut.file_path = path
                    nut.file_format = format_this
                    nut.file_mtime = mtime
                    nut.file_size = size
                    if nut.content is not None:
                        nut.content._squirrel_key = nut.key

                    nuts.append(nut)
                    n_load += 1
                    yield nut

                if database and nuts != old_nuts:
                    if old_nuts or modified:
                        logger.debug(
                            'File has been modified since last access: %s'
                            % path)

                    if segment is not None:
                        # Only one segment was read above; read the index of
                        # the complete file (no segment restriction, no
                        # content) before storing it.
                        nuts = list(mod.iload(format_this, path, None, []))
                        for nut in nuts:
                            nut.file_path = path
                            nut.file_format = format_this
                            nut.file_mtime = mtime
                            nut.file_size = size

                    # Start a transaction lazily if none is running yet.
                    if not transaction:
                        transaction = database.transaction(
                            'update content index')
                        transaction.begin()

                    database.dig(nuts, transaction=transaction)
                    if update_selection is not None:
                        update_selection._set_file_states_force_check(
                            [path], transaction=transaction)
                        update_selection._update_nuts(transaction=transaction)

            except FileLoadError:
                # An unreadable file is logged and reset in the database; the
                # iteration continues with the next file.
                logger.error('Cannot read file: %s' % path)
                if database:
                    if not transaction:
                        transaction = database.transaction(
                            'update content index')
                        transaction.begin()
                    database.reset(path, transaction=transaction)

        clean = True

    finally:
        # Runs on normal completion as well as when the loop is left early
        # (``clean`` is still False then).
        if task is not None:
            condition = '(nuts: %i from file, %i from cache)' % (n_load, n_db)
            task.update(n_files, condition)
            if clean:
                task.done(condition)
            else:
                task.fail(condition + ' terminated')

        if database and transaction:
            transaction.commit()
            transaction.close()

        if temp_selection:
            del temp_selection

411 

412 

# Names exported by ``from <this module> import *``.
__all__ = [
    'iload',
    'detect_format',
    'supported_formats',
    'supported_content_kinds',
    'get_backend',
    'FormatDetectionFailed',
    'UnknownFormat',
]