1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6import os
7import signal
8import errno
9from os.path import join as pjoin
10import numpy as num
12from collections import defaultdict
13from pyrocko.parimap import parimap
14from pyrocko import util
15from . import store
def int_arr(*args):
    '''
    Pack the positional arguments into a NumPy array of integer dtype.

    :param args: values to be stored in the array
    :returns: new array with ``dtype=int`` holding ``args`` in order
    '''
    values = num.asarray(args, dtype=int)
    return values
class Interrupted(store.StoreError):
    '''
    Store error signalling that processing has been interrupted.

    Its string representation is the fixed text ``'Interrupted.'``.
    '''

    def __str__(self):
        text = 'Interrupted.'
        return text
# Module-level cache of builder instances, keyed by ``(store_dir, step)``.
# Entries are added by work_block() and removed again by cleanup().
g_builders = dict()
def work_block(args):
    '''
    Process one block, reusing a cached builder for ``(store_dir, step)``.

    Previously this was implemented as a classmethod ``__work_block``, but
    that caused problems on Conda Python 3.8 on OSX, hence the module-level
    function.

    :param args: tuple ``(cls, store_dir, step, iblock, shared, force)``; a
        builder is created as ``cls(store_dir, step, shared, force=force)``
        if none is cached yet for ``(store_dir, step)``
    :returns: tuple ``(store_dir, step, iblock)``
    :raises Interrupted: on ``KeyboardInterrupt`` or on an ``IOError`` whose
        ``errno`` is ``EINTR``; any other ``IOError`` is re-raised as is
    '''
    try:
        cls, store_dir, step, iblock, shared, force = args

        key = (store_dir, step)
        builder = g_builders.get(key)
        if builder is None:
            builder = cls(store_dir, step, shared, force=force)
            g_builders[key] = builder

        builder.work_block(iblock)

    except KeyboardInterrupt:
        raise Interrupted()

    except IOError as err:
        # Only an interrupted system call counts as an interruption.
        if err.errno != errno.EINTR:
            raise

        raise Interrupted()

    return store_dir, step, iblock
def cleanup():
    '''
    Call ``cleanup()`` on every cached builder and drop it from the cache.
    '''
    # Iterate over a snapshot of the keys because entries are removed from
    # the dict while looping.
    for key in tuple(g_builders):
        cached = g_builders[key]
        cached.cleanup()
        g_builders.pop(key)
class Builder(object):
    '''
    Base class for builders which process an index grid block by block.

    The grid shape is taken from ``gf_config.ns``; it is divided into blocks
    of at most ``block_size`` nodes along each dimension. :py:meth:`build`
    drives the processing. It instantiates ``cls`` as
    ``cls(store_dir, step, shared, force=force)`` and the module-level
    ``work_block()`` calls ``work_block(iblock)`` on such instances, so
    :py:meth:`build` is meant to be invoked on subclasses which provide that
    constructor signature and that method.
    '''

    # Number of processing steps; build() runs steps 0 .. nsteps-1 unless a
    # specific step is requested.
    nsteps = 1

    def __init__(self, gf_config, step, block_size=None, force=False):
        '''
        :param gf_config: configuration object providing ``ns`` (number of
            nodes per dimension); ``mins`` and ``deltas`` are additionally
            read by :py:meth:`get_block_extents`
        :param step: step index, stored as ``self.step``
        :param block_size: number of nodes per block along each dimension;
            defaults to ``(10, 1, 10)`` for three and ``(1, 10)`` for two
            dimensions
        :param force: stored as ``self.force``
        :raises AssertionError: if ``block_size`` is not given and
            ``gf_config.ns`` has neither two nor three entries
        '''
        if block_size is None:
            ndim = len(gf_config.ns)
            if ndim == 3:
                block_size = (10, 1, 10)
            elif ndim == 2:
                block_size = (1, 10)
            else:
                # Raised explicitly (instead of ``assert False``) so that
                # the check is not stripped when running with ``python -O``.
                raise AssertionError(
                    'no default block size for %i dimensions' % ndim)

        self.step = step
        self.force = force
        self.gf_config = gf_config
        self.warnings = defaultdict(int)
        self._block_size = int_arr(*block_size)

    def cleanup(self):
        '''
        Hook called when the builder is no longer needed; does nothing here.
        '''
        pass

    @property
    def nblocks(self):
        '''
        Total number of blocks (product of :py:attr:`block_dims`).
        '''
        return num.prod(self.block_dims)

    @property
    def block_dims(self):
        '''
        Number of blocks along each dimension.

        This is ``ns / block_size`` rounded up, computed element-wise with
        integer arithmetic.
        '''
        return (self.gf_config.ns-1) // self._block_size + 1

    def warn(self, msg):
        '''
        Count one occurrence of the warning message ``msg``.
        '''
        self.warnings[msg] += 1

    def log_warnings(self, index, logger):
        '''
        Log the collected warnings once each and reset the counters.

        Every message is prefixed with ``'block {}: '`` and formatted with
        ``str.format(index, noccur)``, so a ``{}`` placeholder inside the
        warning text receives the number of occurrences.

        :param index: block index inserted into the prefix
        :param logger: object with a ``warning(msg)`` method
        '''
        for warning, noccur in self.warnings.items():
            msg = 'block {}: ' + warning
            logger.warning(msg.format(index, noccur))

        self.warnings = defaultdict(int)

    def all_block_indices(self):
        '''
        Get the flat indices ``0 .. nblocks-1`` of all blocks.
        '''
        return num.arange(self.nblocks)

    def get_block(self, index):
        '''
        Get the node index range covered by a block.

        :param index: flat block index, unravelled over
            :py:attr:`block_dims`
        :returns: tuple ``(ibegins, iends)`` with the first node index and
            the end node index (exclusive) per dimension; ``iends`` is
            clipped to ``gf_config.ns``
        '''
        dims = self.block_dims
        iblock = num.unravel_index(index, dims)
        ibegins = iblock * self._block_size
        iends = num.minimum(ibegins + self._block_size, self.gf_config.ns)
        return ibegins, iends

    def get_block_extents(self, index):
        '''
        Get the coordinate range covered by a block.

        :param index: flat block index
        :returns: tuple ``(begins, ends, ns)`` with the coordinates of the
            first and of the last node of the block
            (``mins + i * deltas``) and the number of nodes per dimension
        '''
        ibegins, iends = self.get_block(index)
        begins = self.gf_config.mins + ibegins * self.gf_config.deltas
        ends = self.gf_config.mins + (iends-1) * self.gf_config.deltas
        return begins, ends, iends - ibegins

    @classmethod
    def build(cls, store_dir, force=False, nworkers=None, continue_=False,
              step=None, iblock=None):
        '''
        Process the blocks of a store, optionally in parallel.

        Progress is tracked in the file ``.status`` inside ``store_dir``:
        one line ``'<step> <block>'`` is appended per finished block and the
        file is removed after all requested steps have been processed.

        :param store_dir: directory of the store
        :param force: passed to ``store.Store.create_dependants`` and to the
            builder constructor
        :param nworkers: passed to ``parimap`` as ``nprocs``
        :param continue_: if true and ``iblock`` is ``None``, skip the
            ``(step, block)`` pairs listed in the status file
        :param step: zero-based step to run, or ``None`` to run all steps
        :param iblock: zero-based block to run, ``None`` to run all blocks,
            or ``-1`` to only print one ``fomosto build`` command line per
            block (with one-based ``--step``/``--block``) and return
        :raises store.StoreError: if ``iblock`` is given without ``step``
            while there is more than one step, if the status file cannot be
            read when continuing, or if ``step`` or ``iblock`` is out of
            range
        '''
        if step is None:
            steps = list(range(cls.nsteps))
        else:
            steps = [step]

        if iblock is not None and step is None and cls.nsteps != 1:
            raise store.StoreError('--step option must be given')

        done = set()
        status_fn = pjoin(store_dir, '.status')

        if not continue_ and iblock in (None, -1) and step in (None, 0):
            store.Store.create_dependants(store_dir, force)

        if iblock is None:
            if not continue_:
                # Start over with an empty status file.
                with open(status_fn, 'w'):
                    pass
            else:
                try:
                    with open(status_fn, 'r') as status:
                        for line in status:
                            done.add(tuple(int(x) for x in line.split()))
                except IOError:
                    raise store.StoreError('nothing to continue')

        shared = {}
        for step in steps:
            # This builder is only used to validate the request and to
            # enumerate the blocks; work_block() creates its own instances.
            builder = cls(store_dir, step, shared, force=force)
            try:
                if not (0 <= step < builder.nsteps):
                    raise store.StoreError('invalid step: %i' % (step+1))

                if iblock in (None, -1):
                    iblocks = [x for x in builder.all_block_indices()
                               if (step, x) not in done]
                else:
                    if not (0 <= iblock < builder.nblocks):
                        raise store.StoreError(
                            'invalid block index %i' % (iblock+1))

                    iblocks = [iblock]

                if iblock == -1:
                    for i in iblocks:
                        c = ['fomosto', 'build']
                        if not os.path.samefile(store_dir, '.'):
                            c.append("'%s'" % store_dir)

                        if builder.nsteps != 1:
                            c.append('--step=%i' % (step+1))

                        c.append('--block=%i' % (i+1))

                        print(' '.join(c))

                    return

            finally:
                # Release the builder on every exit path, including the
                # early return and the validation errors above.
                builder.cleanup()
                del builder

            # SIGINT is ignored in this process while the blocks are being
            # processed; the previous handler is restored afterwards.
            original = signal.signal(signal.SIGINT, signal.SIG_IGN)
            try:
                for x in parimap(
                        work_block,
                        [(cls, store_dir, step, i, shared, force)
                         for i in iblocks],
                        nprocs=nworkers,
                        eprintignore=(Interrupted, store.StoreError),
                        startup=util.setup_logging,
                        startup_args=util.subprocess_setup_logging_args(),
                        cleanup=cleanup):

                    store_dir, step, i = x
                    with open(status_fn, 'a') as status:
                        status.write('%i %i\n' % (step, i))

            finally:
                signal.signal(signal.SIGINT, original)

        os.remove(status_fn)
# Names exported by ``from <module> import *``.
__all__ = [
    'Builder',
]