Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/gf/builder.py: 75%

120 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-10-04 09:52 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Common base for Green's function store builders (:py:mod:`~pyrocko.fomosto`). 

8''' 

9 

10import os 

11import signal 

12import errno 

13from os.path import join as pjoin 

14import numpy as num 

15 

16from collections import defaultdict 

17from pyrocko.parimap import parimap 

18from pyrocko import util 

19from . import store 

20 

21 

22def int_arr(*args): 

23 return num.array(args, dtype=int) 

24 

25 

26class Interrupted(store.StoreError): 

27 def __str__(self): 

28 return 'Interrupted.' 

29 

30 

31g_builders = {} 

32 

33 

34def work_block(args): 

35 # previously this was a implemented as a classmethod __work_block but it 

36 # caused problems on Conda Python 3.8 on OSX. 

37 try: 

38 cls, store_dir, step, iblock, shared, force = args 

39 if (store_dir, step) not in g_builders: 

40 g_builders[store_dir, step] = cls( 

41 store_dir, step, shared, force=force) 

42 

43 builder = g_builders[store_dir, step] 

44 builder.work_block(iblock) 

45 except KeyboardInterrupt: 

46 raise Interrupted() 

47 except IOError as e: 

48 if e.errno == errno.EINTR: 

49 raise Interrupted() 

50 else: 

51 raise 

52 

53 return store_dir, step, iblock 

54 

55 

56def cleanup(): 

57 for k in list(g_builders): 

58 g_builders[k].cleanup() 

59 del g_builders[k] 

60 

61 

62class Builder(object): 

63 nsteps = 1 

64 

65 def __init__(self, gf_config, step, block_size=None, force=False): 

66 if block_size is None: 

67 if len(gf_config.ns) == 3: 

68 block_size = (10, 1, 10) 

69 elif len(gf_config.ns) == 2: 

70 block_size = (1, 10) 

71 else: 

72 assert False 

73 

74 self.step = step 

75 self.force = force 

76 self.gf_config = gf_config 

77 self.warnings = defaultdict(int) 

78 self._block_size = int_arr(*block_size) 

79 

80 def cleanup(self): 

81 pass 

82 

83 @property 

84 def nblocks(self): 

85 return num.prod(self.block_dims) 

86 

87 @property 

88 def block_dims(self): 

89 return (self.gf_config.ns-1) // self._block_size + 1 

90 

91 def warn(self, msg): 

92 self.warnings[msg] += 1 

93 

94 def log_warnings(self, index, logger): 

95 for warning, noccur in self.warnings.items(): 

96 msg = 'block {}: ' + warning 

97 logger.warning(msg.format(index, noccur)) 

98 

99 self.warnings = defaultdict(int) 

100 

101 def all_block_indices(self): 

102 return num.arange(self.nblocks) 

103 

104 def get_block(self, index): 

105 dims = self.block_dims 

106 iblock = num.unravel_index(index, dims) 

107 ibegins = iblock * self._block_size 

108 iends = num.minimum(ibegins + self._block_size, self.gf_config.ns) 

109 return ibegins, iends 

110 

111 def get_block_extents(self, index): 

112 ibegins, iends = self.get_block(index) 

113 begins = self.gf_config.mins + ibegins * self.gf_config.deltas 

114 ends = self.gf_config.mins + (iends-1) * self.gf_config.deltas 

115 return begins, ends, iends - ibegins 

116 

117 @classmethod 

118 def build(cls, store_dir, force=False, nworkers=None, continue_=False, 

119 step=None, iblock=None): 

120 if step is None: 

121 steps = list(range(cls.nsteps)) 

122 else: 

123 steps = [step] 

124 

125 if iblock is not None and step is None and cls.nsteps != 1: 

126 raise store.StoreError('--step option must be given') 

127 

128 done = set() 

129 status_fn = pjoin(store_dir, '.status') 

130 

131 if not continue_ and iblock in (None, -1) and step in (None, 0): 

132 store.Store.create_dependants(store_dir, force) 

133 

134 if iblock is None: 

135 if not continue_: 

136 with open(status_fn, 'w') as status: 

137 pass 

138 else: 

139 if iblock is None: 

140 try: 

141 with open(status_fn, 'r') as status: 

142 for line in status: 

143 done.add(tuple(int(x) for x in line.split())) 

144 except IOError: 

145 raise store.StoreError('nothing to continue') 

146 

147 shared = {} 

148 for step in steps: 

149 builder = cls(store_dir, step, shared, force=force) 

150 if not (0 <= step < builder.nsteps): 

151 raise store.StoreError('invalid step: %i' % (step+1)) 

152 

153 if iblock in (None, -1): 

154 iblocks = [x for x in builder.all_block_indices() 

155 if (step, x) not in done] 

156 else: 

157 if not (0 <= iblock < builder.nblocks): 

158 raise store.StoreError( 

159 'invalid block index %i' % (iblock+1)) 

160 

161 iblocks = [iblock] 

162 

163 if iblock == -1: 

164 for i in iblocks: 

165 c = ['fomosto', 'build'] 

166 if not os.path.samefile(store_dir, '.'): 

167 c.append("'%s'" % store_dir) 

168 

169 if builder.nsteps != 1: 

170 c.append('--step=%i' % (step+1)) 

171 

172 c.append('--block=%i' % (i+1)) 

173 

174 print(' '.join(c)) 

175 

176 return 

177 

178 builder.cleanup() 

179 del builder 

180 

181 original = signal.signal(signal.SIGINT, signal.SIG_IGN) 

182 try: 

183 for x in parimap( 

184 work_block, 

185 [(cls, store_dir, step, i, shared, force) 

186 for i in iblocks], 

187 nprocs=nworkers, 

188 eprintignore=(Interrupted, store.StoreError), 

189 startup=util.setup_logging, 

190 startup_args=util.subprocess_setup_logging_args(), 

191 cleanup=cleanup): 

192 

193 store_dir, step, i = x 

194 with open(status_fn, 'a') as status: 

195 status.write('%i %i\n' % (step, i)) 

196 

197 finally: 

198 signal.signal(signal.SIGINT, original) 

199 

200 os.remove(status_fn) 

201 

202 

203__all__ = ['Builder']