1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

# http://pyrocko.org - GPLv3 

# 

# The Pyrocko Developers, 21st Century 

# ---|P------/S----------~Lg---------- 

from __future__ import absolute_import 

from builtins import zip 

from builtins import str as newstr 

 

import numpy as num 

from io import BytesIO 

from base64 import b64decode, b64encode 

import binascii 

 

from .guts import TBase, Object, ValidationError, literal 

 

 

try: 

unicode 

except NameError: 

unicode = str 

 

 

restricted_dtype_map = { 

num.dtype('float64'): '<f8', 

num.dtype('float32'): '<f4', 

num.dtype('int64'): '<i8', 

num.dtype('int32'): '<i4', 

num.dtype('int16'): '<i2', 

num.dtype('int8'): '<i1'} 

 

restricted_dtype_map_rev = dict( 

(v, k) for (k, v) in restricted_dtype_map.items()) 

 

 

def array_equal(a, b): 

return a.dtype == b.dtype \ 

and a.shape == b.shape \ 

and num.all(a == b) 

 

 

class Array(Object): 

 

dummy_for = num.ndarray 

 

class __T(TBase): 

def __init__( 

self, 

shape=None, 

dtype=None, 

serialize_as='table', 

serialize_dtype=None, 

*args, **kwargs): 

 

TBase.__init__(self, *args, **kwargs) 

self.shape = shape 

self.dtype = dtype 

assert serialize_as in ( 

'table', 'base64', 'list', 'npy', 

'base64+meta', 'base64-compat') 

self.serialize_as = serialize_as 

self.serialize_dtype = serialize_dtype 

 

def is_default(self, val): 

if self._default_cmp is None: 

return val is None 

elif val is None: 

return False 

else: 

return array_equal(self._default_cmp, val) 

 

def regularize_extra(self, val): 

if isinstance(val, (str, newstr)): 

ndim = None 

if self.shape: 

ndim = len(self.shape) 

 

if self.serialize_as == 'table': 

val = num.loadtxt( 

BytesIO(val.encode('utf-8')), 

dtype=self.dtype, ndmin=ndim) 

 

elif self.serialize_as == 'base64': 

data = b64decode(val) 

val = num.fromstring( 

data, dtype=self.serialize_dtype).astype(self.dtype) 

 

elif self.serialize_as == 'base64-compat': 

try: 

data = b64decode(val) 

val = num.fromstring( 

data, 

dtype=self.serialize_dtype).astype(self.dtype) 

except binascii.Error: 

val = num.loadtxt( 

BytesIO(val.encode('utf-8')), 

dtype=self.dtype, ndmin=ndim) 

 

elif self.serialize_as == 'npy': 

data = b64decode(val) 

try: 

val = num.load(BytesIO(data), allow_pickle=False) 

except TypeError: 

# allow_pickle only available in newer NumPy 

val = num.load(BytesIO(data)) 

 

elif isinstance(val, dict): 

if self.serialize_as == 'base64+meta': 

if not sorted(val.keys()) == ['data', 'dtype', 'shape']: 

raise ValidationError( 

'array in format "base64+meta" must have keys ' 

'"data", "dtype", and "shape"') 

 

shape = val['shape'] 

if not isinstance(shape, list): 

raise ValidationError('invalid shape definition') 

 

for n in shape: 

if not isinstance(n, int): 

raise ValidationError('invalid shape definition') 

 

serialize_dtype = val['dtype'] 

allowed = list(restricted_dtype_map_rev.keys()) 

if self.serialize_dtype is not None: 

allowed.append(self.serialize_dtype) 

 

if serialize_dtype not in allowed: 

raise ValidationError( 

'only the following dtypes are allowed: %s' 

% ', '.join(sorted(allowed))) 

 

data = val['data'] 

if not isinstance(data, (str, newstr)): 

raise ValidationError( 

'data must be given as a base64 encoded string') 

 

data = b64decode(data) 

 

dtype = self.dtype or \ 

restricted_dtype_map_rev[serialize_dtype] 

 

val = num.fromstring( 

data, dtype=serialize_dtype).astype(dtype) 

 

if val.size != num.product(shape): 

raise ValidationError('size/shape mismatch') 

 

val = val.reshape(shape) 

 

else: 

val = num.asarray(val, dtype=self.dtype) 

 

return val 

 

def validate_extra(self, val): 

if not isinstance(val, num.ndarray): 

raise ValidationError( 

'object is not of type numpy.ndarray: %s' % type(val)) 

if self.dtype is not None and self.dtype != val.dtype: 

raise ValidationError( 

'array not of required type: need %s, got %s' % ( 

self.dtype, val.dtype)) 

 

if self.shape is not None: 

la, lb = len(self.shape), len(val.shape) 

if la != lb: 

raise ValidationError( 

'array dimension mismatch: need %i, got %i' % ( 

la, lb)) 

 

for a, b in zip(self.shape, val.shape): 

if a is not None: 

if a != b: 

raise ValidationError( 

'array shape mismatch: need %s, got: %s' % ( 

self.shape, val.shape)) 

 

def to_save(self, val): 

if self.serialize_as == 'table': 

out = BytesIO() 

num.savetxt(out, val, fmt='%12.7g') 

return literal(out.getvalue().decode('utf-8')) 

elif self.serialize_as == 'base64' \ 

or self.serialize_as == 'base64-compat': 

data = val.astype(self.serialize_dtype).tostring() 

return literal(b64encode(data).decode('utf-8')) 

elif self.serialize_as == 'list': 

if self.dtype == num.complex: 

return [repr(x) for x in val] 

else: 

return val.tolist() 

elif self.serialize_as == 'npy': 

out = BytesIO() 

try: 

num.save(out, val, allow_pickle=False) 

except TypeError: 

# allow_pickle only available in newer NumPy 

num.save(out, val) 

 

return literal(b64encode(out.getvalue()).decode('utf-8')) 

 

elif self.serialize_as == 'base64+meta': 

serialize_dtype = self.serialize_dtype or \ 

restricted_dtype_map[val.dtype] 

 

data = val.astype(serialize_dtype).tostring() 

 

return dict( 

dtype=serialize_dtype, 

shape=val.shape, 

data=literal(b64encode(data).decode('utf-8'))) 

 

 

__all__ = ['Array']