Coverage for /usr/local/lib/python3.11/dist-packages/grond/clustering/__init__.py: 79%
62 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-04-03 09:31 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-04-03 09:31 +0000
1# https://pyrocko.org/grond - GPLv3
2#
3# The Grond Developers, 21st Century
4import optparse
6from pyrocko.guts import Object, Int, Float
7from pyrocko import guts
9from grond.version import __version__
10from grond.meta import GrondError
# Module-level prefix for the guts classes declared in this module.
# NOTE(review): presumably picked up by pyrocko.guts to namespace the
# serialized type names of the Object subclasses below -- confirm against
# the guts documentation.
guts_prefix = 'grond'
class Clustering(Object):
    '''Base class for clustering method configuration objects.

    Subclasses provide a ``name`` attribute and implement :py:meth:`perform`.
    The ``name_to_class`` registry consulted by :py:meth:`cli_setup` and
    :py:meth:`cli_instantiate` is attached to this class at module level,
    after the subclasses have been defined.
    '''

    def perform(self, similarity_matrix=None):
        '''Run the clustering; must be overridden in subclasses.

        :param similarity_matrix: accepted only so that the signature agrees
            with the subclass implementations; it is not used here.
        :raises NotImplementedError: always.
        '''
        raise NotImplementedError('should be implemented in subclass')

    @classmethod
    def _cli_properties(cls):
        '''Yield ``(prop, converter)`` for every CLI-exposed property.

        Only ``Float.T`` and ``Int.T`` properties are exposed. The converter
        is resolved with the same ``isinstance`` test that selects the
        property, so a property whose class merely derives from one of these
        types is handled instead of failing on an exact-class lookup.
        '''
        pmap = (
            (Float.T, float),
            (Int.T, int))

        for prop in cls.T.properties:
            for prop_type, converter in pmap:
                if isinstance(prop, prop_type):
                    yield prop, converter
                    break

    @classmethod
    def _cli_setup(cls, parser):
        '''Add an option group with this method's parameters to ``parser``.

        One ``--<name>`` option (underscores shown as dashes) is added per
        CLI-exposed property, using the property's default value.

        :param parser: :py:class:`optparse.OptionParser` to extend.
        '''
        group = optparse.OptionGroup(
            parser, cls.name,
            'Options specific for the "%s" clustering method' % cls.name)

        for prop, converter in cls._cli_properties():
            # ``%default`` is expanded by optparse when help is printed.
            # Guard against a property declared without help text.
            if prop.help:
                help_text = prop.help + ' (default: %default)'
            else:
                help_text = '(default: %default)'

            group.add_option(
                '--%s' % u2d(prop.name),
                dest=prop.name,
                type=converter,
                default=prop.default(),
                help=help_text)

        parser.add_option_group(group)

    @staticmethod
    def cli_setup(name, setup):
        '''Wrap ``setup`` so it also registers the options of method ``name``.

        :param name: name of a clustering method.
        :param setup: callable taking an option parser.
        :returns: a callable which runs ``setup`` and then adds the options
            of the named method; ``setup`` itself, unchanged, if ``name`` is
            not a registered method.
        '''
        if name not in Clustering.name_to_class:
            return setup

        def setup_(parser):
            setup(parser)
            Clustering.name_to_class[name]._cli_setup(parser)

        return setup_

    @classmethod
    def _cli_instantiate(cls, options):
        '''Create an instance from parsed command line ``options``.

        :param options: object carrying one attribute per CLI-exposed
            property, as filled in by the options from ``_cli_setup``.
        :returns: instance of ``cls``.
        '''
        kwargs = dict(
            (prop.name, getattr(options, prop.name))
            for prop, _ in cls._cli_properties())

        return cls(**kwargs)

    @staticmethod
    def cli_instantiate(name, options):
        '''Create the clustering object for method ``name`` from ``options``.

        :raises KeyError: if ``name`` is not a registered method.
        '''
        return Clustering.name_to_class[name]._cli_instantiate(options)
def u2d(u):
    '''Return ``u`` with every underscore replaced by a dash.

    Used to turn property names into command line option names.
    '''
    return u.replace('_', '-')
class DBScan(Clustering):
    '''DBSCAN clustering algorithm.'''

    # Key under which this method is registered in
    # ``Clustering.name_to_class``.
    name = 'dbscan'

    nmin = Int.T(
        default=10,
        help='Minimum number of neighbours to define a cluster.')

    eps = Float.T(
        default=0.1,
        help='Maximum distance to search for neighbors.')

    ncluster_limit = Int.T(
        optional=True,
        help='Limit maximum number of clusters created to N.')

    def perform(self, similarity_matrix):
        '''Run DBSCAN on ``similarity_matrix`` with this object's settings.

        :param similarity_matrix: passed through unchanged to
            ``dbscan.dbscan``.
        :returns: whatever ``dbscan.dbscan`` returns.
        '''
        # Imported at call time rather than at module import.
        from .dbscan import dbscan

        return dbscan(
            similarity_matrix,
            nmin=self.nmin,
            eps=self.eps,
            ncluster_limit=self.ncluster_limit)
def read_config(path):
    '''Load a clustering configuration from file.

    :param path: name of the configuration file.
    :returns: the loaded :py:class:`Clustering` object.
    :raises GrondError: if the file cannot be read, or if the loaded object
        is not a :py:class:`Clustering` instance.
    '''
    try:
        config = guts.load(filename=path)
    except OSError as err:
        # Chain explicitly so the underlying I/O error stays visible.
        raise GrondError(
            'cannot read Grond clustering configuration file: %s'
            % path) from err

    if not isinstance(config, Clustering):
        raise GrondError(
            'invalid Grond clustering configuration in file "%s"' % path)

    return config
def write_config(config, path):
    '''Dump a clustering configuration to file.

    :param config: object to dump.
    :param path: name of the file to write.
    :raises GrondError: if the file cannot be written.
    '''
    try:
        guts.dump(
            config,
            filename=path,
            header='Grond clustering configuration file, version %s'
                   % __version__)

    except OSError as err:
        # The message names the clustering configuration, matching the
        # header written above and the wording used by ``read_config``.
        raise GrondError(
            'cannot write Grond clustering configuration file: %s'
            % path) from err
# Registry of the available clustering methods, keyed by their ``name``.
# Attached here because the subclasses must exist before it can be built.
Clustering.name_to_class = {cls.name: cls for cls in [DBScan]}

# Sorted names of the available clustering methods.
methods = sorted(Clustering.name_to_class)

__all__ = [
    'Clustering',
    'DBScan',
]