# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------

5 

6import os 

7import signal 

8import errno 

9from os.path import join as pjoin 

10import numpy as num 

11 

12from collections import defaultdict 

13from pyrocko.parimap import parimap 

14from pyrocko import util 

15from . import store 

16 

17 

def int_arr(*args):
    """Pack the positional arguments into a numpy array of integer dtype."""
    values = num.array(args, dtype=int)
    return values

20 

21 

class Interrupted(store.StoreError):
    """Store error whose string representation is ``'Interrupted.'``."""

    def __str__(self):
        return 'Interrupted.'

25 

26 

# Cache of builder instances, keyed by (store_dir, step). Entries are created
# on demand by work_block() and removed again by cleanup().
g_builders = {}

28 

29 

def work_block(args):
    """
    Process one block using a builder cached in ``g_builders``.

    This is a module-level function on purpose: it was previously
    implemented as a classmethod ``__work_block``, but that caused problems
    on Conda Python 3.8 on OSX.

    :param args: tuple ``(cls, store_dir, step, iblock, shared, force)``
    :returns: tuple ``(store_dir, step, iblock)``
    :raises Interrupted: on ``KeyboardInterrupt`` or on an ``IOError`` whose
        ``errno`` is ``errno.EINTR``
    """
    try:
        builder_cls, store_dir, step, iblock, shared, force = args

        # One builder per (store_dir, step), created on first use.
        key = (store_dir, step)
        if key not in g_builders:
            g_builders[key] = builder_cls(
                store_dir, step, shared, force=force)

        g_builders[key].work_block(iblock)

    except KeyboardInterrupt:
        raise Interrupted()

    except IOError as e:
        # Any I/O error other than an interrupted system call is passed on.
        if e.errno != errno.EINTR:
            raise

        raise Interrupted()

    return store_dir, step, iblock

50 

51 

def cleanup():
    """Call ``cleanup()`` on every builder in ``g_builders`` and drop it."""
    # Iterate over a snapshot of the keys, the dict is modified in the loop.
    for key in tuple(g_builders.keys()):
        builder = g_builders[key]
        builder.cleanup()
        del g_builders[key]

56 

57 

class Builder(object):
    """
    Base class for builders which process a regular grid block by block.

    The grid is described by ``gf_config`` through its attributes ``ns``
    (number of nodes per dimension), ``mins`` and ``deltas``. It is divided
    into blocks of at most ``block_size`` nodes per dimension, which are
    addressed by a flat block index.

    NOTE(review): :py:meth:`build` instantiates
    ``cls(store_dir, step, shared, force=force)`` and the module-level
    ``work_block()`` calls ``builder.work_block(iblock)``; subclasses are
    presumably expected to provide a matching constructor and a
    ``work_block`` method - confirm against the concrete builders.
    """

    # Number of build steps; checked against the ``step`` argument in build().
    nsteps = 1

    def __init__(self, gf_config, step, block_size=None, force=False):
        """
        :param gf_config: object providing ``ns``, ``mins`` and ``deltas``
        :param step: index of the build step handled by this builder
        :param block_size: number of nodes per block in each dimension;
            defaults to ``(10, 1, 10)`` for three and ``(1, 10)`` for two
            dimensions
        :param force: stored as ``self.force``
        :raises AssertionError: if ``block_size`` is not given and
            ``gf_config.ns`` has neither two nor three entries
        """
        if block_size is None:
            ndim = len(gf_config.ns)
            if ndim == 3:
                block_size = (10, 1, 10)
            elif ndim == 2:
                block_size = (1, 10)
            else:
                # Raised explicitly instead of ``assert False`` so that the
                # check is not stripped when running under ``python -O``.
                raise AssertionError(
                    'no default block size for %i-dimensional configuration'
                    % ndim)

        self.step = step
        self.force = force
        self.gf_config = gf_config
        self.warnings = defaultdict(int)
        self._block_size = int_arr(*block_size)

    def cleanup(self):
        """Release resources held by the builder. Does nothing here."""
        pass

    @property
    def nblocks(self):
        """Total number of blocks (product of :py:attr:`block_dims`)."""
        return num.prod(self.block_dims)

    @property
    def block_dims(self):
        """
        Number of blocks along each dimension.

        Computed as ``(ns - 1) // block_size + 1``, i.e. ``ns / block_size``
        rounded up for ``ns >= 1``.
        """
        return (self.gf_config.ns-1) // self._block_size + 1

    def warn(self, msg):
        """Count one occurrence of the warning ``msg``."""
        self.warnings[msg] += 1

    def log_warnings(self, index, logger):
        """
        Emit the collected warnings through ``logger`` and reset the counts.

        Each message is prefixed with ``'block <index>: '``. A ``{}``
        placeholder in the warning text, if present, is filled with the
        number of occurrences.

        :param index: block index put into the message prefix
        :param logger: object with a ``warning(msg)`` method
        """
        for warning, noccur in self.warnings.items():
            msg = 'block {}: ' + warning
            logger.warning(msg.format(index, noccur))

        self.warnings = defaultdict(int)

    def all_block_indices(self):
        """Return the flat indices ``0 .. nblocks-1`` as an array."""
        return num.arange(self.nblocks)

    def get_block(self, index):
        """
        Get the node index range covered by a block.

        :param index: flat block index
        :returns: tuple ``(ibegins, iends)`` of per-dimension node indices,
            ``iends`` being exclusive and clipped to ``gf_config.ns``
        """
        dims = self.block_dims
        iblock = num.unravel_index(index, dims)
        ibegins = iblock * self._block_size
        iends = num.minimum(ibegins + self._block_size, self.gf_config.ns)
        return ibegins, iends

    def get_block_extents(self, index):
        """
        Get the coordinate range covered by a block.

        :param index: flat block index
        :returns: tuple ``(begins, ends, nodes)`` with the coordinates of the
            first and the last node of the block and the number of nodes,
            each per dimension
        """
        ibegins, iends = self.get_block(index)
        begins = self.gf_config.mins + ibegins * self.gf_config.deltas
        ends = self.gf_config.mins + (iends-1) * self.gf_config.deltas
        return begins, ends, iends - ibegins

    @classmethod
    def build(cls, store_dir, force=False, nworkers=None, continue_=False,
              step=None, iblock=None):
        """
        Run the builder on the blocks of the store in ``store_dir``.

        Finished blocks are recorded as ``'<step> <block>'`` lines in the
        file ``.status`` inside ``store_dir``; the file is removed once all
        requested work has completed.

        :param store_dir: path of the store directory
        :param force: passed on to ``store.Store.create_dependants`` and to
            the builder constructor
        :param nworkers: passed to ``parimap`` as ``nprocs``
        :param continue_: if true, skip the blocks listed in the existing
            status file instead of starting from scratch
        :param step: zero-based step to run, or ``None`` for all steps
        :param iblock: zero-based block to run, ``None`` for all blocks, or
            ``-1`` to only print one ``fomosto build`` command per block
        :raises store.StoreError: if ``iblock`` is given without ``step``
            for a multi-step builder, if there is no status file to continue
            from, or if ``step`` or ``iblock`` are out of range
        """
        if step is None:
            steps = list(range(cls.nsteps))
        else:
            steps = [step]

        if iblock is not None and step is None and cls.nsteps != 1:
            raise store.StoreError('--step option must be given')

        done = set()
        status_fn = pjoin(store_dir, '.status')

        if not continue_ and iblock in (None, -1) and step in (None, 0):
            store.Store.create_dependants(store_dir, force)

        if iblock is None:
            if not continue_:
                # Start with an empty status file.
                with open(status_fn, 'w'):
                    pass
            else:
                # Collect the (step, block) pairs which are already done.
                try:
                    with open(status_fn, 'r') as status:
                        for line in status:
                            done.add(tuple(int(x) for x in line.split()))
                except IOError:
                    raise store.StoreError('nothing to continue')

        shared = {}
        for istep in steps:
            # This builder is only used to query the block layout; the
            # actual work is done by builders created in work_block().
            builder = cls(store_dir, istep, shared, force=force)
            try:
                if not (0 <= istep < builder.nsteps):
                    raise store.StoreError('invalid step: %i' % (istep+1))

                if iblock in (None, -1):
                    iblocks = [x for x in builder.all_block_indices()
                               if (istep, x) not in done]
                else:
                    if not (0 <= iblock < builder.nblocks):
                        raise store.StoreError(
                            'invalid block index %i' % (iblock+1))

                    iblocks = [iblock]

                if iblock == -1:
                    # Dry run: print the commands (with one-based step and
                    # block numbers) instead of doing any work.
                    for i in iblocks:
                        c = ['fomosto', 'build']
                        if not os.path.samefile(store_dir, '.'):
                            c.append("'%s'" % store_dir)

                        if builder.nsteps != 1:
                            c.append('--step=%i' % (istep+1))

                        c.append('--block=%i' % (i+1))

                        print(' '.join(c))

                    return

            finally:
                # Also clean up on the dry-run return and on errors.
                builder.cleanup()
                del builder

            # SIGINT is ignored while parimap runs; the previous handler is
            # restored afterwards.
            original = signal.signal(signal.SIGINT, signal.SIG_IGN)
            try:
                for result in parimap(
                        work_block,
                        [(cls, store_dir, istep, i, shared, force)
                         for i in iblocks],
                        nprocs=nworkers,
                        eprintignore=(Interrupted, store.StoreError),
                        startup=util.setup_logging,
                        startup_args=util.subprocess_setup_logging_args(),
                        cleanup=cleanup):

                    _, done_step, done_block = result
                    with open(status_fn, 'a') as status:
                        status.write('%i %i\n' % (done_step, done_block))

            finally:
                signal.signal(signal.SIGINT, original)

        os.remove(status_fn)

197 

198 

# Names exported by a star import of this module.
__all__ = ['Builder']