Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/io/yaff.py: 95%

78 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2024-03-07 11:54 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5''' 

6File IO module for SICK traces format. 

7''' 

8 

9import os 

10from struct import unpack 

11 

12from pyrocko.file import (File, numtype2type, NoDataAvailable, 

13 size_record_header, FileError) 

14from pyrocko import trace 

15from pyrocko.util import ensuredirs 

16from .io_common import FileLoadError, FileSaveError 

17 

# Record layouts handed to ``File`` by ``TracesFileIO``: for every record
# type, the attribute names stored and the type tag (or tuple of admissible
# type tags) used for each of them.
record_formats = {
    'trace': dict(
        network='string',
        station='string',
        location='string',
        channel='string',
        tmin='time_string',
        tmax='time_string',
        deltat='f8',
        # Several tags are allowed here; presumably the one actually used is
        # picked per trace via ``TracesFileIO.get_type`` -- TODO confirm.
        ydata=('@i2', '@i4', '@i8', '@i2', '@i4', '@i8', '@f4', '@f8'),
    ),
}

31 

32 

def extract(tr, format):
    '''
    Collect the attributes of an object that are named by a format mapping.

    :param tr: object to read attributes from
    :param format: mapping whose keys are the attribute names to fetch; its
        values are not looked at here
    :returns: new ``dict`` mapping each key of ``format`` to
        ``getattr(tr, key)``, in the iteration order of ``format``
    :raises AttributeError: if ``tr`` lacks one of the named attributes
    '''
    return {name: getattr(tr, name) for name in format}

38 

39 

class TracesFileIO(File):
    '''
    ``File`` specialisation labelled ``'YAFF'`` (version ``'0000'``) which
    reads and writes ``'trace'`` records laid out as in ``record_formats``.
    '''

    def __init__(self, file):
        super().__init__(
            file,
            type_label='YAFF',
            version='0000',
            record_formats=record_formats)

    def get_type(self, key, value):
        '''
        Look up the type tag for ``value`` from its ``dtype``.

        ``key`` is accepted but not used by this implementation.
        '''
        dtype_class = value.dtype.type
        return numtype2type[dtype_class]

    def from_dict(self, d):
        '''
        Build a :py:class:`pyrocko.trace.Trace` from unpacked record fields,
        passing the entries of ``d`` as keyword arguments.
        '''
        return trace.Trace(**d)

    def to_dict(self, tr):
        '''
        Return the attributes of ``tr`` that make up a ``'trace'`` record.
        '''
        trace_format = record_formats['trace']
        return extract(tr, trace_format)

    def load(self, load_data=True):
        '''
        Iterate over the traces stored in the file.

        Records whose type is not ``'trace'`` are skipped. Iteration ends
        when ``NoDataAvailable`` is raised while fetching or processing a
        record.

        :param load_data: if ``False``, the ``'ydata'`` field is excluded
            when unpacking records
        :yields: one object from :py:meth:`from_dict` per ``'trace'`` record
        '''
        skipped_fields = None if load_data else ('ydata',)

        try:
            while True:
                record = self.next_record()
                if record.type != 'trace':
                    continue

                fields = record.unpack(exclude=skipped_fields)
                yield self.from_dict(fields)

        except NoDataAvailable:
            return

    def save(self, traces):
        '''
        Write every trace in ``traces`` as its own hashed ``'trace'`` record.
        '''
        for tr in traces:
            record = self.add_record('trace', make_hash=True)
            record.pack(self.to_dict(tr))
            record.close()

81 

82 

def iload(filename, load_data=True):
    '''
    Lazily load traces from a YAFF file.

    :param filename: path of the file to read
    :param load_data: if ``False``, the ``'ydata'`` field is excluded when
        the records are unpacked
    :yields: the objects produced by :py:meth:`TracesFileIO.load`
    :raises FileLoadError: if opening or reading fails with
        :py:exc:`OSError` or ``FileError``; the original exception is
        attached as ``__cause__``
    '''
    f = None
    tf = None
    try:
        f = open(filename, 'rb')
        tf = TracesFileIO(f)
        for tr in tf.load(load_data=load_data):
            yield tr

    except (OSError, FileError) as e:
        raise FileLoadError(e) from e

    finally:
        # Either handle may still be unset if open() or the TracesFileIO
        # constructor failed. Touching an unbound name here would replace
        # the FileLoadError raised above with an UnboundLocalError.
        try:
            if tf is not None:
                tf.close()
        finally:
            # Release the OS-level handle even if closing the record failed.
            if f is not None:
                f.close()

96 

97 

def save(traces, filename_template, additional={}, max_open_files=10,
         overwrite=True):
    '''
    Write traces to one or more YAFF files.

    The output path of each trace is obtained from
    ``tr.fill_template(filename_template, **additional)``. Traces that map
    to the same path end up in the same file: the first write to a path
    truncates it, later re-opens during the same call append to it.

    :param traces: iterable of traces to store
    :param filename_template: template passed to ``tr.fill_template``
    :param additional: extra keyword arguments for ``tr.fill_template``;
        only unpacked, never modified, so the shared default is harmless
    :param max_open_files: once this many files are open, all of them are
        closed before another one is opened
    :param overwrite: if ``False``, refuse to write to a path that already
        exists before this call first touches it
    :returns: list of the paths written to, in no particular order
    :raises FileSaveError: if ``overwrite`` is ``False`` and a target file
        already exists
    '''

    fns = set()
    open_files = {}

    def close_files():
        while open_files:
            open_files.popitem()[1].close()

    try:
        for tr in traces:
            fn = tr.fill_template(filename_template, **additional)

            if fn not in open_files:
                if len(open_files) >= max_open_files:
                    close_files()

                if fn not in fns:
                    if not overwrite and os.path.exists(fn):
                        raise FileSaveError('file exists: %s' % fn)

                    ensuredirs(fn)

                # A path already written during this call must be appended
                # to, otherwise re-opening it would discard earlier traces.
                open_files[fn] = open(fn, 'ab' if fn in fns else 'wb')
                fns.add(fn)

            tf = TracesFileIO(open_files[fn])
            tf.save([tr])
            tf.close()

    finally:
        # Also runs when an error aborts the loop, so no handles are leaked.
        close_files()

    return list(fns)

131 

132 

def detect(first512):
    '''
    Tell whether the leading bytes of a file look like a YAFF trace file.

    :param first512: bytes from the start of the file
    :returns: ``True`` if the first record header carries the label
        ``b'YAFF'``, version ``b'0000'`` and (after stripping whitespace)
        the record type ``b'trace'``; ``False`` otherwise, including when
        fewer than ``size_record_header`` bytes are given
    '''
    if len(first512) < size_record_header:
        return False

    # NOTE(review): the format below needs exactly the number of bytes it
    # describes, so size_record_header is assumed to match it -- confirm.
    header = first512[:size_record_header]
    (label, version, _size_record, _size_payload, _digest,
     record_type) = unpack('>4s4sQQ20s20s', header)

    return (
        label == b'YAFF'
        and version == b'0000'
        and record_type.strip() == b'trace')

143 

144 return False