1import numpy as num 

2import logging 

3 

4from pyrocko import trace 

5from pyrocko.guts import (Object, Dict, String, Float, Bool, Int) 

6 

7logger = logging.getLogger('grond.synthetic_tests') 

8 

9guts_prefix = 'grond' 

10 

11 

12class SyntheticWaveformNotAvailable(Exception): 

13 pass 

14 

15 

16class SyntheticTest(Object): 

17 inject_solution = Bool.T(default=False) 

18 respect_data_availability = Bool.T(default=False) 

19 real_noise_scale = Float.T(default=0.0) 

20 white_noise_scale = Float.T(default=0.0) 

21 relative_white_noise_scale = Float.T(default=0.0) 

22 random_response_scale = Float.T(default=0.0) 

23 real_noise_toffset = Float.T(default=-3600.) 

24 random_seed = Int.T(optional=True) 

25 x = Dict.T(String.T(), Float.T()) 

26 

27 def __init__(self, **kwargs): 

28 Object.__init__(self, **kwargs) 

29 self._problem = None 

30 self._synthetics = None 

31 

32 def set_problem(self, problem): 

33 self._problem = problem 

34 self._synthetics = None 

35 

36 def get_problem(self): 

37 if self._problem is None: 

38 raise SyntheticWaveformNotAvailable( 

39 'SyntheticTest.set_problem() has not been called yet') 

40 

41 return self._problem 

42 

43 def get_x(self): 

44 problem = self.get_problem() 

45 if self.x: 

46 x = problem.preconstrain( 

47 problem.get_parameter_array(self.x)) 

48 

49 else: 

50 x = problem.preconstrain( 

51 problem.pack( 

52 problem.base_source)) 

53 

54 return x 

55 

56 def get_synthetics(self): 

57 problem = self.get_problem() 

58 if self._synthetics is None: 

59 x = self.get_x() 

60 results = problem.forward(x) 

61 synthetics = {} 

62 for iresult, result in enumerate(results): 

63 tr = result.trace.pyrocko_trace() 

64 tfade = tr.tmax - tr.tmin 

65 tr_orig = tr.copy() 

66 tr.extend(tr.tmin - tfade, tr.tmax + tfade) 

67 rstate = num.random.RandomState( 

68 (self.random_seed or 0) + iresult) 

69 

70 if self.random_response_scale != 0: 

71 tf = RandomResponse(scale=self.random_response_scale) 

72 tf.set_random_state(rstate) 

73 tr = tr.transfer( 

74 tfade=tfade, 

75 transfer_function=tf) 

76 

77 if self.white_noise_scale != 0.0: 

78 u = rstate.normal( 

79 scale=self.white_noise_scale, 

80 size=tr.data_len()) 

81 

82 tr.ydata += u 

83 

84 if self.relative_white_noise_scale != 0.0: 

85 u = rstate.normal( 

86 scale=self.relative_white_noise_scale * num.std( 

87 tr_orig.ydata), 

88 size=tr.data_len()) 

89 

90 tr.ydata += u 

91 

92 synthetics[result.trace.codes] = tr 

93 

94 self._synthetics = synthetics 

95 

96 return self._synthetics 

97 

98 def get_waveform(self, nslc, tmin, tmax, tfade=0., freqlimits=None): 

99 synthetics = self.get_synthetics() 

100 if nslc not in synthetics: 

101 s = 'no synthetics for %s available' % '.'.join(nslc) 

102 logger.warning(s) 

103 from grond.dataset import NotFound 

104 raise NotFound(s) 

105 

106 tr = synthetics[nslc] 

107 tr.extend(tmin - tfade * 2.0, tmax + tfade * 2.0) 

108 

109 tr = tr.transfer( 

110 tfade=tfade, 

111 freqlimits=freqlimits) 

112 

113 tr.chop(tmin, tmax) 

114 return tr 

115 

116 

117class RandomResponse(trace.FrequencyResponse): 

118 

119 scale = Float.T(default=0.0) 

120 

121 def set_random_state(self, rstate): 

122 self._rstate = rstate 

123 

124 def evaluate(self, freqs): 

125 n = freqs.size 

126 return 1.0 + ( 

127 self._rstate.normal(scale=self.scale, size=n) + 

128 0.0J * self._rstate.normal(scale=self.scale, size=n)) 

129 

130 

131__all__ = ''' 

132SyntheticTest 

133SyntheticWaveformNotAvailable 

134'''.split()