Coverage for /usr/local/lib/python3.13/dist-packages/pyrocko/parimap.py: 78%

123 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-12-04 10:41 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Parallel :py:func:`map` implementation based on :py:mod:`multiprocessing`. 

8''' 

9 

10try: 

11 import queue 

12except ImportError: 

13 import Queue as queue 

14 

15 

16import logging 

17import multiprocessing 

18import traceback 

19import errno 

20 

21 

22logger = logging.getLogger('pyrocko.parimap') 

23 

24 

def worker(
        q_in, q_out, function, eprintignore, pshared,
        startup, startup_args, cleanup):
    '''
    Process jobs from ``q_in`` and report outcomes on ``q_out``.

    Jobs are ``(index, args)`` tuples. For each job ``function(*args)`` is
    called and ``(index, result, exception)`` is put on ``q_out``, where
    exactly one of ``result`` and ``exception`` is meaningful. A job whose
    index is ``None`` is the stop sentinel and ends the loop.

    :param q_in: queue-like object providing ``get()``
    :param q_out: queue-like object providing ``put()``
    :param function: callable applied to the arguments of each job
    :param eprintignore: exception class (or tuple of classes) for which no
        traceback is printed; with ``None`` a traceback is printed for every
        exception caught from ``function``
    :param pshared: if not ``None``, passed to ``function`` as keyword
        argument ``pshared``
    :param startup: optional callable, called once as
        ``startup(*startup_args)`` before the first job is fetched
    :param startup_args: positional arguments for ``startup``
    :param cleanup: optional callable without arguments, called once when
        the job loop ends
    '''

    kwargs = {}
    if pshared is not None:
        kwargs['pshared'] = pshared

    if startup is not None:
        startup(*startup_args)

    # The finally clause makes sure cleanup also runs when the loop is left
    # abnormally (e.g. SystemExit/KeyboardInterrupt raised by the job or a
    # failing queue operation), matching the serial code path of parimap.
    try:
        while True:
            i, args = q_in.get()
            if i is None:
                break

            res, exception = None, None
            try:
                res = function(*args, **kwargs)
            except Exception as e:
                if eprintignore is None or not isinstance(e, eprintignore):
                    traceback.print_exc()

                # The exception object itself is shipped back to the caller.
                exception = e

            q_out.put((i, res, exception))

    finally:
        if cleanup is not None:
            cleanup()

52 

53 

def _next_args(iterators):
    '''
    Pull one item from each iterator to build the next argument list.

    Returns ``None`` as soon as one of the iterators is exhausted.
    '''
    args = []
    for it in iterators:
        try:
            args.append(next(it))
        except StopIteration:
            return None

    return args


def parimap(function, *iterables, **kwargs):
    '''
    Generator yielding ``function(*args)`` for the zipped ``iterables``.

    Results are yielded in input order. Iteration stops when the shortest
    iterable is exhausted. With ``nprocs=1`` everything runs in the calling
    process; otherwise jobs are distributed to :py:func:`worker` processes
    through :py:mod:`multiprocessing` queues.

    :param function: callable applied to each set of arguments
    :param iterables: one iterable per positional argument of ``function``
    :param nprocs: number of worker processes; ``None`` (default) uses
        :py:func:`multiprocessing.cpu_count`
    :param eprintignore: handed to the workers: exception class (or tuple of
        classes) for which no traceback is printed. The default ``'all'`` is
        translated to ``None``, with which the workers print a traceback for
        every exception. Not used when ``nprocs=1``.
    :param pshared: if not ``None``, passed to ``function`` as keyword
        argument ``pshared``
    :param startup: optional callable, called as ``startup(*startup_args)``
        before the first job (once per worker in the parallel case)
    :param startup_args: positional arguments for ``startup``
    :param cleanup: optional callable without arguments, called when
        processing ends (once per worker in the parallel case)

    :raises TypeError: on unknown keyword arguments
    :raises ValueError: if ``nprocs`` is smaller than 1
    :raises Exception: an exception raised by ``function`` is re-raised at
        the position of the failing job

    If no multiprocessing start method has been set yet, it is set to
    ``'spawn'``.
    '''

    allowed = (
        'nprocs', 'eprintignore', 'pshared', 'startup', 'startup_args',
        'cleanup')

    # Explicit check instead of an assert, which would vanish under -O.
    unknown = sorted(k for k in kwargs if k not in allowed)
    if unknown:
        raise TypeError(
            'parimap() got unexpected keyword argument(s): %s'
            % ', '.join(unknown))

    nprocs = kwargs.get('nprocs', None)
    eprintignore = kwargs.get('eprintignore', 'all')
    pshared = kwargs.get('pshared', None)
    startup = kwargs.get('startup', None)
    startup_args = kwargs.get('startup_args', ())
    cleanup = kwargs.get('cleanup', None)

    # With nprocs < 1 the scheduling loop below could neither feed jobs nor
    # terminate, so reject such values up front.
    if nprocs is not None and nprocs < 1:
        raise ValueError('nprocs must be >= 1, got %r' % (nprocs,))

    if eprintignore == 'all':
        eprintignore = None

    iterators = [iter(iterable) for iterable in iterables]

    if nprocs == 1:
        # Serial case: no processes, no queues, exceptions propagate as is.
        fkwargs = {}
        if pshared is not None:
            fkwargs['pshared'] = pshared

        if startup is not None:
            startup(*startup_args)

        try:
            while True:
                args = _next_args(iterators)
                if args is None:
                    return

                yield function(*args, **fkwargs)

        finally:
            if cleanup is not None:
                cleanup()

    if multiprocessing.get_start_method(allow_none=True) is None:
        multiprocessing.set_start_method('spawn')

    if nprocs is None:
        nprocs = multiprocessing.cpu_count()

    # q_in holds at most one pending job, so put() blocks until a worker
    # has picked up the previous one.
    q_in = multiprocessing.Queue(1)
    q_out = multiprocessing.Queue()

    procs = []

    results = []            # received (index, result, exception) tuples
    nrun = 0                # jobs handed out but not yet received back
    nwritten = 0            # jobs handed out so far == index of next job
    iout = 0                # index of the next result to be yielded
    all_written = False     # input exhausted and stop sentinels sent
    error_ahead = False     # a received result carries an exception

    def stop_workers():
        # One stop sentinel per started worker, then no more input.
        for _ in procs:
            q_in.put((None, None))

        q_in.close()

    while True:
        if nrun < nprocs and not all_written and not error_ahead:
            args = _next_args(iterators)
            if args is not None:
                # Start workers lazily, only as many as there are jobs.
                if len(procs) < nrun + 1:
                    p = multiprocessing.Process(
                        target=worker,
                        args=(q_in, q_out, function, eprintignore, pshared,
                              startup, startup_args, cleanup))
                    p.daemon = True
                    p.start()
                    procs.append(p)

                q_in.put((nwritten, args))
                nwritten += 1
                nrun += 1
            else:
                all_written = True
                stop_workers()

        try:
            while nrun > 0:
                if nrun < nprocs and not all_written and not error_ahead:
                    # More jobs can still be fed: only take what is already
                    # available (queue.Empty leaves this loop).
                    results.append(q_out.get_nowait())
                else:
                    # Nothing to feed right now: wait for a result, retrying
                    # if the call is interrupted by a signal.
                    while True:
                        try:
                            results.append(q_out.get())
                            break
                        except IOError as e:
                            if e.errno != errno.EINTR:
                                raise

                nrun -= 1

        except queue.Empty:
            pass

        if results:
            # Job indices are unique, so ordering by index is a total order.
            results.sort(key=lambda item: item[0])

            # check for error ahead to prevent further enqueuing
            if any(exc is not None for (_, _, exc) in results):
                error_ahead = True

            # Hand out everything that is next in line, in input order.
            while results:
                (i, r, exc) = results[0]
                if i != iout:
                    break

                results.pop(0)
                if exc is not None:
                    if not all_written:
                        stop_workers()

                    # NOTE(review): workers are not joined on this path,
                    # as in the original implementation.
                    raise exc

                yield r
                iout += 1

        if all_written and nrun == 0:
            break

    for p in procs:
        p.join()