1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
5from __future__ import absolute_import, division
7import os
8import signal
9import errno
10from os.path import join as pjoin
11import numpy as num
13from collections import defaultdict
14from pyrocko.parimap import parimap
15from pyrocko import util
16from . import store
19def int_arr(*args):
20 return num.array(args, dtype=int)
23class Interrupted(store.StoreError):
24 def __str__(self):
25 return 'Interrupted.'
28g_builders = {}
31def work_block(args):
32 # previously this was a implemented as a classmethod __work_block but it
33 # caused problems on Conda Python 3.8 on OSX.
34 try:
35 cls, store_dir, step, iblock, shared, force = args
36 if (store_dir, step) not in g_builders:
37 g_builders[store_dir, step] = cls(
38 store_dir, step, shared, force=force)
40 builder = g_builders[store_dir, step]
41 builder.work_block(iblock)
42 except KeyboardInterrupt:
43 raise Interrupted()
44 except IOError as e:
45 if e.errno == errno.EINTR:
46 raise Interrupted()
47 else:
48 raise
50 return store_dir, step, iblock
53def cleanup():
54 for k in list(g_builders):
55 g_builders[k].cleanup()
56 del g_builders[k]
59class Builder(object):
60 nsteps = 1
62 def __init__(self, gf_config, step, block_size=None, force=False):
63 if block_size is None:
64 if len(gf_config.ns) == 3:
65 block_size = (10, 1, 10)
66 elif len(gf_config.ns) == 2:
67 block_size = (1, 10)
68 else:
69 assert False
71 self.step = step
72 self.force = force
73 self.gf_config = gf_config
74 self.warnings = defaultdict(int)
75 self._block_size = int_arr(*block_size)
77 def cleanup(self):
78 pass
80 @property
81 def nblocks(self):
82 return num.prod(self.block_dims)
84 @property
85 def block_dims(self):
86 return (self.gf_config.ns-1) // self._block_size + 1
88 def warn(self, msg):
89 self.warnings[msg] += 1
91 def log_warnings(self, index, logger):
92 for warning, noccur in self.warnings.items():
93 msg = "block {}: " + warning
94 logger.warn(msg.format(index, noccur))
96 self.warnings = defaultdict(int)
98 def all_block_indices(self):
99 return num.arange(self.nblocks)
101 def get_block(self, index):
102 dims = self.block_dims
103 iblock = num.unravel_index(index, dims)
104 ibegins = iblock * self._block_size
105 iends = num.minimum(ibegins + self._block_size, self.gf_config.ns)
106 return ibegins, iends
108 def get_block_extents(self, index):
109 ibegins, iends = self.get_block(index)
110 begins = self.gf_config.mins + ibegins * self.gf_config.deltas
111 ends = self.gf_config.mins + (iends-1) * self.gf_config.deltas
112 return begins, ends, iends - ibegins
114 @classmethod
115 def build(cls, store_dir, force=False, nworkers=None, continue_=False,
116 step=None, iblock=None):
117 if step is None:
118 steps = list(range(cls.nsteps))
119 else:
120 steps = [step]
122 if iblock is not None and step is None and cls.nsteps != 1:
123 raise store.StoreError('--step option must be given')
125 done = set()
126 status_fn = pjoin(store_dir, '.status')
128 if not continue_ and iblock in (None, -1) and step in (None, 0):
129 store.Store.create_dependants(store_dir, force)
131 if iblock is None:
132 if not continue_:
133 with open(status_fn, 'w') as status:
134 pass
135 else:
136 if iblock is None:
137 try:
138 with open(status_fn, 'r') as status:
139 for line in status:
140 done.add(tuple(int(x) for x in line.split()))
141 except IOError:
142 raise store.StoreError('nothing to continue')
144 shared = {}
145 for step in steps:
146 builder = cls(store_dir, step, shared, force=force)
147 if not (0 <= step < builder.nsteps):
148 raise store.StoreError('invalid step: %i' % (step+1))
150 if iblock in (None, -1):
151 iblocks = [x for x in builder.all_block_indices()
152 if (step, x) not in done]
153 else:
154 if not (0 <= iblock < builder.nblocks):
155 raise store.StoreError(
156 'invalid block index %i' % (iblock+1))
158 iblocks = [iblock]
160 if iblock == -1:
161 for i in iblocks:
162 c = ['fomosto', 'build']
163 if not os.path.samefile(store_dir, '.'):
164 c.append("'%s'" % store_dir)
166 if builder.nsteps != 1:
167 c.append('--step=%i' % (step+1))
169 c.append('--block=%i' % (i+1))
171 print(' '.join(c))
173 return
175 builder.cleanup()
176 del builder
178 original = signal.signal(signal.SIGINT, signal.SIG_IGN)
179 try:
180 for x in parimap(
181 work_block,
182 [(cls, store_dir, step, i, shared, force)
183 for i in iblocks],
184 nprocs=nworkers,
185 eprintignore=(Interrupted, store.StoreError),
186 startup=util.setup_logging,
187 startup_args=util.subprocess_setup_logging_args(),
188 cleanup=cleanup):
190 store_dir, step, i = x
191 with open(status_fn, 'a') as status:
192 status.write('%i %i\n' % (step, i))
194 finally:
195 signal.signal(signal.SIGINT, original)
197 os.remove(status_fn)
200__all__ = ['Builder']