1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5from __future__ import absolute_import, division 

6 

7import os 

8import signal 

9import errno 

10from os.path import join as pjoin 

11import numpy as num 

12 

13from collections import defaultdict 

14from pyrocko.parimap import parimap 

15from pyrocko import util 

16from . import store 

17 

18 

def int_arr(*args):
    """
    Pack the positional arguments into a new integer NumPy array.

    :param args: values to store, one array element per argument
    :returns: :py:class:`numpy.ndarray` created with ``dtype=int``
    """
    values = tuple(args)
    return num.array(values, dtype=int)

21 

22 

class Interrupted(store.StoreError):
    """
    Store error signalling that processing was interrupted.

    Its string representation is the fixed text ``'Interrupted.'``.
    """

    def __str__(self):
        return 'Interrupted.'

26 

27 

# Module-level cache of builder instances, keyed by ``(store_dir, step)``.
# It is filled lazily by work_block() and emptied again by cleanup().
g_builders = dict()

29 

30 

def work_block(args):
    """
    Process a single block, reusing a cached builder where possible.

    This is a module-level function on purpose: it was previously
    implemented as a classmethod ``__work_block``, but that caused problems
    on Conda Python 3.8 on OSX.

    :param args: tuple ``(cls, store_dir, step, iblock, shared, force)``;
        ``cls`` is instantiated as ``cls(store_dir, step, shared,
        force=force)`` the first time a ``(store_dir, step)`` pair is seen
    :returns: tuple ``(store_dir, step, iblock)``
    :raises Interrupted: on :py:exc:`KeyboardInterrupt` or on an
        :py:exc:`IOError` whose ``errno`` is ``EINTR``
    """
    try:
        cls, store_dir, step, iblock, shared, force = args

        # One builder per (store_dir, step) is kept in the module cache.
        key = (store_dir, step)
        if key not in g_builders:
            g_builders[key] = cls(store_dir, step, shared, force=force)

        g_builders[key].work_block(iblock)

    except KeyboardInterrupt:
        raise Interrupted()

    except IOError as err:
        # Only an interrupted system call is translated; anything else is
        # re-raised unchanged.
        if err.errno != errno.EINTR:
            raise

        raise Interrupted()

    return store_dir, step, iblock

51 

52 

def cleanup():
    """
    Clean up and forget every builder held in the module-level cache.

    Each cached builder's ``cleanup()`` is called first; only afterwards is
    its entry removed from ``g_builders``.
    """
    # Iterate over a snapshot of the keys because entries are removed
    # while looping.
    for key in tuple(g_builders.keys()):
        builder = g_builders[key]
        builder.cleanup()
        g_builders.pop(key)

57 

58 

class Builder(object):
    """
    Base class for builders which process a grid block by block.

    The grid described by ``gf_config.ns`` is partitioned into rectangular
    blocks of ``block_size`` nodes. :py:meth:`build` drives the processing
    of all (or selected) blocks through ``parimap`` and keeps track of
    finished blocks in a ``.status`` file inside the store directory.

    NOTE(review): ``build`` instantiates ``cls(store_dir, step, shared,
    force=force)`` and calls ``builder.work_block(iblock)``; presumably
    subclasses provide a matching constructor and a ``work_block`` method -
    neither is defined in this base class.
    """

    # Number of build steps; valid step indices are 0 <= step < nsteps.
    nsteps = 1

    def __init__(self, gf_config, step, block_size=None, force=False):
        """
        :param gf_config: configuration object; its ``ns`` attribute is
            used here, ``mins`` and ``deltas`` are used by
            :py:meth:`get_block_extents` (assumed to be NumPy arrays of
            equal length - TODO confirm)
        :param step: index of the build step handled by this builder
        :param block_size: number of grid nodes per block along each
            dimension; defaults to ``(10, 1, 10)`` when ``gf_config.ns``
            has three entries and to ``(1, 10)`` when it has two
        :param force: stored as ``self.force``; not used by the base class
        :raises AssertionError: if ``block_size`` is not given and
            ``gf_config.ns`` has neither two nor three entries
        """
        if block_size is None:
            ndim = len(gf_config.ns)
            if ndim == 3:
                block_size = (10, 1, 10)
            elif ndim == 2:
                block_size = (1, 10)
            else:
                # Raised explicitly because a bare ``assert False`` is
                # stripped when Python runs with -O.
                raise AssertionError(
                    'no default block size for %i dimensions' % ndim)

        self.step = step
        self.force = force
        self.gf_config = gf_config
        # Maps warning message -> number of occurrences since last flush.
        self.warnings = defaultdict(int)
        self._block_size = int_arr(*block_size)

    def cleanup(self):
        """
        Hook called when the builder is no longer needed; no-op here.
        """
        pass

    @property
    def nblocks(self):
        """
        Total number of blocks (product of :py:attr:`block_dims`).
        """
        return num.prod(self.block_dims)

    @property
    def block_dims(self):
        """
        Number of blocks along each dimension.

        Computed as ``(ns - 1) // block_size + 1``, so a partially filled
        trailing block counts as a block.
        """
        return (self.gf_config.ns-1) // self._block_size + 1

    def warn(self, msg):
        """
        Count one occurrence of warning ``msg``.

        The message is emitted later by :py:meth:`log_warnings`.
        """
        self.warnings[msg] += 1

    def log_warnings(self, index, logger):
        """
        Emit all collected warnings through ``logger`` and reset the counts.

        Each message is prefixed with ``"block {}: "`` and formatted with
        ``str.format(index, noccur)``, so a ``{}`` placeholder inside the
        warning text is replaced by its number of occurrences.

        :param index: block index to put into the message prefix
        :param logger: logger object providing a ``warning`` method
        """
        for warning, noccur in self.warnings.items():
            msg = "block {}: " + warning
            # ``Logger.warn`` is a deprecated alias of ``Logger.warning``.
            logger.warning(msg.format(index, noccur))

        self.warnings = defaultdict(int)

    def all_block_indices(self):
        """
        Return the flat indices ``0 .. nblocks-1`` as a NumPy array.
        """
        return num.arange(self.nblocks)

    def get_block(self, index):
        """
        Get the node index range covered by a block.

        :param index: flat block index
        :returns: tuple ``(ibegins, iends)`` of per-dimension start
            (inclusive) and end (exclusive) node indices; ``iends`` is
            clipped to ``gf_config.ns``
        """
        dims = self.block_dims
        iblock = num.unravel_index(index, dims)
        ibegins = iblock * self._block_size
        iends = num.minimum(ibegins + self._block_size, self.gf_config.ns)
        return ibegins, iends

    def get_block_extents(self, index):
        """
        Get the coordinate extents of a block.

        :param index: flat block index
        :returns: tuple ``(begins, ends, ns)`` where ``begins`` and ``ends``
            are ``mins + i * deltas`` evaluated at the first and at the
            last node of the block, and ``ns`` is the number of nodes of
            the block along each dimension
        """
        ibegins, iends = self.get_block(index)
        begins = self.gf_config.mins + ibegins * self.gf_config.deltas
        ends = self.gf_config.mins + (iends-1) * self.gf_config.deltas
        return begins, ends, iends - ibegins

    @classmethod
    def build(cls, store_dir, force=False, nworkers=None, continue_=False,
              step=None, iblock=None):
        """
        Process the blocks of the store in ``store_dir``.

        :param store_dir: path of the store directory; the progress file
            ``.status`` is kept in there
        :param force: passed on to ``store.Store.create_dependants`` and to
            the builder constructor
        :param nworkers: passed to ``parimap`` as ``nprocs``
        :param continue_: if true (and ``iblock`` is ``None``), skip the
            ``(step, block)`` pairs listed in the existing ``.status`` file
        :param step: zero-based step index to run, or ``None`` for all
            steps ``0 .. nsteps-1``
        :param iblock: zero-based block index to run, ``None`` for all
            blocks, or ``-1`` to only print one ``fomosto build`` command
            line per block instead of processing anything
        :raises store.StoreError: if a block is selected without a step
            while ``nsteps != 1``, if there is no ``.status`` file to
            continue from, or if the step or block index is out of range
        """
        if step is None:
            steps = list(range(cls.nsteps))
        else:
            steps = [step]

        if iblock is not None and step is None and cls.nsteps != 1:
            raise store.StoreError('--step option must be given')

        done = set()
        status_fn = pjoin(store_dir, '.status')

        if not continue_ and iblock in (None, -1) and step in (None, 0):
            store.Store.create_dependants(store_dir, force)

        if iblock is None:
            if not continue_:
                # Fresh run: create or truncate the status file.
                with open(status_fn, 'w'):
                    pass
            else:
                # Resume: collect the (step, block) pairs already done.
                try:
                    with open(status_fn, 'r') as status:
                        for line in status:
                            done.add(tuple(int(x) for x in line.split()))
                except IOError:
                    raise store.StoreError('nothing to continue')

        shared = {}
        for step in steps:
            builder = cls(store_dir, step, shared, force=force)
            if not (0 <= step < builder.nsteps):
                # Numbers shown to the user are one-based.
                raise store.StoreError('invalid step: %i' % (step+1))

            if iblock in (None, -1):
                iblocks = [x for x in builder.all_block_indices()
                           if (step, x) not in done]
            else:
                if not (0 <= iblock < builder.nblocks):
                    raise store.StoreError(
                        'invalid block index %i' % (iblock+1))

                iblocks = [iblock]

            if iblock == -1:
                # Listing mode: print the command for each block, then
                # return without processing anything.
                for i in iblocks:
                    c = ['fomosto', 'build']
                    if not os.path.samefile(store_dir, '.'):
                        c.append("'%s'" % store_dir)

                    if builder.nsteps != 1:
                        c.append('--step=%i' % (step+1))

                    c.append('--block=%i' % (i+1))

                    print(' '.join(c))

                return

            # This instance was only needed to enumerate the blocks; the
            # actual work is done by builders created inside work_block().
            builder.cleanup()
            del builder

            # Ignore SIGINT while parimap is running and restore the
            # previous handler afterwards, whatever happens.
            original = signal.signal(signal.SIGINT, signal.SIG_IGN)
            try:
                for result in parimap(
                        work_block,
                        [(cls, store_dir, step, i, shared, force)
                         for i in iblocks],
                        nprocs=nworkers,
                        eprintignore=(Interrupted, store.StoreError),
                        startup=util.setup_logging,
                        startup_args=util.subprocess_setup_logging_args(),
                        cleanup=cleanup):

                    # work_block() hands back (store_dir, step, iblock).
                    _, done_step, done_block = result
                    with open(status_fn, 'a') as status:
                        status.write('%i %i\n' % (done_step, done_block))

            finally:
                signal.signal(signal.SIGINT, original)

        os.remove(status_fn)

198 

199 

# Public API of this module: only the Builder base class is exported by
# ``from ... import *``.
__all__ = ['Builder']