Clustering Manager
Clustering manager modules.
Contains manager classes for clustering operations.
Cluster Manager
ClusterManager for managing clustering data objects.
Manager for creating and managing clustering results from feature matrices or decomposition results. Used to add, reset, and manage clustering data in trajectory data objects.
- class mdxplain.clustering.manager.cluster_manager.ClusterManager(cache_dir: str = './cache')
Manager for clustering data objects.
Manages the creation and storage of clustering results from feature matrices or decomposition results. Works with TrajectoryData objects to perform clustering analysis using various clustering methods (DBSCAN, HDBSCAN, DPA).
Examples
>>> # Create manager and add DBSCAN clustering
>>> from mdxplain.clustering import cluster_type
>>> from mdxplain.clustering.manager.cluster_manager import ClusterManager
>>> manager = ClusterManager()
>>> manager.add_clustering(
...     pipeline_data, "feature_selection", cluster_type.DBSCAN(eps=0.5),
...     use_decomposed=False
... )

>>> # Manager with custom cache directory
>>> manager = ClusterManager(cache_dir="./cache/clustering")
>>> manager.add_clustering(
...     pipeline_data, "pca_decomposition", cluster_type.HDBSCAN(min_cluster_size=10),
...     use_decomposed=True
... )
- __init__(cache_dir: str = './cache') -> None
Initialize cluster manager.
Parameters
- cache_dir : str, optional
Cache directory path for clustering data, default="./cache"
Returns
- None
Initializes ClusterManager instance with specified configuration
Examples
>>> # Basic manager
>>> manager = ClusterManager()

>>> # Manager with custom cache directory
>>> manager = ClusterManager(cache_dir="./cache/clustering")
- reset_clusters(pipeline_data: PipelineData) -> None
Reset all computed clustering results and clear clustering data.
This method removes all computed clustering results and their associated data, requiring clustering to be recalculated from scratch.
Warning
When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.
Pipeline mode:
>>> pipeline = PipelineManager()
>>> pipeline.clustering.reset_clusters()  # NO pipeline_data parameter
Standalone mode:
>>> pipeline_data = PipelineData()
>>> manager = ClusterManager()
>>> manager.reset_clusters(pipeline_data)  # pipeline_data required
Parameters
- pipeline_data : PipelineData
Pipeline data object
Returns
- None
Clears all clustering data from pipeline_data.cluster_data
Examples
>>> manager = ClusterManager()
>>> manager.reset_clusters(pipeline_data)
- add_clustering(pipeline_data: PipelineData, selection_name: str, cluster_type: ClusterTypeBase, use_decomposed: bool = True, cluster_name: str | None = None, data_selector_name: str | None = None, force: bool = False, override_cache: bool = False, center_method: str = 'centroid') -> None
Add clustering analysis to trajectory data.
This method performs clustering analysis on the specified data selection using the provided cluster type. Results are stored in the PipelineData object’s cluster_data dictionary with the specified or default cluster name.
Warning
When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.
Pipeline mode:
>>> pipeline = PipelineManager()
>>> pipeline.clustering.add_clustering("selection", cluster_type.DBSCAN())  # NO pipeline_data parameter
Standalone mode:
>>> pipeline_data = PipelineData()
>>> manager = ClusterManager()
>>> manager.add_clustering(pipeline_data, "selection", cluster_type.DBSCAN())  # pipeline_data required
Parameters
- pipeline_data : PipelineData
Pipeline data object containing feature selections or decomposition results
- selection_name : str
Name of the feature selection (if use_decomposed=False) or decomposition result (if use_decomposed=True) to cluster
- cluster_type : ClusterTypeBase instance
Cluster type instance with parameters (e.g., DBSCAN(eps=0.5))
- use_decomposed : bool, optional
Whether to use decomposition results (True) or feature selections (False), default=True
- cluster_name : str, optional
Custom name for storing clustering results. If None, defaults to str(cluster_type)
- data_selector_name : str, optional
Name of DataSelector to apply frame filtering before clustering. If None, uses all frames from the selection. Only applies if use_decomposed=False
- force : bool, optional
Whether to force recomputation if clustering already exists, default=False
- override_cache : bool, optional
Whether to clear entire cluster_name subdirectory before computation, default=False
- center_method : str, optional
Method for calculating cluster centers, default="centroid":
- "centroid": Representative point (the medoid, i.e. the cluster member closest to the mean)
- "mean": Average of cluster members
- "median": Coordinate-wise median (robust to outliers)
- "density_peak": Point with highest local density
- "median_centroid": Medoid taken relative to the median instead of the mean (more robust to outliers)
- "rmsd_centroid": Centroid using the RMSD metric (better for structural comparisons)
NOTE: If the clustering algorithm provides built-in centers, those are always used, regardless of this parameter.
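The difference between the first three center methods and "median_centroid" can be illustrated with a small standalone sketch. This is not mdxplain's implementation: the helper names (`mean_center`, `median_center`, `medoid`) are hypothetical, and the Euclidean distance is an assumption made for the illustration.

```python
from math import dist
from statistics import mean, median


def mean_center(points):
    """Coordinate-wise average of the cluster members ("mean")."""
    return [mean(axis) for axis in zip(*points)]


def median_center(points):
    """Coordinate-wise median of the cluster members ("median")."""
    return [median(axis) for axis in zip(*points)]


def medoid(points, reference):
    """Cluster member closest to a reference point (Euclidean, by assumption)."""
    return min(points, key=lambda p: dist(p, reference))


# Four nearby members plus one outlier at (10, 10).
cluster = [[0.0, 0.0], [2.0, 0.0], [0.0, 1.0], [1.0, 1.0], [10.0, 10.0]]

center_mean = mean_center(cluster)      # pulled towards the outlier
center_median = median_center(cluster)  # stays near the bulk of the cluster

# "centroid": member closest to the mean.
center_centroid = medoid(cluster, center_mean)
# "median_centroid": member closest to the coordinate-wise median.
center_median_centroid = medoid(cluster, center_median)
```

Unlike "mean" and "median", the two medoid variants always return an actual cluster member, which matters when the center should correspond to a real trajectory frame.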
Returns
- None
Stores ClusterData object in pipeline_data.cluster_data dictionary
Examples
>>> # Cluster decomposition results with custom name
>>> from mdxplain.clustering import cluster_type
>>> manager = ClusterManager()
>>> manager.add_clustering(
...     pipeline_data, "pca_selection", cluster_type.DBSCAN(eps=0.5),
...     use_decomposed=True, cluster_name="my_dbscan"
... )

>>> # Cluster feature selection with default name
>>> manager.add_clustering(
...     pipeline_data, "distance_selection", cluster_type.HDBSCAN(min_cluster_size=10),
...     use_decomposed=False
... )

>>> # Cluster with data selector (only specific frames)
>>> manager.add_clustering(
...     pipeline_data, "distance_selection", cluster_type.DBSCAN(eps=0.3),
...     data_selector_name="folded_frames", use_decomposed=False
... )
Raises
- ValueError
If selection not found, cluster type invalid, or clustering computation fails
- save(pipeline_data: PipelineData, save_path: str) -> None
Save all clustering data to single file.
Warning
When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.
Pipeline mode:
>>> pipeline = PipelineManager()
>>> pipeline.clustering.save('clustering.npy')  # NO pipeline_data parameter
Standalone mode:
>>> pipeline_data = PipelineData()
>>> manager = ClusterManager()
>>> manager.save(pipeline_data, 'clustering.npy')  # pipeline_data required
Parameters
- pipeline_data : PipelineData
Pipeline data container with clustering data
- save_path : str
Path where to save all clustering data in one file
Returns
- None
Saves all clustering data to the specified file
Examples
>>> manager.save(pipeline_data, 'clustering.npy')
- load(pipeline_data: PipelineData, load_path: str) -> None
Load all clustering data from single file.
Warning
When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.
Pipeline mode:
>>> pipeline = PipelineManager()
>>> pipeline.clustering.load('clustering.npy')  # NO pipeline_data parameter
Standalone mode:
>>> pipeline_data = PipelineData()
>>> manager = ClusterManager()
>>> manager.load(pipeline_data, 'clustering.npy')  # pipeline_data required
Parameters
- pipeline_data : PipelineData
Pipeline data container to load clustering data into
- load_path : str
Path to saved clustering data file
Returns
- None
Loads all clustering data from the specified file
Examples
>>> manager.load(pipeline_data, 'clustering.npy')
- print_info(pipeline_data: PipelineData) -> None
Print clustering data information.
Warning
When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.
Pipeline mode:
>>> pipeline = PipelineManager()
>>> pipeline.clustering.print_info()  # NO pipeline_data parameter
Standalone mode:
>>> pipeline_data = PipelineData()
>>> manager = ClusterManager()
>>> manager.print_info(pipeline_data)  # pipeline_data required
Parameters
- pipeline_data : PipelineData
Pipeline data container with clustering data
Returns
- None
Prints clustering data information to console
Examples
>>> manager.print_info(pipeline_data)
- property add: ClusterAddService
Service for adding clustering algorithms with simplified syntax.
Provides an intuitive interface for adding clustering algorithms without requiring explicit cluster type instantiation or imports.
Returns
- ClusterAddService
Service instance for adding clustering algorithms with combined parameters
Examples
>>> # Add different clustering algorithms
>>> pipeline.clustering.add.dbscan("my_features", eps=0.5, min_samples=5)
>>> pipeline.clustering.add.hdbscan("pca_features", min_cluster_size=10)
>>> pipeline.clustering.add.dpa("distance_features", Z=2.0)
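One way to picture the "simplified syntax" is a thin service that builds the cluster type itself and forwards everything else to the manager. The sketch below is only an illustration under stated assumptions: `DemoDBSCAN`, `DemoManager`, and `DemoAddService` are hypothetical stand-ins, the default values are invented for the example, and the real ClusterAddService may be structured differently.

```python
from dataclasses import dataclass


@dataclass
class DemoDBSCAN:
    """Hypothetical stand-in for a cluster type; defaults are assumptions."""
    eps: float = 0.5
    min_samples: int = 5


class DemoManager:
    """Hypothetical stand-in manager that only records what it receives."""

    def __init__(self):
        self.calls = []

    def add_clustering(self, selection_name, cluster_type, **options):
        self.calls.append((selection_name, cluster_type, options))


class DemoAddService:
    """Hypothetical service: one method per algorithm, no imports needed by the caller."""

    def __init__(self, manager):
        self._manager = manager

    def dbscan(self, selection_name, eps=0.5, min_samples=5, **options):
        # Algorithm parameters go into the cluster type ...
        cluster_type = DemoDBSCAN(eps=eps, min_samples=min_samples)
        # ... and the remaining keyword arguments are passed on to the manager.
        self._manager.add_clustering(selection_name, cluster_type, **options)


manager = DemoManager()
DemoAddService(manager).dbscan("my_features", eps=0.3, cluster_name="tight")
```

In this sketch the caller passes algorithm parameters (`eps`) and manager options (`cluster_name`) in a single call, and the service separates them.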
Notes
Pipeline data is automatically injected by AutoInjectProxy. Each service method accepts the cluster type's own parameters together with the parameters of add_clustering in a single call.
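The injection behaviour described in the warnings above (no pipeline_data argument in pipeline mode, a required one in standalone mode) can be sketched with a small forwarding proxy. This is a minimal illustration, not the actual AutoInjectProxy: `DemoPipelineData`, `DemoClusterManager`, and `InjectingProxy` are hypothetical names, and the use of `functools.partial` is an assumption about one possible approach.

```python
import functools


class DemoPipelineData:
    """Hypothetical stand-in for PipelineData; holds clustering results."""

    def __init__(self):
        self.cluster_data = {}


class DemoClusterManager:
    """Hypothetical stand-in manager; methods take pipeline_data first."""

    def reset_clusters(self, pipeline_data):
        pipeline_data.cluster_data.clear()


class InjectingProxy:
    """Forwards attribute access and prepends pipeline_data to method calls."""

    def __init__(self, manager, pipeline_data):
        self._manager = manager
        self._pipeline_data = pipeline_data

    def __getattr__(self, name):
        # Only called for names not found on the proxy itself.
        attr = getattr(self._manager, name)
        if callable(attr):
            # Bind pipeline_data as the first positional argument.
            return functools.partial(attr, self._pipeline_data)
        return attr


data = DemoPipelineData()
data.cluster_data["my_dbscan"] = "labels"

# Pipeline mode: the proxy supplies pipeline_data.
clustering = InjectingProxy(DemoClusterManager(), data)
clustering.reset_clusters()
```

With a proxy of this kind, passing pipeline_data explicitly in pipeline mode would supply the argument twice, which is consistent with the warning not to provide it.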