Coverage for /usr/local/lib/python3.11/dist-packages/grond/clustering/__init__.py: 79%

62 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2024-11-27 15:15 +0000

1# https://pyrocko.org/grond - GPLv3 

2# 

3# The Grond Developers, 21st Century 

4import optparse 

5 

6from pyrocko.guts import Object, Int, Float 

7from pyrocko import guts 

8 

9from grond.version import __version__ 

10from grond.meta import GrondError 

11 

12guts_prefix = 'grond' 

13 

14 

15class Clustering(Object): 

16 '''Base class for clustering method configuration objects.''' 

17 

18 def perform(self): 

19 raise NotImplementedError('should be implemented in subclass') 

20 

21 @classmethod 

22 def _cli_setup(cls, parser): 

23 

24 pmap = { 

25 Float.T: float, 

26 Int.T: int} 

27 

28 group = optparse.OptionGroup( 

29 parser, cls.name, 

30 'Options specific for the "%s" clustering method' % cls.name) 

31 

32 for prop in cls.T.properties: 

33 if isinstance(prop, tuple(pmap.keys())): 

34 group.add_option( 

35 '--%s' % u2d(prop.name), 

36 dest=prop.name, 

37 type=pmap[prop.__class__], 

38 default=prop.default(), 

39 help=prop.help + ' (default: %default)') 

40 

41 parser.add_option_group(group) 

42 

43 @staticmethod 

44 def cli_setup(name, setup): 

45 if name in Clustering.name_to_class: 

46 def setup_(parser): 

47 setup(parser) 

48 Clustering.name_to_class[name]._cli_setup(parser) 

49 

50 return setup_ 

51 

52 else: 

53 return setup 

54 

55 @classmethod 

56 def _cli_instantiate(cls, options): 

57 pmap = { 

58 Float.T: float, 

59 Int.T: int} 

60 

61 kwargs = {} 

62 for prop in cls.T.properties: 

63 if isinstance(prop, tuple(pmap.keys())): 

64 kwargs[prop.name] = getattr(options, prop.name) 

65 

66 return cls(**kwargs) 

67 

68 @staticmethod 

69 def cli_instantiate(name, options): 

70 return Clustering.name_to_class[name]._cli_instantiate(options) 

71 

72 

73def u2d(u): 

74 return u.replace('_', '-') 

75 

76 

77class DBScan(Clustering): 

78 '''DBSCAN clustering algorithm.''' 

79 

80 name = 'dbscan' 

81 

82 nmin = Int.T( 

83 default=10, 

84 help='Minimum number of neighbours to define a cluster.') 

85 

86 eps = Float.T( 

87 default=0.1, 

88 help='Maximum distance to search for neighbors.') 

89 

90 ncluster_limit = Int.T( 

91 optional=True, 

92 help='Limit maximum number of clusters created to N.') 

93 

94 def perform(self, similarity_matrix): 

95 from .dbscan import dbscan 

96 return dbscan( 

97 similarity_matrix, 

98 nmin=self.nmin, 

99 eps=self.eps, 

100 ncluster_limit=self.ncluster_limit) 

101 

102 

103def read_config(path): 

104 try: 

105 config = guts.load(filename=path) 

106 except OSError: 

107 raise GrondError( 

108 'cannot read Grond clustering configuration file: %s' % path) 

109 

110 if not isinstance(config, Clustering): 

111 raise GrondError( 

112 'invalid Grond clustering configuration in file "%s"' % path) 

113 

114 return config 

115 

116 

117def write_config(config, path): 

118 try: 

119 guts.dump( 

120 config, 

121 filename=path, 

122 header='Grond clustering configuration file, version %s' 

123 % __version__) 

124 

125 except OSError: 

126 raise GrondError( 

127 'cannot write Grond report configuration file: %s' % path) 

128 

129 

130Clustering.name_to_class = dict((cls.name, cls) for cls in [DBScan]) 

131 

132methods = sorted(Clustering.name_to_class.keys()) 

133 

134__all__ = [ 

135 'Clustering', 

136 'DBScan', 

137]