Coverage for /usr/local/lib/python3.13/dist-packages/pyrocko/parimap.py: 78%
123 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-12-04 10:41 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-12-04 10:41 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Parallel :py:func:`map` implementation based on :py:mod:`multiprocessing`.
8'''
10try:
11 import queue
12except ImportError:
13 import Queue as queue
16import logging
17import multiprocessing
18import traceback
19import errno
22logger = logging.getLogger('pyrocko.parimap')
def worker(
        q_in, q_out, function, eprintignore, pshared,
        startup, startup_args, cleanup):
    '''
    Task loop executed by a worker: read jobs, run them, report outcomes.

    Jobs are read from ``q_in`` as ``(index, args)`` pairs. For each job,
    ``function(*args)`` is called and the triple
    ``(index, result, exception)`` is put on ``q_out``; exactly one of
    ``result`` and ``exception`` is meaningful, the other is ``None``.
    A job whose index is ``None`` terminates the loop.

    :param q_in: queue-like object providing ``get()`` for incoming jobs
    :param q_out: queue-like object providing ``put()`` for outcomes
    :param function: callable applied to the arguments of every job
    :param eprintignore: exception type or tuple of types for which no
        traceback is printed; with ``None`` a traceback is printed for
        every exception caught
    :param pshared: if not ``None``, handed to ``function`` as keyword
        argument ``pshared`` on every call
    :param startup: if not ``None``, called once as
        ``startup(*startup_args)`` before the first job is read
    :param startup_args: positional arguments for ``startup``
    :param cleanup: if not ``None``, called without arguments once the
        termination job has been received
    '''
    extra = {} if pshared is None else {'pshared': pshared}

    if startup is not None:
        startup(*startup_args)

    while True:
        index, call_args = q_in.get()
        if index is None:
            # Termination marker: leave the loop and run the cleanup hook.
            break

        outcome = None
        failure = None
        try:
            outcome = function(*call_args, **extra)
        except Exception as exc:
            silenced = (
                eprintignore is not None and isinstance(exc, eprintignore))
            if not silenced:
                traceback.print_exc()

            # The exception object itself is handed back through q_out.
            failure = exc

        q_out.put((index, outcome, failure))

    if cleanup is not None:
        cleanup()
def _next_args(iterators):
    '''
    Pull one item from each iterator to build the next argument list.

    Returns the list of arguments, or ``None`` as soon as one of the
    iterators is exhausted. With no iterators at all, an empty list is
    returned every time.
    '''
    args = []
    for it in iterators:
        try:
            args.append(next(it))
        except StopIteration:
            return None

    return args


def parimap(function, *iterables, **kwargs):
    '''
    Generator applying ``function`` to items of ``iterables``, optionally
    in parallel.

    Like :py:func:`map`, ``function`` is called with one item from each of
    the ``iterables`` and iteration stops when the shortest is exhausted.
    Results are yielded in the order of the inputs. Being a generator,
    nothing (including argument checking) happens before the first item is
    requested.

    :param function: callable to apply
    :param iterables: one iterable per positional argument of ``function``
    :param nprocs: number of worker processes. With ``1``, everything runs
        in the calling process and no processes are started. With ``None``
        (default), :py:func:`multiprocessing.cpu_count` is used.
    :param eprintignore: forwarded to the worker processes, which use it
        to decide whether to print a traceback for an exception raised by
        ``function``. The default ``'all'`` is forwarded as ``None``.
        Not used when ``nprocs`` is ``1``.
    :param pshared: if not ``None``, passed to every call of ``function``
        as keyword argument ``pshared``
    :param startup: if not ``None``, called as ``startup(*startup_args)``:
        once in the calling process when ``nprocs`` is ``1``, otherwise
        forwarded to each worker process
    :param startup_args: positional arguments for ``startup``
    :param cleanup: if not ``None``, called without arguments: when the
        generator finishes if ``nprocs`` is ``1``, otherwise forwarded to
        each worker process

    :raises ValueError: if ``nprocs`` is smaller than 1
    :raises AssertionError: if an unknown keyword argument is given
    :raises Exception: any exception raised by ``function`` is re-raised
        in place of the result it would have produced
    '''
    allowed = (
        'nprocs', 'eprintignore', 'pshared', 'startup', 'startup_args',
        'cleanup')
    unknown = sorted(k for k in kwargs if k not in allowed)
    assert not unknown, \
        'parimap() got unexpected keyword arguments: %s' % ', '.join(unknown)

    nprocs = kwargs.get('nprocs', None)
    eprintignore = kwargs.get('eprintignore', 'all')
    pshared = kwargs.get('pshared', None)
    startup = kwargs.get('startup', None)
    startup_args = kwargs.get('startup_args', ())
    cleanup = kwargs.get('cleanup', None)

    if eprintignore == 'all':
        eprintignore = None

    # With nprocs < 1 the scheduling loop below could neither enqueue work
    # nor ever reach its exit condition, i.e. it would spin forever.
    if nprocs is not None and nprocs < 1:
        raise ValueError(
            'nprocs must be a positive number or None, got %r' % (nprocs,))

    iterators = [iter(iterable) for iterable in iterables]

    if nprocs == 1:
        # Serial mode: run everything in the calling process.
        call_kwargs = {}
        if pshared is not None:
            call_kwargs['pshared'] = pshared

        if startup is not None:
            startup(*startup_args)

        try:
            while True:
                args = _next_args(iterators)
                if args is None:
                    return

                yield function(*args, **call_kwargs)

        finally:
            if cleanup is not None:
                cleanup()

    # Only choose a start method if none has been fixed yet.
    if multiprocessing.get_start_method(allow_none=True) is None:
        multiprocessing.set_start_method('spawn')

    if nprocs is None:
        nprocs = multiprocessing.cpu_count()

    # The input queue holds at most one pending job, so jobs are pulled
    # from the iterables only about as fast as workers pick them up.
    q_in = multiprocessing.Queue(1)
    q_out = multiprocessing.Queue()

    procs = []

    results = []            # received but not yet yielded (i, res, exc)
    nrun = 0                # jobs enqueued whose result is still pending
    nwritten = 0            # total number of jobs enqueued so far
    iout = 0                # index of the next result to be yielded
    all_written = False     # input exhausted and workers told to stop
    error_ahead = False     # a received result carries an exception

    def stop_workers():
        # One termination job per started worker, then no more input.
        for _ in procs:
            q_in.put((None, None))

        q_in.close()

    while True:
        if nrun < nprocs and not all_written and not error_ahead:
            args = _next_args(iterators)
            if args is not None:
                # Start workers lazily: only when more jobs are in flight
                # than there are processes.
                if len(procs) < nrun + 1:
                    p = multiprocessing.Process(
                        target=worker,
                        args=(q_in, q_out, function, eprintignore, pshared,
                              startup, startup_args, cleanup))
                    p.daemon = True
                    p.start()
                    procs.append(p)

                q_in.put((nwritten, args))
                nwritten += 1
                nrun += 1
            else:
                all_written = True
                stop_workers()

        try:
            while nrun > 0:
                if nrun < nprocs and not all_written and not error_ahead:
                    # There is still capacity for more jobs: only collect
                    # what is already available, do not block.
                    results.append(q_out.get_nowait())
                else:
                    # Nothing more can be enqueued right now: block until
                    # a result arrives, retrying on interrupted calls.
                    while True:
                        try:
                            results.append(q_out.get())
                            break
                        except IOError as e:
                            if e.errno != errno.EINTR:
                                raise

                nrun -= 1

        except queue.Empty:
            pass

        if results:
            results.sort()
            # check for error ahead to prevent further enqueuing
            if any(exc is not None for (_, _, exc) in results):
                error_ahead = True

            # Hand out results strictly in input order.
            while results:
                (i, r, exc) = results[0]
                if i != iout:
                    break

                results.pop(0)
                if exc is not None:
                    if not all_written:
                        stop_workers()

                    raise exc

                yield r
                iout += 1

        if all_written and nrun == 0:
            break

    for p in procs:
        p.join()