1import optparse 

2 

3from pyrocko.guts import Object, Int, Float 

4from pyrocko import guts 

5 

6from grond.version import __version__ 

7from grond.meta import GrondError 

8 

9guts_prefix = 'grond' 

10 

11 

12class Clustering(Object): 

13 '''Base class for clustering method configuration objects.''' 

14 

15 def perform(self): 

16 raise NotImplementedError('should be implemented in subclass') 

17 

18 @classmethod 

19 def _cli_setup(cls, parser): 

20 

21 pmap = { 

22 Float.T: float, 

23 Int.T: int} 

24 

25 group = optparse.OptionGroup( 

26 parser, cls.name, 

27 'Options specific for the "%s" clustering method' % cls.name) 

28 

29 for prop in cls.T.properties: 

30 if isinstance(prop, tuple(pmap.keys())): 

31 group.add_option( 

32 '--%s' % u2d(prop.name), 

33 dest=prop.name, 

34 type=pmap[prop.__class__], 

35 default=prop.default(), 

36 help=prop.help + ' (default: %default)') 

37 

38 parser.add_option_group(group) 

39 

40 @staticmethod 

41 def cli_setup(name, setup): 

42 if name in Clustering.name_to_class: 

43 def setup_(parser): 

44 setup(parser) 

45 Clustering.name_to_class[name]._cli_setup(parser) 

46 

47 return setup_ 

48 

49 else: 

50 return setup 

51 

52 @classmethod 

53 def _cli_instantiate(cls, options): 

54 pmap = { 

55 Float.T: float, 

56 Int.T: int} 

57 

58 kwargs = {} 

59 for prop in cls.T.properties: 

60 if isinstance(prop, tuple(pmap.keys())): 

61 kwargs[prop.name] = getattr(options, prop.name) 

62 

63 return cls(**kwargs) 

64 

65 @staticmethod 

66 def cli_instantiate(name, options): 

67 return Clustering.name_to_class[name]._cli_instantiate(options) 

68 

69 

70def u2d(u): 

71 return u.replace('_', '-') 

72 

73 

74class DBScan(Clustering): 

75 '''DBSCAN clustering algorithm.''' 

76 

77 name = 'dbscan' 

78 

79 nmin = Int.T( 

80 default=10, 

81 help='Minimum number of neighbours to define a cluster.') 

82 

83 eps = Float.T( 

84 default=0.1, 

85 help='Maximum distance to search for neighbors.') 

86 

87 ncluster_limit = Int.T( 

88 default=None, 

89 help='Limit maximum number of clusters created to N.') 

90 

91 def perform(self, similarity_matrix): 

92 from .dbscan import dbscan 

93 return dbscan( 

94 similarity_matrix, 

95 nmin=self.nmin, 

96 eps=self.eps, 

97 ncluster_limit=self.ncluster_limit) 

98 

99 

100def read_config(path): 

101 try: 

102 config = guts.load(filename=path) 

103 except OSError: 

104 raise GrondError( 

105 'cannot read Grond clustering configuration file: %s' % path) 

106 

107 if not isinstance(config, Clustering): 

108 raise GrondError( 

109 'invalid Grond clustering configuration in file "%s"' % path) 

110 

111 return config 

112 

113 

114def write_config(config, path): 

115 try: 

116 guts.dump( 

117 config, 

118 filename=path, 

119 header='Grond clustering configuration file, version %s' 

120 % __version__) 

121 

122 except OSError: 

123 raise GrondError( 

124 'cannot write Grond report configuration file: %s' % path) 

125 

126 

127Clustering.name_to_class = dict((cls.name, cls) for cls in [DBScan]) 

128 

129methods = sorted(Clustering.name_to_class.keys()) 

130 

131__all__ = [ 

132 'Clustering', 

133 'DBScan', 

134]