Clustering Manager

Clustering manager modules.

Contains manager classes for clustering operations.

Cluster Manager

ClusterManager for managing clustering data objects.

Manager for creating and managing clustering results from feature matrices or decomposition results. Used to add, reset, and manage clustering data in trajectory data objects.

class mdxplain.clustering.manager.cluster_manager.ClusterManager(cache_dir: str = './cache')

Manager for clustering data objects.

Manages the creation and storage of clustering results from feature matrices or decomposition results. Works with TrajectoryData objects to perform clustering analysis using various clustering methods (DBSCAN, HDBSCAN, DPA).

Examples

>>> # Create manager and add DBSCAN clustering
>>> from mdxplain.clustering import cluster_type
>>> from mdxplain.clustering.manager.cluster_manager import ClusterManager
>>> manager = ClusterManager()
>>> manager.add_clustering(
...     pipeline_data, "feature_selection", cluster_type.DBSCAN(eps=0.5),
...     use_decomposed=False
... )
>>> # Manager with custom cache directory
>>> manager = ClusterManager(cache_dir="./cache/clustering")
>>> manager.add_clustering(
...     pipeline_data, "pca_decomposition", cluster_type.HDBSCAN(min_cluster_size=10),
...     use_decomposed=True
... )

__init__(cache_dir: str = './cache') -> None

Initialize cluster manager.

Parameters

cache_dir : str, optional

Cache directory path for clustering data, default="./cache"

Returns

None

Initializes ClusterManager instance with specified configuration

Examples

>>> # Basic manager
>>> manager = ClusterManager()
>>> # Manager with custom cache directory
>>> manager = ClusterManager(cache_dir="./cache/clustering")

reset_clusters(pipeline_data: PipelineData) -> None

Reset all computed clustering results and clear clustering data.

This method removes all computed clustering results and their associated data, requiring clustering to be recalculated from scratch.

Warning

When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.

Pipeline mode:

>>> pipeline = PipelineManager()
>>> pipeline.clustering.reset_clusters()  # NO pipeline_data parameter

Standalone mode:

>>> pipeline_data = PipelineData()
>>> manager = ClusterManager()
>>> manager.reset_clusters(pipeline_data)  # pipeline_data required

Parameters

pipeline_data : PipelineData

Pipeline data object

Returns

None

Clears all clustering data from pipeline_data.cluster_data

Examples

>>> manager = ClusterManager()
>>> manager.reset_clusters(pipeline_data)

add_clustering(pipeline_data: PipelineData, selection_name: str, cluster_type: ClusterTypeBase, use_decomposed: bool = True, cluster_name: str | None = None, data_selector_name: str | None = None, force: bool = False, override_cache: bool = False, center_method: str = 'centroid') -> None

Add clustering analysis to trajectory data.

This method performs clustering analysis on the specified data selection using the provided cluster type. Results are stored in the PipelineData object’s cluster_data dictionary with the specified or default cluster name.

Warning

When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.

Pipeline mode:

>>> pipeline = PipelineManager()
>>> pipeline.clustering.add_clustering("selection", cluster_type.DBSCAN())  # NO pipeline_data parameter

Standalone mode:

>>> pipeline_data = PipelineData()
>>> manager = ClusterManager()
>>> manager.add_clustering(pipeline_data, "selection", cluster_type.DBSCAN())  # pipeline_data required

Parameters

pipeline_data : PipelineData

Pipeline data object containing feature selections or decomposition results

selection_name : str

Name of the feature selection (if use_decomposed=False) or decomposition result (if use_decomposed=True) to cluster

cluster_type : ClusterTypeBase instance

Cluster type instance with parameters (e.g., DBSCAN(eps=0.5))

use_decomposed : bool, optional

Whether to use decomposition results (True) or feature selections (False), default=True

cluster_name : str, optional

Custom name for storing clustering results. If None, defaults to str(cluster_type)

data_selector_name : str, optional

Name of DataSelector to apply frame filtering before clustering. If None, uses all frames from the selection. Only applies if use_decomposed=False

force : bool, optional

Whether to force recomputation if clustering already exists, default=False

override_cache : bool, optional

Whether to clear the entire cluster_name subdirectory before computation, default=False

center_method : str, optional

Method for calculating cluster centers, default="centroid":

  • "centroid": Representative point (medoid, the member closest to the mean)

  • "mean": Average of cluster members

  • "median": Coordinate-wise median (robust to outliers)

  • "density_peak": Point with highest local density

  • "median_centroid": Medoid from median (more robust to outliers)

  • "rmsd_centroid": Centroid using RMSD metric (better for structural comparisons)

NOTE: If algorithm has built-in centers, those are ALWAYS used regardless of this parameter.
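
To make the difference between some of these options concrete, the following is a minimal standard-library sketch of four of them ("mean", "median", "centroid", "median_centroid") as they are described in the list above. The function names and the toy data are illustrative assumptions; this is not mdxplain's implementation, and "density_peak" and "rmsd_centroid" are not covered.

```python
import math
from statistics import mean, median


def mean_center(points):
    """Coordinate-wise average of the cluster members."""
    return [mean(column) for column in zip(*points)]


def median_center(points):
    """Coordinate-wise median of the cluster members."""
    return [median(column) for column in zip(*points)]


def closest_member(points, target):
    """Cluster member with the smallest Euclidean distance to target."""
    return min(points, key=lambda p: math.dist(p, target))


def centroid_center(points):
    """Medoid-style center: the member closest to the mean."""
    return list(closest_member(points, mean_center(points)))


def median_centroid_center(points):
    """Medoid-style center: the member closest to the coordinate-wise median."""
    return list(closest_member(points, median_center(points)))


# Toy 2D cluster with one far outlier at (21, 21) (assumed data).
cluster = [(0.0, 0.0), (1.0, 1.0), (2.0, 2.0), (6.0, 6.0), (21.0, 21.0)]

print(mean_center(cluster))             # pulled toward the outlier
print(median_center(cluster))           # stays near the bulk of the points
print(centroid_center(cluster))         # an actual member, near the mean
print(median_centroid_center(cluster))  # an actual member, near the median
```

The two medoid-style variants always return an actual cluster member, which matters when the center is later used to pick a representative frame; "mean" and "median" may return a point that no frame occupies.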

Returns

None

Stores ClusterData object in pipeline_data.cluster_data dictionary
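
The default-name rule for cluster_name can be sketched as follows. ToyClusterType, its string form, and resolve_cluster_name are hypothetical stand-ins introduced only for illustration; the actual string returned by str(cluster_type) in mdxplain is not specified here.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyClusterType:
    """Hypothetical stand-in for a cluster type; not an mdxplain class."""

    eps: float = 0.5

    def __str__(self) -> str:
        # Assumed string form, chosen only for this example.
        return f"toy_dbscan_eps{self.eps}"


def resolve_cluster_name(cluster_type, cluster_name: Optional[str] = None) -> str:
    """Use the custom name if given, otherwise fall back to str(cluster_type)."""
    return cluster_name if cluster_name is not None else str(cluster_type)


# A plain dict stands in for pipeline_data.cluster_data.
cluster_data = {}
toy_type = ToyClusterType(eps=0.5)
cluster_data[resolve_cluster_name(toy_type)] = "result stored under default name"
cluster_data[resolve_cluster_name(toy_type, "my_dbscan")] = "result stored under custom name"

print(sorted(cluster_data))
```

Passing an explicit cluster_name is the simplest way to keep two runs of the same cluster type side by side under distinct keys.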

Examples

>>> # Cluster decomposition results with custom name
>>> from mdxplain.clustering import cluster_type
>>> manager = ClusterManager()
>>> manager.add_clustering(
...     pipeline_data, "pca_selection", cluster_type.DBSCAN(eps=0.5),
...     use_decomposed=True, cluster_name="my_dbscan"
... )
>>> # Cluster feature selection with default name
>>> manager.add_clustering(
...     pipeline_data, "distance_selection", cluster_type.HDBSCAN(min_cluster_size=10),
...     use_decomposed=False
... )
>>> # Cluster with data selector (only specific frames)
>>> manager.add_clustering(
...     pipeline_data, "distance_selection", cluster_type.DBSCAN(eps=0.3),
...     data_selector_name="folded_frames", use_decomposed=False
... )

Raises

ValueError

If selection not found, cluster type invalid, or clustering computation fails

save(pipeline_data: PipelineData, save_path: str) -> None

Save all clustering data to a single file.

Warning

When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.

Pipeline mode:

>>> pipeline = PipelineManager()
>>> pipeline.clustering.save('clustering.npy')  # NO pipeline_data parameter

Standalone mode:

>>> pipeline_data = PipelineData()
>>> manager = ClusterManager()
>>> manager.save(pipeline_data, 'clustering.npy')  # pipeline_data required

Parameters

pipeline_data : PipelineData

Pipeline data container with clustering data

save_path : str

Path of the single file to which all clustering data is written

Returns

None

Saves all clustering data to the specified file

Examples

>>> manager.save(pipeline_data, 'clustering.npy')

load(pipeline_data: PipelineData, load_path: str) -> None

Load all clustering data from a single file.

Warning

When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.

Pipeline mode:

>>> pipeline = PipelineManager()
>>> pipeline.clustering.load('clustering.npy')  # NO pipeline_data parameter

Standalone mode:

>>> pipeline_data = PipelineData()
>>> manager = ClusterManager()
>>> manager.load(pipeline_data, 'clustering.npy')  # pipeline_data required

Parameters

pipeline_data : PipelineData

Pipeline data container to load clustering data into

load_path : str

Path to saved clustering data file

Returns

None

Loads all clustering data from the specified file

Examples

>>> manager.load(pipeline_data, 'clustering.npy')

print_info(pipeline_data: PipelineData) -> None

Print clustering data information.

Warning

When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.

Pipeline mode:

>>> pipeline = PipelineManager()
>>> pipeline.clustering.print_info()  # NO pipeline_data parameter

Standalone mode:

>>> pipeline_data = PipelineData()
>>> manager = ClusterManager()
>>> manager.print_info(pipeline_data)  # pipeline_data required

Parameters

pipeline_data : PipelineData

Pipeline data container with clustering data

Returns

None

Prints clustering data information to console

Examples

>>> manager.print_info(pipeline_data)

property add: ClusterAddService

Service for adding clustering algorithms with simplified syntax.

Provides an intuitive interface for adding clustering algorithms without requiring explicit cluster type instantiation or imports.

Returns

ClusterAddService

Service instance for adding clustering algorithms with combined parameters

Examples

>>> # Add different clustering algorithms
>>> pipeline.clustering.add.dbscan("my_features", eps=0.5, min_samples=5)
>>> pipeline.clustering.add.hdbscan("pca_features", min_cluster_size=10)
>>> pipeline.clustering.add.dpa("distance_features", Z=2.0)

Notes

Pipeline data is automatically injected by AutoInjectProxy. All cluster type parameters are combined with manager.add parameters.
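
The injection behaviour described in the note, and in the pipeline-mode warnings on the methods above, can be pictured with a small proxy that prepends a stored object to every method call. InjectingProxy and ToyManager below are hypothetical names written for this sketch; they are not mdxplain's AutoInjectProxy or ClusterManager, and the real mechanism may differ.

```python
import functools


class InjectingProxy:
    """Hypothetical proxy that passes a stored object as the first argument."""

    def __init__(self, manager, pipeline_data):
        self._manager = manager
        self._pipeline_data = pipeline_data

    def __getattr__(self, name):
        # Only reached for names not found on the proxy itself.
        attribute = getattr(self._manager, name)
        if callable(attribute):
            # Bind pipeline_data as the first positional argument.
            return functools.partial(attribute, self._pipeline_data)
        return attribute


class ToyManager:
    """Stand-in manager whose methods expect pipeline_data first."""

    def reset_clusters(self, pipeline_data):
        pipeline_data["cluster_data"].clear()


# A plain dict stands in for a PipelineData object.
data = {"cluster_data": {"dbscan": [0, 1, 1]}}
proxy = InjectingProxy(ToyManager(), data)

# Pipeline-style call: no pipeline_data argument is given by the caller.
proxy.reset_clusters()
print(data["cluster_data"])
```

In this toy version, calling proxy.reset_clusters(data) would hand the method two data arguments and fail with a TypeError, which illustrates why the argument must be omitted when a proxy supplies it.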